459 lines
9.8 KiB
C
459 lines
9.8 KiB
C
///////////////////////////////////////////////////////////////////////////////
|
|
//
|
|
/// \file process.c
|
|
/// \brief Compresses or uncompresses a file
|
|
//
|
|
// Copyright (C) 2007 Lasse Collin
|
|
//
|
|
// This program is free software; you can redistribute it and/or
|
|
// modify it under the terms of the GNU Lesser General Public
|
|
// License as published by the Free Software Foundation; either
|
|
// version 2.1 of the License, or (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
// Lesser General Public License for more details.
|
|
//
|
|
///////////////////////////////////////////////////////////////////////////////
|
|
|
|
#include "private.h"
|
|
|
|
|
|
typedef struct {
|
|
lzma_stream strm;
|
|
void *options;
|
|
|
|
file_pair *pair;
|
|
|
|
/// We don't need this for *anything* but seems that at least with
|
|
/// glibc pthread_create() doesn't allow NULL.
|
|
pthread_t thread;
|
|
|
|
bool in_use;
|
|
|
|
} thread_data;
|
|
|
|
|
|
/// Number of available threads
|
|
static size_t free_threads;
|
|
|
|
/// Thread-specific data
|
|
static thread_data *threads;
|
|
|
|
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
|
|
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
|
|
|
|
/// Attributes of new coder threads. They are created in detached state.
|
|
/// Coder threads signal to the service thread themselves when they are done.
|
|
static pthread_attr_t thread_attr;
|
|
|
|
|
|
//////////
|
|
// Init //
|
|
//////////
|
|
|
|
extern void
|
|
process_init(void)
|
|
{
|
|
threads = malloc(sizeof(thread_data) * opt_threads);
|
|
if (threads == NULL) {
|
|
out_of_memory();
|
|
my_exit(ERROR);
|
|
}
|
|
|
|
for (size_t i = 0; i < opt_threads; ++i)
|
|
threads[i] = (thread_data){
|
|
.strm = LZMA_STREAM_INIT_VAR,
|
|
.options = NULL,
|
|
.pair = NULL,
|
|
.in_use = false,
|
|
};
|
|
|
|
if (pthread_attr_init(&thread_attr)
|
|
|| pthread_attr_setdetachstate(
|
|
&thread_attr, PTHREAD_CREATE_DETACHED)) {
|
|
out_of_memory();
|
|
my_exit(ERROR);
|
|
}
|
|
|
|
free_threads = opt_threads;
|
|
|
|
return;
|
|
}
|
|
|
|
|
|
//////////////////////////
|
|
// Thread-specific data //
|
|
//////////////////////////
|
|
|
|
static thread_data *
|
|
get_thread_data(void)
|
|
{
|
|
pthread_mutex_lock(&mutex);
|
|
|
|
while (free_threads == 0) {
|
|
pthread_cond_wait(&cond, &mutex);
|
|
|
|
if (user_abort) {
|
|
pthread_cond_signal(&cond);
|
|
pthread_mutex_unlock(&mutex);
|
|
return NULL;
|
|
}
|
|
}
|
|
|
|
thread_data *t = threads;
|
|
while (t->in_use)
|
|
++t;
|
|
|
|
t->in_use = true;
|
|
--free_threads;
|
|
|
|
pthread_mutex_unlock(&mutex);
|
|
|
|
return t;
|
|
}
|
|
|
|
|
|
static void
|
|
release_thread_data(thread_data *t)
|
|
{
|
|
pthread_mutex_lock(&mutex);
|
|
|
|
t->in_use = false;
|
|
++free_threads;
|
|
|
|
pthread_cond_signal(&cond);
|
|
pthread_mutex_unlock(&mutex);
|
|
|
|
return;
|
|
}
|
|
|
|
|
|
static int
|
|
create_thread(void *(*func)(thread_data *t), thread_data *t)
|
|
{
|
|
if (opt_threads == 1) {
|
|
func(t);
|
|
} else {
|
|
const int err = pthread_create(&t->thread, &thread_attr,
|
|
(void *(*)(void *))(func), t);
|
|
if (err) {
|
|
errmsg(V_ERROR, _("Cannot create a thread: %s"),
|
|
strerror(err));
|
|
user_abort = 1;
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
|
|
/////////////////////////
|
|
// One thread per file //
|
|
/////////////////////////
|
|
|
|
static int
|
|
single_init(thread_data *t)
|
|
{
|
|
lzma_ret ret;
|
|
|
|
if (opt_mode == MODE_COMPRESS) {
|
|
const lzma_vli uncompressed_size
|
|
= t->pair->src_fd != STDIN_FILENO
|
|
? (lzma_vli)(t->pair->src_st.st_size)
|
|
: LZMA_VLI_VALUE_UNKNOWN;
|
|
|
|
// TODO Support Multi-Block Streams to store Extra.
|
|
if (opt_header == HEADER_ALONE) {
|
|
lzma_options_alone alone;
|
|
alone.uncompressed_size = uncompressed_size;
|
|
memcpy(&alone.lzma, opt_filters[0].options,
|
|
sizeof(alone.lzma));
|
|
ret = lzma_alone_encoder(&t->strm, &alone);
|
|
} else {
|
|
lzma_options_stream stream = {
|
|
.check = opt_check,
|
|
.has_crc32 = true,
|
|
.uncompressed_size = uncompressed_size,
|
|
.alignment = 0,
|
|
};
|
|
memcpy(stream.filters, opt_filters,
|
|
sizeof(stream.filters));
|
|
ret = lzma_stream_encoder_single(&t->strm, &stream);
|
|
}
|
|
} else {
|
|
// TODO Restrict file format if requested on the command line.
|
|
ret = lzma_auto_decoder(&t->strm, NULL, NULL);
|
|
}
|
|
|
|
if (ret != LZMA_OK) {
|
|
if (ret == LZMA_MEM_ERROR)
|
|
out_of_memory();
|
|
else
|
|
internal_error();
|
|
|
|
return -1;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
|
|
static lzma_ret
|
|
single_skip_padding(thread_data *t, uint8_t *in_buf)
|
|
{
|
|
// Handle decoding of concatenated Streams. There can be arbitrary
|
|
// number of nul-byte padding between the Streams, which must be
|
|
// ignored.
|
|
//
|
|
// NOTE: Concatenating LZMA_Alone files works only if at least
|
|
// one of lc, lp, and pb is non-zero. Using the concatenation
|
|
// on LZMA_Alone files is strongly discouraged.
|
|
while (true) {
|
|
while (t->strm.avail_in > 0) {
|
|
if (*t->strm.next_in != '\0')
|
|
return LZMA_OK;
|
|
|
|
++t->strm.next_in;
|
|
--t->strm.avail_in;
|
|
}
|
|
|
|
if (t->pair->src_eof)
|
|
return LZMA_STREAM_END;
|
|
|
|
t->strm.next_in = in_buf;
|
|
t->strm.avail_in = io_read(t->pair, in_buf, BUFSIZ);
|
|
if (t->strm.avail_in == SIZE_MAX)
|
|
return LZMA_DATA_ERROR;
|
|
}
|
|
}
|
|
|
|
|
|
static void *
|
|
single(thread_data *t)
|
|
{
|
|
if (single_init(t)) {
|
|
io_close(t->pair, false);
|
|
release_thread_data(t);
|
|
return NULL;
|
|
}
|
|
|
|
uint8_t in_buf[BUFSIZ];
|
|
uint8_t out_buf[BUFSIZ];
|
|
lzma_action action = LZMA_RUN;
|
|
lzma_ret ret;
|
|
bool success = false;
|
|
|
|
t->strm.avail_in = 0;
|
|
|
|
while (!user_abort) {
|
|
if (t->strm.avail_in == 0 && !t->pair->src_eof) {
|
|
t->strm.next_in = in_buf;
|
|
t->strm.avail_in = io_read(t->pair, in_buf, BUFSIZ);
|
|
|
|
if (t->strm.avail_in == SIZE_MAX)
|
|
break;
|
|
else if (t->pair->src_eof
|
|
&& opt_mode == MODE_COMPRESS)
|
|
action = LZMA_FINISH;
|
|
}
|
|
|
|
t->strm.next_out = out_buf;
|
|
t->strm.avail_out = BUFSIZ;
|
|
|
|
ret = lzma_code(&t->strm, action);
|
|
|
|
if (opt_mode != MODE_TEST)
|
|
if (io_write(t->pair, out_buf,
|
|
BUFSIZ - t->strm.avail_out))
|
|
break;
|
|
|
|
if (ret != LZMA_OK) {
|
|
if (ret == LZMA_STREAM_END) {
|
|
if (opt_mode == MODE_COMPRESS) {
|
|
success = true;
|
|
break;
|
|
}
|
|
|
|
// Support decoding concatenated .lzma files.
|
|
ret = single_skip_padding(t, in_buf);
|
|
|
|
if (ret == LZMA_STREAM_END) {
|
|
assert(t->pair->src_eof);
|
|
success = true;
|
|
break;
|
|
}
|
|
|
|
if (ret == LZMA_OK && !single_init(t))
|
|
continue;
|
|
|
|
break;
|
|
|
|
} else {
|
|
errmsg(V_ERROR, "%s: %s", t->pair->src_name,
|
|
str_strm_error(ret));
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
io_close(t->pair, success);
|
|
release_thread_data(t);
|
|
|
|
return NULL;
|
|
}
|
|
|
|
|
|
///////////////////////////////
|
|
// Multiple threads per file //
|
|
///////////////////////////////
|
|
|
|
// TODO
|
|
|
|
// I'm not sure what would the best way to implement this. Here's one
|
|
// possible way:
|
|
// - Reader thread would read the input data and control the coders threads.
|
|
// - Every coder thread is associated with input and output buffer pools.
|
|
// The input buffer pool is filled by reader thread, and the output buffer
|
|
// pool is emptied by the writer thread.
|
|
// - Writer thread writes the output data of the oldest living coder thread.
|
|
//
|
|
// The per-file thread started by the application's main thread is used as
|
|
// the reader thread. In the beginning, it starts the writer thread and the
|
|
// first coder thread. The coder thread would be left waiting for input from
|
|
// the reader thread, and the writer thread would be waiting for input from
|
|
// the coder thread.
|
|
//
|
|
// The reader thread reads the input data into a ring buffer, whose size
|
|
// depends on the value returned by lzma_chunk_size(). If the ring buffer
|
|
// gets full, the buffer is marked "to be finished", which indicates to
|
|
// the coder thread that no more input is coming. Then a new coder thread
|
|
// would be started.
|
|
//
|
|
// TODO
|
|
|
|
/*
|
|
typedef struct {
|
|
/// Buffers
|
|
uint8_t (*buffers)[BUFSIZ];
|
|
|
|
/// Number of buffers
|
|
size_t buffer_count;
|
|
|
|
/// buffers[read_pos] is the buffer currently being read. Once finish
|
|
/// is true and read_pos == write_pos, end of input has been reached.
|
|
size_t read_pos;
|
|
|
|
/// buffers[write_pos] is the buffer into which data is currently
|
|
/// being written.
|
|
size_t write_pos;
|
|
|
|
/// This variable matters only when read_pos == write_pos && finish.
|
|
/// In that case, this variable will contain the size of the
|
|
/// buffers[read_pos].
|
|
size_t last_size;
|
|
|
|
/// True once no more data is being written to the buffer. When this
|
|
/// is set, the last_size variable must have been set too.
|
|
bool finish;
|
|
|
|
/// Mutex to protect access to the variables in this structure
|
|
pthread_mutex_t mutex;
|
|
|
|
/// Condition to indicate when another thread can continue
|
|
pthread_cond_t cond;
|
|
} mem_pool;
|
|
|
|
|
|
static foo
|
|
multi_reader(thread_data *t)
|
|
{
|
|
bool done = false;
|
|
|
|
do {
|
|
const size_t size = io_read(t->pair,
|
|
m->buffers + m->write_pos, BUFSIZ);
|
|
if (size == SIZE_MAX) {
|
|
// TODO
|
|
} else if (t->pair->src_eof) {
|
|
m->last_size = size;
|
|
}
|
|
|
|
pthread_mutex_lock(&m->mutex);
|
|
|
|
if (++m->write_pos == m->buffer_count)
|
|
m->write_pos = 0;
|
|
|
|
if (m->write_pos == m->read_pos || t->pair->src_eof)
|
|
m->finish = true;
|
|
|
|
pthread_cond_signal(&m->cond);
|
|
pthread_mutex_unlock(&m->mutex);
|
|
|
|
} while (!m->finish);
|
|
|
|
return done ? 0 : -1;
|
|
}
|
|
|
|
|
|
static foo
|
|
multi_code()
|
|
{
|
|
lzma_action = LZMA_RUN;
|
|
|
|
while (true) {
|
|
pthread_mutex_lock(&m->mutex);
|
|
|
|
while (m->read_pos == m->write_pos && !m->finish)
|
|
pthread_cond_wait(&m->cond, &m->mutex);
|
|
|
|
pthread_mutex_unlock(&m->mutex);
|
|
|
|
if (m->finish) {
|
|
t->strm.avail_in = m->last_size;
|
|
if (opt_mode == MODE_COMPRESS)
|
|
action = LZMA_FINISH;
|
|
} else {
|
|
t->strm.avail_in = BUFSIZ;
|
|
}
|
|
|
|
t->strm.next_in = m->buffers + m->read_pos;
|
|
|
|
const lzma_ret ret = lzma_code(&t->strm, action);
|
|
|
|
}
|
|
}
|
|
|
|
*/
|
|
|
|
|
|
///////////////////////
|
|
// Starting new file //
|
|
///////////////////////
|
|
|
|
extern void
|
|
process_file(const char *filename)
|
|
{
|
|
thread_data *t = get_thread_data();
|
|
if (t == NULL)
|
|
return; // User abort
|
|
|
|
// If this fails, it shows appropriate error messages too.
|
|
t->pair = io_open(filename);
|
|
if (t->pair == NULL) {
|
|
release_thread_data(t);
|
|
return;
|
|
}
|
|
|
|
// TODO Currently only one-thread-per-file mode is implemented.
|
|
|
|
if (create_thread(&single, t)) {
|
|
io_close(t->pair, false);
|
|
release_thread_data(t);
|
|
}
|
|
|
|
return;
|
|
}
|