mirror of
https://github.com/facebook/zstd.git
synced 2025-10-06 00:04:13 -04:00
removed calculation of file size and replaced with limited number of available jobs
This commit is contained in:
parent
dd8a591d5d
commit
a2680e5b96
@ -1,5 +1,6 @@
|
||||
#define DISPLAY(...) fprintf(stderr, __VA_ARGS__)
|
||||
#define FILE_CHUNK_SIZE 4 << 20
|
||||
#define MAX_NUM_JOBS 30;
|
||||
typedef unsigned char BYTE;
|
||||
|
||||
#include <stdio.h> /* fprintf */
|
||||
@ -22,10 +23,13 @@ typedef struct {
|
||||
unsigned jobID;
|
||||
unsigned jobCompleted;
|
||||
unsigned jobReady;
|
||||
unsigned jobWritten;
|
||||
pthread_mutex_t* jobCompleted_mutex;
|
||||
pthread_cond_t* jobCompleted_cond;
|
||||
pthread_mutex_t* jobReady_mutex;
|
||||
pthread_cond_t* jobReady_cond;
|
||||
pthread_mutex_t* jobWrite_mutex;
|
||||
pthread_cond_t* jobWrite_cond;
|
||||
size_t compressedSize;
|
||||
} jobDescription;
|
||||
|
||||
@ -43,6 +47,8 @@ typedef struct {
|
||||
pthread_cond_t jobReady_cond;
|
||||
pthread_mutex_t allJobsCompleted_mutex;
|
||||
pthread_cond_t allJobsCompleted_cond;
|
||||
pthread_mutex_t jobWrite_mutex;
|
||||
pthread_cond_t jobWrite_cond;
|
||||
jobDescription* jobs;
|
||||
FILE* dstFile;
|
||||
} adaptCCtx;
|
||||
@ -66,12 +72,14 @@ static int freeCCtx(adaptCCtx* ctx)
|
||||
int const readyCondError = pthread_cond_destroy(&ctx->jobReady_cond);
|
||||
int const allJobsMutexError = pthread_mutex_destroy(&ctx->allJobsCompleted_mutex);
|
||||
int const allJobsCondError = pthread_cond_destroy(&ctx->allJobsCompleted_cond);
|
||||
int const jobWriteMutexError = pthread_mutex_destroy(&ctx->jobWrite_mutex);
|
||||
int const jobWriteCondError = pthread_cond_destroy(&ctx->jobWrite_cond);
|
||||
int const fileCloseError = ctx->dstFile != NULL ? fclose(ctx->dstFile) : 0;
|
||||
if (ctx->jobs){
|
||||
freeCompressionJobs(ctx);
|
||||
free(ctx->jobs);
|
||||
}
|
||||
return completedMutexError | completedCondError | readyMutexError | readyCondError | fileCloseError | allJobsMutexError | allJobsCondError;
|
||||
return completedMutexError | completedCondError | readyMutexError | readyCondError | fileCloseError | allJobsMutexError | allJobsCondError | jobWriteMutexError | jobWriteCondError;
|
||||
}
|
||||
}
|
||||
|
||||
@ -91,6 +99,8 @@ static adaptCCtx* createCCtx(unsigned numJobs, const char* const outFilename)
|
||||
pthread_cond_init(&ctx->jobReady_cond, NULL);
|
||||
pthread_mutex_init(&ctx->allJobsCompleted_mutex, NULL);
|
||||
pthread_cond_init(&ctx->allJobsCompleted_cond, NULL);
|
||||
pthread_mutex_init(&ctx->jobWrite_mutex, NULL);
|
||||
pthread_cond_init(&ctx->jobWrite_cond, NULL);
|
||||
ctx->numJobs = numJobs;
|
||||
ctx->lastJobID = -1; /* intentional underflow */
|
||||
ctx->jobs = calloc(1, numJobs*sizeof(jobDescription));
|
||||
@ -101,6 +111,9 @@ static adaptCCtx* createCCtx(unsigned numJobs, const char* const outFilename)
|
||||
ctx->jobs[u].jobCompleted_cond = &ctx->jobCompleted_cond;
|
||||
ctx->jobs[u].jobReady_mutex = &ctx->jobReady_mutex;
|
||||
ctx->jobs[u].jobReady_cond = &ctx->jobReady_cond;
|
||||
ctx->jobs[u].jobWrite_mutex = &ctx->jobWrite_mutex;
|
||||
ctx->jobs[u].jobWrite_cond = &ctx->jobWrite_cond;
|
||||
ctx->jobs[u].jobWritten = 1;
|
||||
}
|
||||
}
|
||||
ctx->nextJobID = 0;
|
||||
@ -139,7 +152,8 @@ static void* compressionThread(void* arg)
|
||||
adaptCCtx* ctx = (adaptCCtx*)arg;
|
||||
unsigned currJob = 0;
|
||||
for ( ; ; ) {
|
||||
jobDescription* job = &ctx->jobs[currJob];
|
||||
unsigned const currJobIndex = currJob % ctx->numJobs;
|
||||
jobDescription* job = &ctx->jobs[currJobIndex];
|
||||
pthread_mutex_lock(job->jobReady_mutex);
|
||||
while(job->jobReady == 0) {
|
||||
pthread_cond_wait(job->jobReady_cond, job->jobReady_mutex);
|
||||
@ -174,7 +188,8 @@ static void* outputThread(void* arg)
|
||||
|
||||
unsigned currJob = 0;
|
||||
for ( ; ; ) {
|
||||
jobDescription* job = &ctx->jobs[currJob];
|
||||
unsigned const currJobIndex = currJob % ctx->numJobs;
|
||||
jobDescription* job = &ctx->jobs[currJobIndex];
|
||||
pthread_mutex_lock(job->jobCompleted_mutex);
|
||||
while (job->jobCompleted == 0) {
|
||||
pthread_cond_wait(job->jobCompleted_cond, job->jobCompleted_mutex);
|
||||
@ -187,7 +202,7 @@ static void* outputThread(void* arg)
|
||||
return arg;
|
||||
}
|
||||
{
|
||||
size_t const writeSize = fwrite(ctx->jobs[currJob].dst.start, 1, compressedSize, ctx->dstFile);
|
||||
size_t const writeSize = fwrite(job->dst.start, 1, compressedSize, ctx->dstFile);
|
||||
if (writeSize != compressedSize) {
|
||||
DISPLAY("Error: an error occurred during file write operation\n");
|
||||
return arg;
|
||||
@ -195,6 +210,10 @@ static void* outputThread(void* arg)
|
||||
}
|
||||
}
|
||||
currJob++;
|
||||
pthread_mutex_lock(job->jobWrite_mutex);
|
||||
job->jobWritten = 1;
|
||||
pthread_cond_signal(job->jobWrite_cond);
|
||||
pthread_mutex_unlock(job->jobWrite_mutex);
|
||||
if (currJob >= ctx->lastJobID || ctx->threadError) {
|
||||
/* finished with all jobs */
|
||||
pthread_mutex_lock(&ctx->allJobsCompleted_mutex);
|
||||
@ -232,13 +251,20 @@ static size_t getFileSize(const char* const filename)
|
||||
static int createCompressionJob(adaptCCtx* ctx, BYTE* data, size_t srcSize)
|
||||
{
|
||||
unsigned const nextJob = ctx->nextJobID;
|
||||
jobDescription* job = &ctx->jobs[nextJob];
|
||||
unsigned const nextJobIndex = nextJob % ctx->numJobs;
|
||||
jobDescription* job = &ctx->jobs[nextJobIndex];
|
||||
pthread_mutex_lock(job->jobWrite_mutex);
|
||||
while (job->jobWritten == 0) {
|
||||
pthread_cond_wait(job->jobWrite_cond, job->jobWrite_mutex);
|
||||
}
|
||||
pthread_mutex_unlock(job->jobWrite_mutex);
|
||||
job->compressionLevel = ctx->compressionLevel;
|
||||
job->src.start = malloc(srcSize);
|
||||
job->src.size = srcSize;
|
||||
job->dst.size = ZSTD_compressBound(srcSize);
|
||||
job->dst.start = malloc(job->dst.size);
|
||||
job->jobCompleted = 0;
|
||||
job->jobWritten = 0;
|
||||
job->jobCompleted_cond = &ctx->jobCompleted_cond;
|
||||
job->jobCompleted_mutex = &ctx->jobCompleted_mutex;
|
||||
job->jobReady_cond = &ctx->jobReady_cond;
|
||||
@ -265,8 +291,7 @@ static int compressFilename(const char* const srcFilename, const char* const dst
|
||||
BYTE* const src = malloc(FILE_CHUNK_SIZE);
|
||||
FILE* const srcFile = fopen(srcFilename, "rb");
|
||||
size_t fileSize = getFileSize(srcFilename);
|
||||
size_t const numJobsPrelim = (fileSize / ((size_t)FILE_CHUNK_SIZE));
|
||||
size_t const numJobs = (numJobsPrelim * FILE_CHUNK_SIZE) == fileSize ? numJobsPrelim : numJobsPrelim + 1;
|
||||
size_t const numJobs = MAX_NUM_JOBS;
|
||||
int ret = 0;
|
||||
adaptCCtx* ctx = NULL;
|
||||
|
||||
|
@ -1,6 +1,39 @@
|
||||
make clean multi
|
||||
|
||||
./multi tests/test2048.pdf tmp.zst
|
||||
zstd -d tmp.zst
|
||||
diff tmp tests/test2048.pdf
|
||||
echo "diff test complete"
|
||||
rm tmp*
|
||||
|
||||
./multi tests/test512.pdf tmp.zst
|
||||
zstd -d tmp.zst
|
||||
diff tmp tests/test512.pdf
|
||||
echo "diff test complete"
|
||||
rm tmp*
|
||||
|
||||
./multi tests/test64.pdf tmp.zst
|
||||
zstd -d tmp.zst
|
||||
diff tmp tests/test64.pdf
|
||||
echo "diff test complete"
|
||||
rm tmp*
|
||||
|
||||
./multi tests/test16.pdf tmp.zst
|
||||
zstd -d tmp.zst
|
||||
diff tmp tests/test16.pdf
|
||||
echo "diff test complete"
|
||||
rm tmp*
|
||||
|
||||
./multi tests/test4.pdf tmp.zst
|
||||
zstd -d tmp.zst
|
||||
diff tmp tests/test4.pdf
|
||||
echo "diff test complete"
|
||||
rm tmp*
|
||||
|
||||
./multi tests/test.pdf tmp.zst
|
||||
zstd -d tmp.zst
|
||||
diff tmp tests/test.pdf
|
||||
echo "diff test complete"
|
||||
rm tmp*
|
||||
|
||||
make clean
|
||||
|
Loading…
x
Reference in New Issue
Block a user