liblzma: Threaded decoder: Support zpipe.c-style decoding loop.

This makes it possible to call lzma_code() in a loop that only
reads new input when lzma_code() didn't fill the output buffer
completely. That isn't the calling style suggested by the
liblzma example program 02_decompress.c so perhaps the usefulness
of this feature is limited.

Also, it is possible to write such a loop so that it works
with the single-threaded decoder but not with the threaded
decoder even after this commit, or so that it works only if
lzma_mt.timeout = 0.

The zlib tutorial <https://zlib.net/zlib_how.html> is a well-known
example of a loop where more input is read only when output isn't
full. Porting this as is to liblzma would work with the
single-threaded decoder (if LZMA_CONCATENATED isn't used) but it
wouldn't work with threaded decoder even after this commit because
the loop assumes that no more output is possible when it cannot
read more input ("if (strm.avail_in == 0) break;"). This cannot
be fixed at liblzma side; the loop has to be modified at least
a little.

I'm adding this in any case because the actual code is simple
and short and should have no harmful side-effects in other
situations.
This commit is contained in:
Lasse Collin 2022-04-02 21:49:59 +03:00
parent 2ba8173e27
commit e671bc8828
1 changed files with 67 additions and 10 deletions

View File

@ -300,12 +300,25 @@ struct lzma_stream_coder {
/// Stream Padding is a multiple of four bytes. /// Stream Padding is a multiple of four bytes.
bool concatenated; bool concatenated;
/// When decoding concatenated Streams, this is true as long as we /// When decoding concatenated Streams, this is true as long as we
/// are decoding the first Stream. This is needed to avoid misleading /// are decoding the first Stream. This is needed to avoid misleading
/// LZMA_FORMAT_ERROR in case the later Streams don't have valid magic /// LZMA_FORMAT_ERROR in case the later Streams don't have valid magic
/// bytes. /// bytes.
bool first_stream; bool first_stream;
/// This is used to track if the previous call to stream_decode_mt()
/// had output space (*out_pos < out_size) and managed to fill the
/// output buffer (*out_pos == out_size). This may be set to true
/// in read_output_and_wait(). This is read and then reset to false
/// at the beginning of stream_decode_mt().
///
/// This is needed to support applications that call lzma_code() in
/// such a way that more input is provided only when lzma_code()
/// didn't fill the output buffer completely. Basically, this makes
/// it easier to convert such applications from single-threaded
/// decoder to multi-threaded decoder.
bool out_was_filled;
/// Write position in buffer[] and position in Stream Padding /// Write position in buffer[] and position in Stream Padding
size_t pos; size_t pos;
@ -656,6 +669,7 @@ read_output_and_wait(struct lzma_stream_coder *coder,
do { do {
// Get as much output from the queue as is possible // Get as much output from the queue as is possible
// without blocking. // without blocking.
const size_t out_start = *out_pos;
do { do {
ret = lzma_outq_read(&coder->outq, allocator, ret = lzma_outq_read(&coder->outq, allocator,
out, out_pos, out_size, out, out_pos, out_size,
@ -683,6 +697,14 @@ read_output_and_wait(struct lzma_stream_coder *coder,
if (ret != LZMA_OK) if (ret != LZMA_OK)
break; break;
// If the output buffer is now full but it wasn't full
// when this function was called, set out_was_filled.
// This way the next call to stream_decode_mt() knows
// that some output was produced and no output space
// remained in the previous call to stream_decode_mt().
if (*out_pos == out_size && *out_pos != out_start)
coder->out_was_filled = true;
// Check if any thread has indicated an error. // Check if any thread has indicated an error.
if (coder->thread_error != LZMA_OK) { if (coder->thread_error != LZMA_OK) {
if (coder->pending_error == LZMA_OK) if (coder->pending_error == LZMA_OK)
@ -949,11 +971,39 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator,
{ {
struct lzma_stream_coder *coder = coder_ptr; struct lzma_stream_coder *coder = coder_ptr;
const size_t in_start = *in_pos;
mythread_condtime wait_abs; mythread_condtime wait_abs;
bool has_blocked = false; bool has_blocked = false;
// Determine if in SEQ_BLOCK_HEADER and SEQ_BLOCK_THR_RUN we should
// tell read_output_and_wait() to wait until it can fill the output
// buffer (or a timeout occurs). Two conditions must be met:
//
// (1) If the caller provided no new input. The reason for this
// can be, for example, the end of the file or that there is
// a pause in the input stream and more input is available
// a little later. In this situation we should wait for output
// because otherwise we would end up in a busy-waiting loop where
// we make no progress and the application just calls us again
// without providing any new input. This would then result in
// LZMA_BUF_ERROR even though more output would be available
// once the worker threads decode more data.
//
// (2) Even if (1) is true, we will not wait if the previous call to
// this function managed to produce some output and the output
// buffer became full. This is for compatibility with applications
// that call lzma_code() in such a way that new input is provided
// only when the output buffer didn't become full. Without this
// trick such applications would have bad performance (bad
// parallelization due to decoder not getting input fast enough).
//
// NOTE: Such loops might require that timeout is disabled (0)
// if they assume that output-not-full implies that all input has
// been consumed. If and only if timeout is enabled, we may return
// when output isn't full *and* not all input has been consumed.
const bool waiting_allowed = *in_pos == in_size
&& !coder->out_was_filled;
coder->out_was_filled = false;
while (true) while (true)
switch (coder->sequence) { switch (coder->sequence) {
case SEQ_STREAM_HEADER: { case SEQ_STREAM_HEADER: {
@ -1030,11 +1080,11 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator,
// without a delay. // without a delay.
// //
// On the other hand, if lzma_code() was called with // On the other hand, if lzma_code() was called with
// an empty input buffer (in_start == in_size), treat // an empty input buffer(*), treat it specially: try
// it specially: try to fill the output buffer even // to fill the output buffer even if it requires
// if it requires waiting for the worker threads to // waiting for the worker threads to provide output
// provide output (timeout, if specified, can still // (timeout, if specified, can still cause us to
// cause us to return). // return).
// //
// - This way the application will be able to get all // - This way the application will be able to get all
// data that can be decoded from the input provided // data that can be decoded from the input provided
@ -1049,11 +1099,15 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator,
// anything and will return LZMA_OK immediately // anything and will return LZMA_OK immediately
// (coder->timeout is completely ignored). // (coder->timeout is completely ignored).
// //
// (*) See the comment at the beginning of this
// function how waiting_allowed is determined
// and why there is an exception to the rule
// of "called with an empty input buffer".
assert(*in_pos == in_size); assert(*in_pos == in_size);
return_if_error(read_output_and_wait(coder, allocator, return_if_error(read_output_and_wait(coder, allocator,
out, out_pos, out_size, out, out_pos, out_size,
NULL, in_start == in_size, NULL, waiting_allowed,
&wait_abs, &has_blocked)); &wait_abs, &has_blocked));
if (coder->pending_error != LZMA_OK) { if (coder->pending_error != LZMA_OK) {
@ -1403,10 +1457,11 @@ stream_decode_mt(void *coder_ptr, const lzma_allocator *allocator,
// Read output from the output queue. Just like in // Read output from the output queue. Just like in
// SEQ_BLOCK_HEADER, we wait to fill the output buffer // SEQ_BLOCK_HEADER, we wait to fill the output buffer
// only if lzma_code() was called without providing any input. // only if waiting_allowed was set to true in the beginning
// of this function (see the comment there).
return_if_error(read_output_and_wait(coder, allocator, return_if_error(read_output_and_wait(coder, allocator,
out, out_pos, out_size, out, out_pos, out_size,
NULL, in_start == in_size, NULL, waiting_allowed,
&wait_abs, &has_blocked)); &wait_abs, &has_blocked));
if (coder->pending_error != LZMA_OK) { if (coder->pending_error != LZMA_OK) {
@ -1823,7 +1878,9 @@ stream_decoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator,
coder->tell_any_check = (options->flags & LZMA_TELL_ANY_CHECK) != 0; coder->tell_any_check = (options->flags & LZMA_TELL_ANY_CHECK) != 0;
coder->ignore_check = (options->flags & LZMA_IGNORE_CHECK) != 0; coder->ignore_check = (options->flags & LZMA_IGNORE_CHECK) != 0;
coder->concatenated = (options->flags & LZMA_CONCATENATED) != 0; coder->concatenated = (options->flags & LZMA_CONCATENATED) != 0;
coder->first_stream = true; coder->first_stream = true;
coder->out_was_filled = false;
coder->pos = 0; coder->pos = 0;
coder->threads_max = options->threads; coder->threads_max = options->threads;