liblzma: Add lzma_stream_encoder_mt() for threaded compression.

This is the simplest method to do threading, which splits
the uncompressed data into blocks and compresses them
independently from each other. There's room for improvement
especially to reduce the memory usage, but nevertheless,
this is a good start.
This commit is contained in:
Lasse Collin 2011-04-11 22:03:30 +03:00
parent 25fe729532
commit de678e0c92
8 changed files with 1539 additions and 1 deletions

View File

@ -437,6 +437,7 @@ if test "x$enable_threads" = xyes; then
CC="$PTHREAD_CC" CC="$PTHREAD_CC"
AC_SEARCH_LIBS([clock_gettime], [rt]) AC_SEARCH_LIBS([clock_gettime], [rt])
fi fi
AM_CONDITIONAL([COND_THREADS], [test "x$ax_pthread_ok" = xyes])
echo echo
echo "Initializing Libtool:" echo "Initializing Libtool:"

View File

@ -60,6 +60,127 @@
#define LZMA_PRESET_EXTREME (UINT32_C(1) << 31) #define LZMA_PRESET_EXTREME (UINT32_C(1) << 31)
/**
 * \brief       Multithreading options
 */
typedef struct {
	/**
	 * \brief       Flags
	 *
	 * Set this to zero if no flags are wanted.
	 *
	 * No flags are currently supported.
	 */
	uint32_t flags;

	/**
	 * \brief       Number of worker threads to use
	 */
	uint32_t threads;

	/**
	 * \brief       Maximum uncompressed size of a Block
	 *
	 * The encoder will start a new .xz Block every block_size bytes.
	 * Using LZMA_FULL_FLUSH or LZMA_FULL_BARRIER with lzma_code()
	 * the caller may tell liblzma to start a new Block earlier.
	 *
	 * With LZMA2, a recommended block size is 2-4 times the LZMA2
	 * dictionary size. With very small dictionaries, it is recommended
	 * to use at least 1 MiB block size for good compression ratio, even
	 * if this is more than four times the dictionary size. Note that
	 * these are only recommendations for typical use cases; feel free
	 * to use other values. Just keep in mind that using a block size
	 * less than the LZMA2 dictionary size is a waste of RAM.
	 *
	 * Set this to 0 to let liblzma choose the block size depending
	 * on the compression options. For LZMA2 it will be 3*dict_size
	 * or 1 MiB, whichever is more.
	 */
	uint64_t block_size;

	/**
	 * \brief       Timeout to allow lzma_code() to return early
	 *
	 * Multithreading can make liblzma consume input and produce
	 * output in a very bursty way: it may first read a lot of input
	 * to fill internal buffers, then no input or output occurs for
	 * a while.
	 *
	 * In single-threaded mode, lzma_code() won't return until it has
	 * either consumed all the input or filled the output buffer. If
	 * this is done in multithreaded mode, it may cause a call to
	 * lzma_code() to take even tens of seconds, which isn't acceptable
	 * in all applications.
	 *
	 * To avoid very long blocking times in lzma_code(), a timeout
	 * (in milliseconds) may be set here. If lzma_code() would block
	 * longer than this number of milliseconds, it will return with
	 * LZMA_OK. Reasonable values are 100 ms or more. The xz command
	 * line tool uses 300 ms.
	 *
	 * If long blocking times are fine for you, set timeout to the
	 * special value of 0, which will disable the timeout mechanism and
	 * will make lzma_code() block until all the input is consumed or
	 * the output buffer has been filled.
	 *
	 * \note        Even with a timeout, lzma_code() might sometimes take
	 *              a somewhat long time to return. No timing guarantees
	 *              are made.
	 */
	uint32_t timeout;

	/**
	 * \brief       Compression preset (level and possible flags)
	 *
	 * The preset is set just like with lzma_easy_encoder().
	 * The preset is ignored if filters below is non-NULL.
	 */
	uint32_t preset;

	/**
	 * \brief       Filter chain (alternative to a preset)
	 *
	 * If this is NULL, the preset above is used. Otherwise the preset
	 * is ignored and the filter chain specified here is used.
	 */
	const lzma_filter *filters;

	/**
	 * \brief       Integrity check type
	 *
	 * See check.h for available checks. The xz command line tool
	 * defaults to LZMA_CHECK_CRC64, which is a good choice if you
	 * are unsure.
	 */
	lzma_check check;

	/*
	 * Reserved space to allow possible future extensions without
	 * breaking the ABI. You should not touch these, because the names
	 * of these variables may change. These are not used, and never will
	 * be used, with the currently supported options, so it is safe to
	 * leave these uninitialized.
	 */
	lzma_reserved_enum reserved_enum1;
	lzma_reserved_enum reserved_enum2;
	lzma_reserved_enum reserved_enum3;
	uint32_t reserved_int1;
	uint32_t reserved_int2;
	uint32_t reserved_int3;
	uint32_t reserved_int4;
	uint64_t reserved_int5;
	uint64_t reserved_int6;
	uint64_t reserved_int7;
	uint64_t reserved_int8;
	void *reserved_ptr1;
	void *reserved_ptr2;
	void *reserved_ptr3;
	void *reserved_ptr4;
} lzma_mt;
/** /**
* \brief Calculate approximate memory usage of easy encoder * \brief Calculate approximate memory usage of easy encoder
* *
@ -190,6 +311,48 @@ extern LZMA_API(lzma_ret) lzma_stream_encoder(lzma_stream *strm,
lzma_nothrow lzma_attr_warn_unused_result; lzma_nothrow lzma_attr_warn_unused_result;
/**
 * \brief       Calculate approximate memory usage of multithreaded .xz encoder
 *
 * Since doing the encoding in threaded mode doesn't affect the memory
 * requirements of a single-threaded decompressor, you can use
 * lzma_easy_decoder_memusage(options->preset) or
 * lzma_raw_decoder_memusage(options->filters) to calculate
 * the decompressor memory requirements.
 *
 * \param       options Compression options
 *
 * \return      Number of bytes of memory required for encoding with the
 *              given options. If an error occurs, for example due to
 *              an unsupported preset or filter chain, UINT64_MAX is
 *              returned.
 */
extern LZMA_API(uint64_t) lzma_stream_encoder_mt_memusage(
		const lzma_mt *options) lzma_nothrow lzma_attr_pure;
/**
 * \brief       Initialize multithreaded .xz Stream encoder
 *
 * This provides the functionality of lzma_easy_encoder() and
 * lzma_stream_encoder() as a single function for multithreaded use.
 *
 * TODO: For lzma_code(), only LZMA_RUN and LZMA_FINISH are currently
 * supported. Support for other actions is planned.
 *
 * \param       strm    Pointer to properly prepared lzma_stream
 * \param       options Pointer to multithreaded compression options
 *
 * \return      - LZMA_OK
 *              - LZMA_MEM_ERROR
 *              - LZMA_UNSUPPORTED_CHECK
 *              - LZMA_OPTIONS_ERROR
 *              - LZMA_PROG_ERROR
 */
extern LZMA_API(lzma_ret) lzma_stream_encoder_mt(
		lzma_stream *strm, const lzma_mt *options)
		lzma_nothrow lzma_attr_warn_unused_result;
/** /**
* \brief Initialize .lzma encoder (legacy file format) * \brief Initialize .lzma encoder (legacy file format)
* *

View File

@ -40,6 +40,13 @@ liblzma_la_SOURCES += \
common/stream_encoder.c \ common/stream_encoder.c \
common/stream_flags_encoder.c \ common/stream_flags_encoder.c \
common/vli_encoder.c common/vli_encoder.c
if COND_THREADS
liblzma_la_SOURCES += \
common/outqueue.c \
common/outqueue.h \
common/stream_encoder_mt.c
endif
endif endif
if COND_MAIN_DECODER if COND_MAIN_DECODER

View File

@ -263,7 +263,9 @@ lzma_code(lzma_stream *strm, lzma_action action)
strm->internal->avail_in = strm->avail_in; strm->internal->avail_in = strm->avail_in;
switch (ret) { // Cast is needed to silence a warning about LZMA_TIMED_OUT, which
// isn't part of lzma_ret enumeration.
switch ((unsigned int)(ret)) {
case LZMA_OK: case LZMA_OK:
// Don't return LZMA_BUF_ERROR when it happens the first time. // Don't return LZMA_BUF_ERROR when it happens the first time.
// This is to avoid returning LZMA_BUF_ERROR when avail_out // This is to avoid returning LZMA_BUF_ERROR when avail_out
@ -279,6 +281,11 @@ lzma_code(lzma_stream *strm, lzma_action action)
} }
break; break;
case LZMA_TIMED_OUT:
strm->internal->allow_buf_error = false;
ret = LZMA_OK;
break;
case LZMA_STREAM_END: case LZMA_STREAM_END:
if (strm->internal->sequence == ISEQ_SYNC_FLUSH if (strm->internal->sequence == ISEQ_SYNC_FLUSH
|| strm->internal->sequence == ISEQ_FULL_FLUSH) || strm->internal->sequence == ISEQ_FULL_FLUSH)

View File

@ -49,6 +49,13 @@
#define LZMA_BUFFER_SIZE 4096 #define LZMA_BUFFER_SIZE 4096
/// Maximum number of worker threads within one multithreaded component.
/// The limit exists solely to make it simpler to prevent integer overflows
/// when allocating structures etc. This should be big enough for now;
/// the code won't scale anywhere close to this number anyway.
#define LZMA_THREADS_MAX 16384
/// Starting value for memory usage estimates. Instead of calculating size /// Starting value for memory usage estimates. Instead of calculating size
/// of _every_ structure and taking into account malloc() overhead etc., we /// of _every_ structure and taking into account malloc() overhead etc., we
/// add a base size to all memory usage estimates. It's not very accurate /// add a base size to all memory usage estimates. It's not very accurate
@ -69,6 +76,13 @@
| LZMA_CONCATENATED ) | LZMA_CONCATENATED )
/// Special return value (used in place of a lzma_ret) to indicate that
/// a timeout was reached and lzma_code() must not return LZMA_BUF_ERROR.
/// lzma_code() converts this to LZMA_OK. This is not in the lzma_ret
/// enumeration because there's no need to have it in the public API.
#define LZMA_TIMED_OUT 32
/// Type of encoder/decoder specific data; the actual structure is defined /// Type of encoder/decoder specific data; the actual structure is defined
/// differently in different coders. /// differently in different coders.
typedef struct lzma_coder_s lzma_coder; typedef struct lzma_coder_s lzma_coder;

View File

@ -0,0 +1,180 @@
///////////////////////////////////////////////////////////////////////////////
//
/// \file outqueue.c
/// \brief Output queue handling in multithreaded coding
//
// Author: Lasse Collin
//
// This file has been put into the public domain.
// You can do whatever you want with this file.
//
///////////////////////////////////////////////////////////////////////////////
#include "outqueue.h"
/// This is to ease integer overflow checking: We may allocate up to
/// 2 * LZMA_THREADS_MAX buffers (that's the first /2) and we need some
/// extra memory for other data structures (that's the second /2).
#define BUF_SIZE_MAX (UINT64_MAX / LZMA_THREADS_MAX / 2 / 2)
/// \brief      Validate queue parameters and derive the allocation sizes
///
/// \param      bufs_alloc_size Set to the number of bytes needed for the
///                             data of all the buffers together
/// \param      bufs_count      Set to the number of buffers to allocate
/// \param      buf_size_max    Maximum size of a single buffer
/// \param      threads         Number of worker threads
///
/// \return     LZMA_OK on success. LZMA_OPTIONS_ERROR if threads or
///             buf_size_max exceeds its limit; the output variables
///             are not modified in that case.
static lzma_ret
get_options(uint64_t *bufs_alloc_size, uint32_t *bufs_count,
		uint64_t buf_size_max, uint32_t threads)
{
	// Both limits must hold, otherwise the multiplication below
	// could overflow.
	if (!(threads <= LZMA_THREADS_MAX && buf_size_max <= BUF_SIZE_MAX))
		return LZMA_OPTIONS_ERROR;

	// Two buffers are reserved for every thread. It costs RAM but
	// lets the threads keep working when buffers get finished out
	// of order.
	//
	// NOTE: If this is changed, update BUF_SIZE_MAX too.
	const uint32_t count = threads * 2;

	*bufs_count = count;
	*bufs_alloc_size = count * buf_size_max;

	return LZMA_OK;
}
extern uint64_t
lzma_outq_memusage(uint64_t buf_size_max, uint32_t threads)
{
uint64_t bufs_alloc_size;
uint32_t bufs_count;
if (get_options(&bufs_alloc_size, &bufs_count, buf_size_max, threads)
!= LZMA_OK)
return UINT64_MAX;
return sizeof(lzma_outq) + bufs_count * sizeof(lzma_outbuf)
+ bufs_alloc_size;
}
/// \brief      Initialize an output queue, (re)allocating memory if needed
///
/// Existing allocations are reused when both buf_size_max and the derived
/// number of buffers match the values stored in *outq. Otherwise the old
/// memory is freed and new memory is allocated.
///
/// \param      outq            Queue to initialize. It must either have been
///                             zeroed or been initialized by an earlier call.
/// \param      allocator       Allocator passed to lzma_alloc()/lzma_free()
/// \param      buf_size_max    Maximum size of a single buffer
/// \param      threads         Number of worker threads; twice this many
///                             buffers are allocated.
///
/// \return     - LZMA_OK
///             - LZMA_OPTIONS_ERROR: get_options() rejected the parameters.
///               *outq is left unmodified.
///             - LZMA_MEM_ERROR: The old buffers have been freed and *outq
///               holds no allocations (pointers are NULL, sizes are zero).
extern lzma_ret
lzma_outq_init(lzma_outq *outq, lzma_allocator *allocator,
		uint64_t buf_size_max, uint32_t threads)
{
	uint64_t bufs_alloc_size;
	uint32_t bufs_count;

	// Set bufs_count and bufs_alloc_size.
	return_if_error(get_options(&bufs_alloc_size, &bufs_count,
			buf_size_max, threads));

	// Allocate memory if needed.
	if (outq->buf_size_max != buf_size_max
			|| outq->bufs_allocated != bufs_count) {
		lzma_outq_end(outq, allocator);

		// The old buffers are gone now. Put the queue into the
		// "nothing allocated" state immediately so that no error
		// path below can leave dangling pointers or stale sizes
		// behind. Otherwise a later lzma_outq_end() would free the
		// same memory again, and a later lzma_outq_init() with the
		// old parameters would skip the allocation and use freed
		// memory.
		outq->bufs = NULL;
		outq->bufs_mem = NULL;
		outq->buf_size_max = 0;
		outq->bufs_allocated = 0;

#if SIZE_MAX < UINT64_MAX
		if (bufs_alloc_size > SIZE_MAX)
			return LZMA_MEM_ERROR;
#endif

		outq->bufs = lzma_alloc(bufs_count * sizeof(lzma_outbuf),
				allocator);
		outq->bufs_mem = lzma_alloc((size_t)(bufs_alloc_size),
				allocator);

		if (outq->bufs == NULL || outq->bufs_mem == NULL) {
			// One of the two allocations may have succeeded;
			// free it and forget both pointers.
			lzma_outq_end(outq, allocator);
			outq->bufs = NULL;
			outq->bufs_mem = NULL;
			return LZMA_MEM_ERROR;
		}
	}

	// Initialize the rest of the main structure. Initialization of
	// outq->bufs[] is done when they are actually needed.
	outq->buf_size_max = (size_t)(buf_size_max);
	outq->bufs_allocated = bufs_count;
	outq->bufs_pos = 0;
	outq->bufs_used = 0;
	outq->read_pos = 0;

	return LZMA_OK;
}
/// \brief      Free the memory associated with the output queue
///
/// After this call *outq holds no allocations: the pointers are NULL and
/// the recorded sizes are zero. This makes it safe to call this function
/// again and forces lzma_outq_init() to allocate fresh memory instead of
/// reusing the freed buffers.
///
/// \param      outq        Queue whose buffers are freed. The structure
///                         itself is not freed.
/// \param      allocator   Allocator passed to lzma_free()
extern void
lzma_outq_end(lzma_outq *outq, lzma_allocator *allocator)
{
	lzma_free(outq->bufs, allocator);
	outq->bufs = NULL;

	lzma_free(outq->bufs_mem, allocator);
	outq->bufs_mem = NULL;

	// lzma_outq_init() compares these against the requested values to
	// decide if the existing allocations can be reused. There is nothing
	// left to reuse.
	outq->bufs_allocated = 0;
	outq->buf_size_max = 0;

	return;
}
/// Take the next buffer of the queue into use and return it. The returned
/// buffer is empty (size is zero) and marked as not finished; its
/// unpadded_size and uncompressed_size are not touched here.
extern lzma_outbuf *
lzma_outq_get_buf(lzma_outq *outq)
{
	// Caller must have checked it with lzma_outq_has_buf().
	assert(outq->bufs_used < outq->bufs_allocated);

	// Slot that is handed out by this call.
	const uint32_t slot = outq->bufs_pos;
	lzma_outbuf *const fresh = outq->bufs + slot;

	// Every slot owns a fixed buf_size_max sized region of bufs_mem.
	fresh->buf = &outq->bufs_mem[slot * outq->buf_size_max];
	fresh->size = 0;
	fresh->finished = false;

	// Advance the write position, wrapping around at the end of
	// the array, and account for the buffer that is now in use.
	const uint32_t following = slot + 1;
	outq->bufs_pos = following == outq->bufs_allocated ? 0 : following;
	outq->bufs_used += 1;

	return fresh;
}
/// \brief      Test if there is data ready to be read
///
/// \return     true if the queue has at least one buffer in use and the
///             oldest buffer in use has been marked as finished;
///             false otherwise, including when the queue is empty.
extern bool
lzma_outq_is_readable(const lzma_outq *outq)
{
	// An empty queue has nothing to read. Without this check the index
	// below would be bufs_pos, that is, a slot that isn't in use: it is
	// either still uninitialized or holds the stale "finished" flag of
	// a buffer that has already been consumed. lzma_outq_read() has
	// the same guard.
	if (outq->bufs_used == 0)
		return false;

	// Index of the oldest buffer in use. The subtraction may wrap
	// around, in which case adding bufs_allocated wraps it back.
	uint32_t i = outq->bufs_pos - outq->bufs_used;
	if (outq->bufs_pos < outq->bufs_used)
		i += outq->bufs_allocated;

	return outq->bufs[i].finished;
}
/// Copy data from the oldest buffer in use to out[]. Returns LZMA_OK when
/// there is nothing to read yet or when the buffer still has data left
/// after the copy. Returns LZMA_STREAM_END when the whole buffer has been
/// copied; then *unpadded_size and *uncompressed_size are set and
/// the buffer is released.
extern lzma_ret
lzma_outq_read(lzma_outq *restrict outq, uint8_t *restrict out,
		size_t *restrict out_pos, size_t out_size,
		lzma_vli *restrict unpadded_size,
		lzma_vli *restrict uncompressed_size)
{
	const uint32_t in_use = outq->bufs_used;

	// With no buffers in use there is nothing to read from.
	if (in_use == 0)
		return LZMA_OK;

	// Locate the oldest buffer in use: it lies in_use slots behind
	// the next free slot in the cyclic array.
	const uint32_t next_free = outq->bufs_pos;
	uint32_t oldest = next_free - in_use;
	if (in_use > next_free)
		oldest += outq->bufs_allocated;

	lzma_outbuf *const src = outq->bufs + oldest;

	// Data can be read only after the buffer has been finished.
	if (!src->finished)
		return LZMA_OK;

	// Move as much as fits to the output.
	lzma_bufcpy(src->buf, &outq->read_pos, src->size,
			out, out_pos, out_size);

	// Something was left in the buffer; the caller has to come back.
	if (outq->read_pos < src->size)
		return LZMA_OK;

	// Everything was copied. Pass the size information to the caller.
	*unpadded_size = src->unpadded_size;
	*uncompressed_size = src->uncompressed_size;

	// Release the buffer so that it can be taken into use again.
	outq->read_pos = 0;
	outq->bufs_used -= 1;

	return LZMA_STREAM_END;
}

View File

@ -0,0 +1,155 @@
///////////////////////////////////////////////////////////////////////////////
//
/// \file outqueue.h
/// \brief Output queue handling in multithreaded coding
//
// Author: Lasse Collin
//
// This file has been put into the public domain.
// You can do whatever you want with this file.
//
///////////////////////////////////////////////////////////////////////////////
#include "common.h"
/// Output buffer for a single thread
typedef struct {
	/// Pointer to the output buffer of lzma_outq.buf_size_max bytes.
	/// lzma_outq_get_buf() points this into lzma_outq.bufs_mem.
	uint8_t *buf;

	/// Amount of data written to buf. lzma_outq_get_buf() resets
	/// this to zero.
	size_t size;

	/// Additional size information. lzma_outq_read() passes these to
	/// its caller once the whole buffer has been read.
	/// lzma_outq_get_buf() doesn't initialize these.
	lzma_vli unpadded_size;
	lzma_vli uncompressed_size;

	/// True when no more data will be written into this buffer.
	///
	/// \note       This is read by another thread and thus access
	///             to this variable needs a mutex.
	bool finished;
} lzma_outbuf;
/// Output queue: a fixed set of output buffers that are used cyclically
typedef struct {
	/// Array of buffers that are used cyclically.
	lzma_outbuf *bufs;

	/// Memory allocated for all the buffers. Each buffer gets its own
	/// buf_size_max bytes from this.
	uint8_t *bufs_mem;

	/// Amount of buffer space available in each buffer
	size_t buf_size_max;

	/// Number of buffers allocated. lzma_outq_init() sets this to
	/// twice the number of threads.
	uint32_t bufs_allocated;

	/// Position in the bufs array. The next buffer to be taken
	/// into use is bufs[bufs_pos].
	uint32_t bufs_pos;

	/// Number of buffers in use
	uint32_t bufs_used;

	/// Position in the buffer in lzma_outq_read(). It is reset to
	/// zero when a buffer has been read completely.
	size_t read_pos;
} lzma_outq;
/**
 * \brief       Calculate the memory usage of an output queue
 *
 * \param       buf_size_max    Maximum amount of data that a single buffer
 *                              in the queue may need to store.
 * \param       threads         Number of buffers that may be in use
 *                              concurrently.
 *
 * \return      Approximate memory usage in bytes or UINT64_MAX on error
 *              (buf_size_max or threads is too big).
 */
extern uint64_t lzma_outq_memusage(uint64_t buf_size_max, uint32_t threads);
/// \brief      Initialize an output queue
///
/// \param      outq            Pointer to an output queue. Before calling
///                             this function the first time, *outq should
///                             have been zeroed with memzero() so that this
///                             function knows that there are no previous
///                             allocations to free.
/// \param      allocator       Pointer to allocator or NULL
/// \param      buf_size_max    Maximum amount of data that a single buffer
///                             in the queue may need to store.
/// \param      threads         Number of buffers that may be in use
///                             concurrently. Note that more than this number
///                             of buffers will actually get allocated to
///                             improve performance when buffers finish
///                             out of order.
///
/// \return     - LZMA_OK
///             - LZMA_MEM_ERROR
///             - LZMA_OPTIONS_ERROR: buf_size_max or threads is too big.
///
extern lzma_ret lzma_outq_init(lzma_outq *outq, lzma_allocator *allocator,
		uint64_t buf_size_max, uint32_t threads);
/// \brief      Free the memory associated with the output queue
///
/// This frees the buffer array and the buffer memory using the given
/// allocator. The lzma_outq structure itself is not freed.
///
extern void lzma_outq_end(lzma_outq *outq, lzma_allocator *allocator);
/// \brief      Get a new buffer
///
/// lzma_outq_has_buf() must be used to check that there is a buffer
/// available before calling lzma_outq_get_buf().
///
/// \return     Pointer to the buffer taken into use. Its size has been set
///             to zero and finished to false.
///
extern lzma_outbuf *lzma_outq_get_buf(lzma_outq *outq);
/// \brief      Test if there is data ready to be read
///
/// Data is ready when the oldest buffer in use has been marked as finished.
///
/// A call to this function must be protected with the same mutex that
/// is used to protect lzma_outbuf.finished.
///
extern bool lzma_outq_is_readable(const lzma_outq *outq);
/// \brief      Read finished data
///
/// \param      outq            Pointer to an output queue
/// \param      out             Beginning of the output buffer
/// \param      out_pos         The next byte will be written to
///                             out[*out_pos].
/// \param      out_size        Size of the out buffer; the first byte into
///                             which no data is written to is out[out_size].
/// \param      unpadded_size   Unpadded Size from the Block encoder
/// \param      uncompressed_size Uncompressed Size from the Block encoder
///
/// \return     - LZMA_OK: All OK. Either no data was available or the buffer
///               being read didn't become empty yet.
///             - LZMA_STREAM_END: The buffer being read was finished.
///               *unpadded_size and *uncompressed_size were set.
///
/// \note       This reads lzma_outbuf.finished variables and thus a call
///             to this function needs to be protected with a mutex.
///
extern lzma_ret lzma_outq_read(lzma_outq *restrict outq,
		uint8_t *restrict out, size_t *restrict out_pos,
		size_t out_size, lzma_vli *restrict unpadded_size,
		lzma_vli *restrict uncompressed_size);
/// \brief      Test if the queue can hand out one more buffer
///
/// Call this before lzma_outq_get_buf(); a buffer may be taken only when
/// this returns true.
///
static inline bool
lzma_outq_has_buf(const lzma_outq *outq)
{
	const uint32_t capacity = outq->bufs_allocated;
	return capacity > outq->bufs_used;
}
/// \brief      Test if no buffers of the queue are in use
static inline bool
lzma_outq_is_empty(const lzma_outq *outq)
{
	const uint32_t in_use = outq->bufs_used;
	return in_use == 0;
}

File diff suppressed because it is too large Load Diff