diff --git a/src/liblzma/common/outqueue.c b/src/liblzma/common/outqueue.c index 2dc8a38d..6331a50c 100644 --- a/src/liblzma/common/outqueue.c +++ b/src/liblzma/common/outqueue.c @@ -13,84 +13,100 @@ #include "outqueue.h" -/// This is to ease integer overflow checking: We may allocate up to -/// 2 * LZMA_THREADS_MAX buffers and we need some extra memory for other -/// data structures (that's the second /2). -#define BUF_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX / 2 / 2) - - -static lzma_ret -get_options(uint64_t *bufs_alloc_size, uint32_t *bufs_count, - uint64_t buf_size_max, uint32_t threads) -{ - if (threads > LZMA_THREADS_MAX || buf_size_max > BUF_SIZE_MAX) - return LZMA_OPTIONS_ERROR; - - // The number of buffers is twice the number of threads. - // This wastes RAM but keeps the threads busy when buffers - // finish out of order. - // - // NOTE: If this is changed, update BUF_SIZE_MAX too. - *bufs_count = threads * 2; - *bufs_alloc_size = *bufs_count * buf_size_max; - - return LZMA_OK; -} +/// Get the maximum number of buffers that may be allocated based +/// on the number of threads. For now this is twice the number of threads. +/// It's a compromise between RAM usage and keeping the worker threads busy +/// when buffers finish out of order. +#define GET_BUFS_LIMIT(threads) (2 * (threads)) extern uint64_t lzma_outq_memusage(uint64_t buf_size_max, uint32_t threads) { - uint64_t bufs_alloc_size; - uint32_t bufs_count; + // This is to ease integer overflow checking: We may allocate up to + // GET_BUFS_LIMIT(LZMA_THREADS_MAX) buffers and we need some extra + // memory for other data structures too (that's the /2). + // + // lzma_outq_prealloc_buf() will still accept bigger buffers than this. + const uint64_t limit + = UINT64_MAX / GET_BUFS_LIMIT(LZMA_THREADS_MAX) / 2; - if (get_options(&bufs_alloc_size, &bufs_count, buf_size_max, threads) - != LZMA_OK) + if (threads > LZMA_THREADS_MAX || buf_size_max > limit) return UINT64_MAX; - return sizeof(lzma_outq) + bufs_count * sizeof(lzma_outbuf) - + bufs_alloc_size; + return GET_BUFS_LIMIT(threads) * (sizeof(lzma_outbuf) + buf_size_max); +} + + +static void +move_head_to_cache(lzma_outq *outq, const lzma_allocator *allocator) +{ + assert(outq->head != NULL); + assert(outq->tail != NULL); + assert(outq->bufs_in_use > 0); + + --outq->bufs_in_use; + + lzma_outbuf *buf = outq->head; + outq->head = buf->next; + if (outq->head == NULL) + outq->tail = NULL; + + if (outq->cache != NULL && outq->cache->allocated != buf->allocated) + lzma_outq_clear_cache(outq, allocator); + + buf->next = outq->cache; + outq->cache = buf; + + return; +} + + +static void +free_one_cached_buffer(lzma_outq *outq, const lzma_allocator *allocator) +{ + assert(outq->cache != NULL); + + lzma_outbuf *buf = outq->cache; + outq->cache = buf->next; + + --outq->bufs_allocated; + outq->memusage -= sizeof(*buf) + buf->allocated; + + lzma_free(buf, allocator); + return; +} + + +extern void +lzma_outq_clear_cache(lzma_outq *outq, const lzma_allocator *allocator) +{ + while (outq->cache != NULL) + free_one_cached_buffer(outq, allocator); + + return; } extern lzma_ret lzma_outq_init(lzma_outq *outq, const lzma_allocator *allocator, - uint64_t buf_size_max, uint32_t threads) + uint32_t threads) { - uint64_t bufs_alloc_size; - uint32_t bufs_count; + if (threads > LZMA_THREADS_MAX) + return LZMA_OPTIONS_ERROR; - // Set bufs_count and bufs_alloc_size. - return_if_error(get_options(&bufs_alloc_size, &bufs_count, - buf_size_max, threads)); + const uint32_t bufs_limit = GET_BUFS_LIMIT(threads); - // Allocate memory if needed. - if (outq->buf_size_max != buf_size_max - || outq->bufs_allocated != bufs_count) { - lzma_outq_end(outq, allocator); + // Clear head/tail. + while (outq->head != NULL) + move_head_to_cache(outq, allocator); -#if SIZE_MAX < UINT64_MAX - if (bufs_alloc_size > SIZE_MAX) - return LZMA_MEM_ERROR; -#endif + // If new buf_limit is lower than the old one, we may need to free + // a few cached buffers. + while (bufs_limit < outq->bufs_allocated) + free_one_cached_buffer(outq, allocator); - outq->bufs = lzma_alloc(bufs_count * sizeof(lzma_outbuf), - allocator); - outq->bufs_mem = lzma_alloc((size_t)(bufs_alloc_size), - allocator); - - if (outq->bufs == NULL || outq->bufs_mem == NULL) { - lzma_outq_end(outq, allocator); - return LZMA_MEM_ERROR; - } - } - - // Initialize the rest of the main structure. Initialization of - // outq->bufs[] is done when they are actually needed. - outq->buf_size_max = (size_t)(buf_size_max); - outq->bufs_allocated = bufs_count; - outq->bufs_pos = 0; - outq->bufs_used = 0; + outq->bufs_limit = bufs_limit; outq->read_pos = 0; return LZMA_OK; @@ -100,33 +116,76 @@ lzma_outq_init(lzma_outq *outq, const lzma_allocator *allocator, extern void lzma_outq_end(lzma_outq *outq, const lzma_allocator *allocator) { - lzma_free(outq->bufs, allocator); - outq->bufs = NULL; - - lzma_free(outq->bufs_mem, allocator); - outq->bufs_mem = NULL; + while (outq->head != NULL) + move_head_to_cache(outq, allocator); + lzma_outq_clear_cache(outq, allocator); return; } -extern lzma_outbuf * -lzma_outq_get_buf(lzma_outq *outq) +extern lzma_ret +lzma_outq_prealloc_buf(lzma_outq *outq, const lzma_allocator *allocator, + size_t size) { // Caller must have checked it with lzma_outq_has_buf(). - assert(outq->bufs_used < outq->bufs_allocated); + assert(outq->bufs_in_use < outq->bufs_limit); - // Initialize the new buffer. - lzma_outbuf *buf = &outq->bufs[outq->bufs_pos]; - buf->buf = outq->bufs_mem + outq->bufs_pos * outq->buf_size_max; - buf->size = 0; + // If there already is appropriately-sized buffer in the cache, + // we need to do nothing. + if (outq->cache != NULL && outq->cache->allocated == size) + return LZMA_OK; + + if (size > SIZE_MAX - sizeof(lzma_outbuf)) + return LZMA_MEM_ERROR; + + // The cache may have buffers but their size is wrong. + lzma_outq_clear_cache(outq, allocator); + + outq->cache = lzma_alloc(sizeof(lzma_outbuf) + size, allocator); + if (outq->cache == NULL) + return LZMA_MEM_ERROR; + + outq->cache->next = NULL; + outq->cache->allocated = size; + + ++outq->bufs_allocated; + outq->memusage += sizeof(lzma_outbuf) + size; + + return LZMA_OK; +} + + +extern lzma_outbuf * +lzma_outq_get_buf(lzma_outq *outq, void *worker) +{ + // Caller must have used lzma_outq_prealloc_buf() to ensure these. + assert(outq->bufs_in_use < outq->bufs_limit); + assert(outq->bufs_in_use < outq->bufs_allocated); + assert(outq->cache != NULL); + + lzma_outbuf *buf = outq->cache; + outq->cache = buf->next; + buf->next = NULL; + + if (outq->tail != NULL) { + assert(outq->head != NULL); + outq->tail->next = buf; + } else { + assert(outq->head == NULL); + outq->head = buf; + } + + outq->tail = buf; + + buf->worker = worker; buf->finished = false; + buf->pos = 0; - // Update the queue state. - if (++outq->bufs_pos == outq->bufs_allocated) - outq->bufs_pos = 0; + buf->unpadded_size = 0; + buf->uncompressed_size = 0; - ++outq->bufs_used; + ++outq->bufs_in_use; return buf; } @@ -135,50 +194,65 @@ lzma_outq_get_buf(lzma_outq *outq) extern bool lzma_outq_is_readable(const lzma_outq *outq) { - uint32_t i = outq->bufs_pos - outq->bufs_used; - if (outq->bufs_pos < outq->bufs_used) - i += outq->bufs_allocated; + if (outq->head == NULL) + return false; - return outq->bufs[i].finished; + return outq->read_pos < outq->head->pos || outq->head->finished; } extern lzma_ret -lzma_outq_read(lzma_outq *restrict outq, uint8_t *restrict out, - size_t *restrict out_pos, size_t out_size, +lzma_outq_read(lzma_outq *restrict outq, + const lzma_allocator *restrict allocator, + uint8_t *restrict out, size_t *restrict out_pos, + size_t out_size, lzma_vli *restrict unpadded_size, lzma_vli *restrict uncompressed_size) { // There must be at least one buffer from which to read. - if (outq->bufs_used == 0) + if (outq->bufs_in_use == 0) return LZMA_OK; // Get the buffer. - uint32_t i = outq->bufs_pos - outq->bufs_used; - if (outq->bufs_pos < outq->bufs_used) - i += outq->bufs_allocated; - - lzma_outbuf *buf = &outq->bufs[i]; - - // If it isn't finished yet, we cannot read from it. - if (!buf->finished) - return LZMA_OK; + lzma_outbuf *buf = outq->head; // Copy from the buffer to output. - lzma_bufcpy(buf->buf, &outq->read_pos, buf->size, + // + // FIXME? In threaded decoder it may be bad to do this copy while + // the mutex is being held. + lzma_bufcpy(buf->buf, &outq->read_pos, buf->pos, out, out_pos, out_size); // Return if we didn't get all the data from the buffer. - if (outq->read_pos < buf->size) + if (!buf->finished || outq->read_pos < buf->pos) return LZMA_OK; // The buffer was finished. Tell the caller its size information. - *unpadded_size = buf->unpadded_size; - *uncompressed_size = buf->uncompressed_size; + if (unpadded_size != NULL) + *unpadded_size = buf->unpadded_size; + + if (uncompressed_size != NULL) + *uncompressed_size = buf->uncompressed_size; // Free this buffer for further use. - --outq->bufs_used; + move_head_to_cache(outq, allocator); outq->read_pos = 0; return LZMA_STREAM_END; } + + +extern void +lzma_outq_enable_partial_output(lzma_outq *outq, + void (*enable_partial_output)(void *worker)) +{ + if (outq->head != NULL && !outq->head->finished + && outq->head->worker != NULL) { + enable_partial_output(outq->head->worker); + + // Set it to NULL since calling it twice is pointless. + outq->head->worker = NULL; + } + + return; +} diff --git a/src/liblzma/common/outqueue.h b/src/liblzma/common/outqueue.h index 079634de..355e0ced 100644 --- a/src/liblzma/common/outqueue.h +++ b/src/liblzma/common/outqueue.h @@ -14,16 +14,27 @@ /// Output buffer for a single thread -typedef struct { - /// Pointer to the output buffer of lzma_outq.buf_size_max bytes - uint8_t *buf; +typedef struct lzma_outbuf_s lzma_outbuf; +struct lzma_outbuf_s { + /// Pointer to the next buffer. This is used for the cached buffers. + /// The worker thread must not modify this. + lzma_outbuf *next; - /// Amount of data written to buf - size_t size; + /// This initialized by lzma_outq_get_buf() and + /// is used by lzma_outq_enable_partial_output(). + /// The worker thread must not modify this. + void *worker; - /// Additional size information - lzma_vli unpadded_size; - lzma_vli uncompressed_size; + /// Amount of memory allocated for buf[]. + /// The worker thread must not modify this. + size_t allocated; + + /// Writing position in the worker thread or, in other words, the + /// amount of finished data written to buf[] which can be copied out + /// + /// \note This is read by another thread and thus access + /// to this variable needs a mutex. + size_t pos; /// True when no more data will be written into this buffer. /// @@ -31,32 +42,44 @@ typedef struct { /// to this variable needs a mutex. bool finished; -} lzma_outbuf; + /// Additional size information. lzma_outq_read() may read these + /// when "finished" is true. + lzma_vli unpadded_size; + lzma_vli uncompressed_size; + + /// Buffer of "allocated" bytes + uint8_t buf[]; +}; typedef struct { - /// Array of buffers that are used cyclically. - lzma_outbuf *bufs; + /// Linked list of buffers in use. The next output byte will be + /// read from the head and buffers for the next thread will be + /// appended to the tail. tail->next is always NULL. + lzma_outbuf *head; + lzma_outbuf *tail; - /// Memory allocated for all the buffers - uint8_t *bufs_mem; - - /// Amount of buffer space available in each buffer - size_t buf_size_max; - - /// Number of buffers allocated - uint32_t bufs_allocated; - - /// Position in the bufs array. The next buffer to be taken - /// into use is bufs[bufs_pos]. - uint32_t bufs_pos; - - /// Number of buffers in use - uint32_t bufs_used; - - /// Position in the buffer in lzma_outq_read() + /// Number of bytes read from head->buf[] in lzma_outq_read() size_t read_pos; + /// Linked list of allocated buffers that aren't currently used. + /// This way buffers of similar size can be reused and don't + /// need to be reallocated every time. For simplicity, all + /// cached buffers in the list have the same allocated size. + lzma_outbuf *cache; + + /// Total amount of memory allocated for buffers + uint64_t memusage; + + /// Number of buffers in use in the head...tail list. If and only if + /// this is zero, the pointers head and tail above are NULL. + uint32_t bufs_in_use; + + /// Number of buffers allocated (in use + cached) + uint32_t bufs_allocated; + + /// Maximum allowed number of allocated buffers + uint32_t bufs_limit; } lzma_outq; @@ -76,32 +99,50 @@ extern uint64_t lzma_outq_memusage(uint64_t buf_size_max, uint32_t threads); /// function knows that there are no previous /// allocations to free. /// \param allocator Pointer to allocator or NULL -/// \param buf_size_max Maximum amount of data that a single buffer -/// in the queue may need to store. /// \param threads Number of buffers that may be in use /// concurrently. Note that more than this number -/// of buffers will actually get allocated to +/// of buffers may actually get allocated to /// improve performance when buffers finish -/// out of order. +/// out of order. The actual maximum number of +/// allocated buffers is derived from the number +/// of threads. /// /// \return - LZMA_OK /// - LZMA_MEM_ERROR /// -extern lzma_ret lzma_outq_init( - lzma_outq *outq, const lzma_allocator *allocator, - uint64_t buf_size_max, uint32_t threads); +extern lzma_ret lzma_outq_init(lzma_outq *outq, + const lzma_allocator *allocator, uint32_t threads); /// \brief Free the memory associated with the output queue extern void lzma_outq_end(lzma_outq *outq, const lzma_allocator *allocator); +/// \brief Free all cached buffers that consume memory but aren't in use +extern void lzma_outq_clear_cache( + lzma_outq *outq, const lzma_allocator *allocator); + + +/// \brief Preallocate a new buffer into cache +/// +/// Splitting the buffer allocation into a separate function makes it +/// possible to ensure that way lzma_outq_get_buf() cannot fail. +/// If the preallocated buffer isn't actually used (for example, some +/// other error occurs), the caller has to do nothing as the buffer will +/// be used later or cleared from the cache when not needed. +/// +/// \return LZMA_OK on success, LZMA_MEM_ERROR if allocation fails +/// +extern lzma_ret lzma_outq_prealloc_buf( + lzma_outq *outq, const lzma_allocator *allocator, size_t size); + + /// \brief Get a new buffer /// -/// lzma_outq_has_buf() must be used to check that there is a buffer +/// lzma_outq_prealloc_buf() must be used to ensure that there is a buffer /// available before calling lzma_outq_get_buf(). /// -extern lzma_outbuf *lzma_outq_get_buf(lzma_outq *outq); +extern lzma_outbuf *lzma_outq_get_buf(lzma_outq *outq, void *worker); /// \brief Test if there is data ready to be read @@ -126,17 +167,32 @@ extern bool lzma_outq_is_readable(const lzma_outq *outq); /// \return - LZMA: All OK. Either no data was available or the buffer /// being read didn't become empty yet. /// - LZMA_STREAM_END: The buffer being read was finished. -/// *unpadded_size and *uncompressed_size were set. +/// *unpadded_size and *uncompressed_size were set if they +/// were not NULL. /// -/// \note This reads lzma_outbuf.finished variables and thus call -/// to this function needs to be protected with a mutex. +/// \note This reads lzma_outbuf.finished and .pos variables and thus +/// calls to this function need to be protected with a mutex. /// extern lzma_ret lzma_outq_read(lzma_outq *restrict outq, + const lzma_allocator *restrict allocator, uint8_t *restrict out, size_t *restrict out_pos, size_t out_size, lzma_vli *restrict unpadded_size, lzma_vli *restrict uncompressed_size); +/// \brief Enable partial output from a worker thread +/// +/// If the buffer at the head of the output queue isn't finished, +/// this will call enable_partial_output on the worker associated with +/// that output buffer. +/// +/// \note This reads a lzma_outbuf.finished variable and thus +/// calls to this function need to be protected with a mutex. +/// +extern void lzma_outq_enable_partial_output(lzma_outq *outq, + void (*enable_partial_output)(void *worker)); + + /// \brief Test if there is at least one buffer free /// /// This must be used before getting a new buffer with lzma_outq_get_buf(). @@ -144,7 +200,7 @@ extern lzma_ret lzma_outq_read(lzma_outq *restrict outq, static inline bool lzma_outq_has_buf(const lzma_outq *outq) { - return outq->bufs_used < outq->bufs_allocated; + return outq->bufs_in_use < outq->bufs_limit; } @@ -152,5 +208,5 @@ lzma_outq_has_buf(const lzma_outq *outq) static inline bool lzma_outq_is_empty(const lzma_outq *outq) { - return outq->bufs_used == 0; + return outq->bufs_in_use == 0; } diff --git a/src/liblzma/common/stream_encoder_mt.c b/src/liblzma/common/stream_encoder_mt.c index 01e40339..6b897ab9 100644 --- a/src/liblzma/common/stream_encoder_mt.c +++ b/src/liblzma/common/stream_encoder_mt.c @@ -133,6 +133,9 @@ struct lzma_stream_coder_s { /// Output buffer queue for compressed data lzma_outq outq; + /// How much memory to allocate for each lzma_outbuf.buf + size_t outbuf_alloc_size; + /// Maximum wait time if cannot use all the input and cannot /// fill the output buffer. This is in milliseconds. @@ -196,7 +199,7 @@ worker_error(worker_thread *thr, lzma_ret ret) static worker_state -worker_encode(worker_thread *thr, worker_state state) +worker_encode(worker_thread *thr, size_t *out_pos, worker_state state) { assert(thr->progress_in == 0); assert(thr->progress_out == 0); @@ -205,7 +208,7 @@ worker_encode(worker_thread *thr, worker_state state) thr->block_options = (lzma_block){ .version = 0, .check = thr->coder->stream_flags.check, - .compressed_size = thr->coder->outq.buf_size_max, + .compressed_size = thr->outbuf->allocated, .uncompressed_size = thr->coder->block_size, // TODO: To allow changing the filter chain, the filters @@ -234,12 +237,12 @@ worker_encode(worker_thread *thr, worker_state state) size_t in_pos = 0; size_t in_size = 0; - thr->outbuf->size = thr->block_options.header_size; - const size_t out_size = thr->coder->outq.buf_size_max; + *out_pos = thr->block_options.header_size; + const size_t out_size = thr->outbuf->allocated; do { mythread_sync(thr->mutex) { - // Store in_pos and out_pos into *thr so that + // Store in_pos and *out_pos into *thr so that // an application may read them via // lzma_get_progress() to get progress information. // @@ -247,7 +250,7 @@ worker_encode(worker_thread *thr, worker_state state) // finishes. Instead, the final values are taken // later from thr->outbuf. thr->progress_in = in_pos; - thr->progress_out = thr->outbuf->size; + thr->progress_out = *out_pos; while (in_size == thr->in_size && thr->state == THR_RUN) @@ -277,8 +280,8 @@ worker_encode(worker_thread *thr, worker_state state) ret = thr->block_encoder.code( thr->block_encoder.coder, thr->allocator, thr->in, &in_pos, in_limit, thr->outbuf->buf, - &thr->outbuf->size, out_size, action); - } while (ret == LZMA_OK && thr->outbuf->size < out_size); + out_pos, out_size, action); + } while (ret == LZMA_OK && *out_pos < out_size); switch (ret) { case LZMA_STREAM_END: @@ -313,10 +316,10 @@ worker_encode(worker_thread *thr, worker_state state) return state; // Do the encoding. This takes care of the Block Header too. - thr->outbuf->size = 0; + *out_pos = 0; ret = lzma_block_uncomp_encode(&thr->block_options, thr->in, in_size, thr->outbuf->buf, - &thr->outbuf->size, out_size); + out_pos, out_size); // It shouldn't fail. if (ret != LZMA_OK) { @@ -367,11 +370,13 @@ worker_start(void *thr_ptr) } } + size_t out_pos = 0; + assert(state != THR_IDLE); assert(state != THR_STOP); if (state <= THR_FINISH) - state = worker_encode(thr, state); + state = worker_encode(thr, &out_pos, state); if (state == THR_EXIT) break; @@ -387,14 +392,17 @@ worker_start(void *thr_ptr) } mythread_sync(thr->coder->mutex) { - // Mark the output buffer as finished if - // no errors occurred. - thr->outbuf->finished = state == THR_FINISH; + // If no errors occurred, make the encoded data + // available to be copied out. + if (state == THR_FINISH) { + thr->outbuf->pos = out_pos; + thr->outbuf->finished = true; + } // Update the main progress info. thr->coder->progress_in += thr->outbuf->uncompressed_size; - thr->coder->progress_out += thr->outbuf->size; + thr->coder->progress_out += out_pos; thr->progress_in = 0; thr->progress_out = 0; @@ -519,6 +527,11 @@ get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator) if (!lzma_outq_has_buf(&coder->outq)) return LZMA_OK; + // That's also true if we cannot allocate memory for the output + // buffer in the output queue. + return_if_error(lzma_outq_prealloc_buf(&coder->outq, allocator, + coder->outbuf_alloc_size)); + // If there is a free structure on the stack, use it. mythread_sync(coder->mutex) { if (coder->threads_free != NULL) { @@ -541,7 +554,7 @@ get_thread(lzma_stream_coder *coder, const lzma_allocator *allocator) mythread_sync(coder->thr->mutex) { coder->thr->state = THR_RUN; coder->thr->in_size = 0; - coder->thr->outbuf = lzma_outq_get_buf(&coder->outq); + coder->thr->outbuf = lzma_outq_get_buf(&coder->outq, NULL); mythread_cond_signal(&coder->thr->cond); } @@ -704,7 +717,7 @@ stream_encode_mt(void *coder_ptr, const lzma_allocator *allocator, } // Try to read compressed data to out[]. - ret = lzma_outq_read(&coder->outq, + ret = lzma_outq_read(&coder->outq, allocator, out, out_pos, out_size, &unpadded_size, &uncompressed_size); @@ -951,7 +964,7 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator, &block_size, &outbuf_size_max)); #if SIZE_MAX < UINT64_MAX - if (block_size > SIZE_MAX) + if (block_size > SIZE_MAX || outbuf_size_max > SIZE_MAX) return LZMA_MEM_ERROR; #endif @@ -1012,6 +1025,7 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator, // Basic initializations coder->sequence = SEQ_STREAM_HEADER; coder->block_size = (size_t)(block_size); + coder->outbuf_alloc_size = (size_t)(outbuf_size_max); coder->thread_error = LZMA_OK; coder->thr = NULL; @@ -1041,7 +1055,7 @@ stream_encoder_mt_init(lzma_next_coder *next, const lzma_allocator *allocator, // Output queue return_if_error(lzma_outq_init(&coder->outq, allocator, - outbuf_size_max, options->threads)); + options->threads)); // Timeout coder->timeout = options->timeout;