liblzma: Fix a deadlock in threaded decoder.
If a worker thread has consumed all input so far and it's waiting on thr->cond and then the main thread enables partial update for that thread, the code used to deadlock. This commit allows one dummy decoding pass to occur in this situation which then also does the partial update. As part of the fix, this moves thr->progress_* updates to avoid the second thr->mutex locking. Thanks to Jia Tan for finding, debugging, and reporting the bug.
This commit is contained in:
parent
e0394e9423
commit
bd93b776c1
|
@ -39,6 +39,26 @@ typedef enum {
|
||||||
} worker_state;
|
} worker_state;
|
||||||
|
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
/// Partial updates (storing of worker thread progress
|
||||||
|
/// to lzma_outbuf) are disabled.
|
||||||
|
PARTIAL_DISABLED,
|
||||||
|
|
||||||
|
/// Main thread requests partial updates to be enabled but
|
||||||
|
/// no partial update has been done by the worker thread yet.
|
||||||
|
///
|
||||||
|
/// Changing from PARTIAL_DISABLED to PARTIAL_START requires
|
||||||
|
/// use of the worker-thread mutex. Other transitions don't
|
||||||
|
/// need a mutex.
|
||||||
|
PARTIAL_START,
|
||||||
|
|
||||||
|
/// Partial updates are enabled and the worker thread has done
|
||||||
|
/// at least one partial update.
|
||||||
|
PARTIAL_ENABLED,
|
||||||
|
|
||||||
|
} partial_update_mode;
|
||||||
|
|
||||||
|
|
||||||
struct worker_thread {
|
struct worker_thread {
|
||||||
/// Worker state is protected with our mutex.
|
/// Worker state is protected with our mutex.
|
||||||
worker_state state;
|
worker_state state;
|
||||||
|
@ -90,9 +110,10 @@ struct worker_thread {
|
||||||
/// happen if all worker threads were frequently locking the main
|
/// happen if all worker threads were frequently locking the main
|
||||||
/// mutex to update their outbuf->pos.
|
/// mutex to update their outbuf->pos.
|
||||||
///
|
///
|
||||||
/// Only when partial_update is true, this worker thread will update
|
/// Only when partial_update is something else than PARTIAL_DISABLED,
|
||||||
/// outbuf->pos after each call to the Block decoder.
|
/// this worker thread will update outbuf->pos after each call to
|
||||||
bool partial_update;
|
/// the Block decoder.
|
||||||
|
partial_update_mode partial_update;
|
||||||
|
|
||||||
/// Block decoder
|
/// Block decoder
|
||||||
lzma_next_coder block_decoder;
|
lzma_next_coder block_decoder;
|
||||||
|
@ -303,7 +324,8 @@ worker_enable_partial_update(void *thr_ptr)
|
||||||
struct worker_thread *thr = thr_ptr;
|
struct worker_thread *thr = thr_ptr;
|
||||||
|
|
||||||
mythread_sync(thr->mutex) {
|
mythread_sync(thr->mutex) {
|
||||||
thr->partial_update = true;
|
thr->partial_update = PARTIAL_START;
|
||||||
|
mythread_cond_signal(&thr->cond);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -334,6 +356,7 @@ worker_decoder(void *thr_ptr)
|
||||||
{
|
{
|
||||||
struct worker_thread *thr = thr_ptr;
|
struct worker_thread *thr = thr_ptr;
|
||||||
size_t in_filled;
|
size_t in_filled;
|
||||||
|
partial_update_mode partial_update;
|
||||||
lzma_ret ret;
|
lzma_ret ret;
|
||||||
|
|
||||||
next_loop_lock:
|
next_loop_lock:
|
||||||
|
@ -371,9 +394,19 @@ next_loop_unlocked:
|
||||||
|
|
||||||
assert(thr->state == THR_RUN);
|
assert(thr->state == THR_RUN);
|
||||||
|
|
||||||
in_filled = thr->in_filled;
|
// Update progress info for get_progress().
|
||||||
|
thr->progress_in = thr->in_pos;
|
||||||
|
thr->progress_out = thr->out_pos;
|
||||||
|
|
||||||
if (in_filled == thr->in_pos) {
|
// If we don't have any new input, wait for a signal from the main
|
||||||
|
// thread except if partial output has just been enabled. In that
|
||||||
|
// case we will do one normal run so that the partial output info
|
||||||
|
// gets passed to the main thread. The call to block_decoder.code()
|
||||||
|
// is useless but harmless as it can occur only once per Block.
|
||||||
|
in_filled = thr->in_filled;
|
||||||
|
partial_update = thr->partial_update;
|
||||||
|
|
||||||
|
if (in_filled == thr->in_pos && partial_update != PARTIAL_START) {
|
||||||
mythread_cond_wait(&thr->cond, &thr->mutex);
|
mythread_cond_wait(&thr->cond, &thr->mutex);
|
||||||
goto next_loop_unlocked;
|
goto next_loop_unlocked;
|
||||||
}
|
}
|
||||||
|
@ -382,7 +415,7 @@ next_loop_unlocked:
|
||||||
|
|
||||||
// Pass the input in small chunks to the Block decoder.
|
// Pass the input in small chunks to the Block decoder.
|
||||||
// This way we react reasonably fast if we are told to stop/exit,
|
// This way we react reasonably fast if we are told to stop/exit,
|
||||||
// and (when partial_update is true) we tell about our progress
|
// and (when partial update is enabled) we tell about our progress
|
||||||
// to the main thread frequently enough.
|
// to the main thread frequently enough.
|
||||||
const size_t chunk_size = 16384;
|
const size_t chunk_size = 16384;
|
||||||
if ((in_filled - thr->in_pos) > chunk_size)
|
if ((in_filled - thr->in_pos) > chunk_size)
|
||||||
|
@ -395,24 +428,23 @@ next_loop_unlocked:
|
||||||
thr->outbuf->allocated, LZMA_RUN);
|
thr->outbuf->allocated, LZMA_RUN);
|
||||||
|
|
||||||
if (ret == LZMA_OK) {
|
if (ret == LZMA_OK) {
|
||||||
bool partial_update;
|
if (partial_update != PARTIAL_DISABLED) {
|
||||||
|
// The main thread uses thr->mutex to change from
|
||||||
|
// PARTIAL_DISABLED to PARTIAL_START. The main thread
|
||||||
|
// doesn't care about this variable after that so we
|
||||||
|
// can safely change it here to PARTIAL_ENABLED
|
||||||
|
// without a mutex.
|
||||||
|
thr->partial_update = PARTIAL_ENABLED;
|
||||||
|
|
||||||
mythread_sync(thr->mutex) {
|
|
||||||
// Update progress info for get_progress().
|
|
||||||
thr->progress_in = thr->in_pos;
|
|
||||||
thr->progress_out = thr->out_pos;
|
|
||||||
|
|
||||||
partial_update = thr->partial_update;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (partial_update) {
|
|
||||||
// The main thread is reading decompressed data
|
// The main thread is reading decompressed data
|
||||||
// from thr->outbuf. Tell the main thread about
|
// from thr->outbuf. Tell the main thread about
|
||||||
// our progress.
|
// our progress.
|
||||||
//
|
//
|
||||||
// NOTE: It's possible that we consumed input without
|
// NOTE: It's possible that we consumed input without
|
||||||
// producing any new output so it's possible that
|
// producing any new output so it's possible that
|
||||||
// only in_pos has changed.
|
// only in_pos has changed. In case of PARTIAL_START
|
||||||
|
// it is possible that neither in_pos nor out_pos has
|
||||||
|
// changed.
|
||||||
mythread_sync(thr->coder->mutex) {
|
mythread_sync(thr->coder->mutex) {
|
||||||
thr->outbuf->pos = thr->out_pos;
|
thr->outbuf->pos = thr->out_pos;
|
||||||
thr->outbuf->decoder_in_pos = thr->in_pos;
|
thr->outbuf->decoder_in_pos = thr->in_pos;
|
||||||
|
@ -603,7 +635,7 @@ get_thread(struct lzma_stream_coder *coder, const lzma_allocator *allocator)
|
||||||
coder->thr->progress_in = 0;
|
coder->thr->progress_in = 0;
|
||||||
coder->thr->progress_out = 0;
|
coder->thr->progress_out = 0;
|
||||||
|
|
||||||
coder->thr->partial_update = false;
|
coder->thr->partial_update = PARTIAL_DISABLED;
|
||||||
|
|
||||||
return LZMA_OK;
|
return LZMA_OK;
|
||||||
}
|
}
|
||||||
|
@ -757,7 +789,8 @@ read_output_and_wait(struct lzma_stream_coder *coder,
|
||||||
// without thr->mutex as only the main thread
|
// without thr->mutex as only the main thread
|
||||||
// modifies these variables. decoder_in_pos requires
|
// modifies these variables. decoder_in_pos requires
|
||||||
// coder->mutex which we are already holding.
|
// coder->mutex which we are already holding.
|
||||||
if (coder->thr != NULL && coder->thr->partial_update) {
|
if (coder->thr != NULL && coder->thr->partial_update
|
||||||
|
!= PARTIAL_DISABLED) {
|
||||||
// There is exactly one outbuf in the queue.
|
// There is exactly one outbuf in the queue.
|
||||||
assert(coder->thr->outbuf == coder->outq.head);
|
assert(coder->thr->outbuf == coder->outq.head);
|
||||||
assert(coder->thr->outbuf == coder->outq.tail);
|
assert(coder->thr->outbuf == coder->outq.tail);
|
||||||
|
|
Loading…
Reference in New Issue