diff --git a/lib/common/pool.c b/lib/common/pool.c index e38881949..4ec1dfffb 100644 --- a/lib/common/pool.c +++ b/lib/common/pool.c @@ -46,8 +46,8 @@ struct POOL_ctx_s { Waits for jobs and executes them. @returns : NULL on failure else non-null. */ -static void *POOL_thread(void *opaque) { - POOL_ctx *ctx = (POOL_ctx *)opaque; +static void* POOL_thread(void* opaque) { + POOL_ctx* const ctx = (POOL_ctx*)opaque; if (!ctx) { return NULL; } for (;;) { /* Lock the mutex and wait for a non-empty queue or until shutdown */ @@ -61,7 +61,7 @@ static void *POOL_thread(void *opaque) { return opaque; } /* Pop a job off the queue */ - { POOL_job job = ctx->queue[ctx->queueHead]; + { POOL_job const job = ctx->queue[ctx->queueHead]; ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize; /* Unlock the mutex, signal a pusher, and run the job */ if (pthread_mutex_unlock(&ctx->queueMutex)) { return NULL; } diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index a6a497285..1b925914a 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -1,5 +1,6 @@ #include /* malloc */ -#include /* posix only, to be replaced by a more portable version */ +#include /* threadpool */ +#include /* mutex */ #include "zstd_internal.h" /* MIN, ERROR */ #include "zstdmt_compress.h" @@ -43,176 +44,11 @@ if (g_debugLevel>=MUTEX_WAIT_TIME_DLEVEL) { \ #define ZSTDMT_NBTHREADS_MAX 128 #define ZSTDMT_NBSTACKEDFRAMES_MAX (2*ZSTDMT_NBTHREADS_MAX) -typedef struct frameToWrite_s { - const void* start; - size_t frameSize; - unsigned frameID; - unsigned isLastFrame; -} frameToWrite_t; - -typedef struct ZSTDMT_dstBuffer_s { - ZSTD_outBuffer out; - unsigned frameIDToWrite; - pthread_mutex_t frameTable_mutex; - pthread_mutex_t allFramesWritten_mutex; - frameToWrite_t stackedFrame[ZSTDMT_NBSTACKEDFRAMES_MAX]; - unsigned nbStackedFrames; -} ZSTDMT_dstBufferManager; - -static ZSTDMT_dstBufferManager ZSTDMT_createDstBufferManager(void* dst, size_t dstCapacity) -{ - ZSTDMT_dstBufferManager dbm; - dbm.out.dst = dst; - dbm.out.size = dstCapacity; - dbm.out.pos = 0; - dbm.frameIDToWrite = 0; - pthread_mutex_init(&dbm.frameTable_mutex, NULL); - pthread_mutex_t* const allFramesWritten_mutex = &dbm.allFramesWritten_mutex; - pthread_mutex_init(allFramesWritten_mutex, NULL); - PTHREAD_MUTEX_LOCK(allFramesWritten_mutex); /* maybe could be merged into init ? */ - dbm.nbStackedFrames = 0; - return dbm; -} - -/* note : can fail if nbStackedFrames > ZSTDMT_NBSTACKEDFRAMES_MAX. - * note2 : can only be called from a section with frameTable_mutex already locked */ -static void ZSTDMT_stackFrameToWrite(ZSTDMT_dstBufferManager* dstBufferManager, frameToWrite_t frame) { - dstBufferManager->stackedFrame[dstBufferManager->nbStackedFrames++] = frame; -} - - typedef struct buffer_s { void* start; - size_t bufferSize; + size_t size; } buffer_t; -static buffer_t ZSTDMT_getDstBuffer(const ZSTDMT_dstBufferManager* dstBufferManager) -{ - ZSTD_outBuffer const out = dstBufferManager->out; - buffer_t buf; - buf.start = (char*)(out.dst) + out.pos; - buf.bufferSize = out.size - out.pos; - return buf; -} - -/* condition : stackNumber < dstBufferManager->nbStackedFrames. - * note : there can only be one write at a time, due to frameID condition */ -static size_t ZSTDMT_writeFrame(ZSTDMT_dstBufferManager* dstBufferManager, unsigned stackNumber) -{ - ZSTD_outBuffer const out = dstBufferManager->out; - size_t const frameSize = dstBufferManager->stackedFrame[stackNumber].frameSize; - const void* const frameStart = dstBufferManager->stackedFrame[stackNumber].start; - if (out.pos + frameSize > out.size) - return ERROR(dstSize_tooSmall); - DEBUGLOG(3, "writing frame %u (%u bytes) ", dstBufferManager->stackedFrame[stackNumber].frameID, (U32)frameSize); - memcpy((char*)out.dst + out.pos, frameStart, frameSize); - dstBufferManager->out.pos += frameSize; - dstBufferManager->frameIDToWrite = dstBufferManager->stackedFrame[stackNumber].frameID + 1; - return 0; -} - -static size_t ZSTDMT_tryWriteFrame(ZSTDMT_dstBufferManager* dstBufferManager, - const void* src, size_t srcSize, - unsigned frameID, unsigned isLastFrame) -{ - unsigned lastFrameWritten = 0; - - /* check if correct frame ordering; stack otherwise */ - DEBUGLOG(5, "considering writing frame %u ", frameID); - PTHREAD_MUTEX_LOCK(&dstBufferManager->frameTable_mutex); - if (frameID != dstBufferManager->frameIDToWrite) { - DEBUGLOG(4, "writing frameID %u : not possible, waiting for %u ", frameID, dstBufferManager->frameIDToWrite); - frameToWrite_t const frame = { src, srcSize, frameID, isLastFrame }; - ZSTDMT_stackFrameToWrite(dstBufferManager, frame); - pthread_mutex_unlock(&dstBufferManager->frameTable_mutex); - return 0; - } - pthread_mutex_unlock(&dstBufferManager->frameTable_mutex); - - /* write frame - * note : only one write possible due to frameID condition */ - DEBUGLOG(3, "writing frame %u (%u bytes) ", frameID, (U32)srcSize); - ZSTD_outBuffer const out = dstBufferManager->out; - if (out.pos + srcSize > out.size) - return ERROR(dstSize_tooSmall); - if (frameID) /* frameID==0 compress directly in dst buffer */ - memcpy((char*)out.dst + out.pos, src, srcSize); - dstBufferManager->out.pos += srcSize; - dstBufferManager->frameIDToWrite = frameID+1; - lastFrameWritten = isLastFrame; - - /* check if more frames are stacked */ - PTHREAD_MUTEX_LOCK(&dstBufferManager->frameTable_mutex); - unsigned frameWritten = dstBufferManager->nbStackedFrames>0; - while (frameWritten) { - unsigned u; - frameID++; - frameWritten = 0; - for (u=0; unbStackedFrames; u++) { - if (dstBufferManager->stackedFrame[u].frameID == frameID) { - pthread_mutex_unlock(&dstBufferManager->frameTable_mutex); - DEBUGLOG(4, "catch up frame %u ", frameID); - { size_t const writeError = ZSTDMT_writeFrame(dstBufferManager, u); - if (ZSTD_isError(writeError)) return writeError; } - lastFrameWritten = dstBufferManager->stackedFrame[u].isLastFrame; - dstBufferManager->frameIDToWrite = frameID+1; - /* remove frame from stack */ - PTHREAD_MUTEX_LOCK(&dstBufferManager->frameTable_mutex); - dstBufferManager->stackedFrame[u] = dstBufferManager->stackedFrame[dstBufferManager->nbStackedFrames-1]; - dstBufferManager->nbStackedFrames -= 1; - frameWritten = dstBufferManager->nbStackedFrames>0; - break; - } } } - pthread_mutex_unlock(&dstBufferManager->frameTable_mutex); - - /* end reached : last frame written */ - if (lastFrameWritten) pthread_mutex_unlock(&dstBufferManager->allFramesWritten_mutex); - return 0; -} - - - -typedef struct ZSTDMT_jobDescription_s { - const void* src; /* NULL means : kill thread */ - size_t srcSize; - int compressionLevel; - ZSTDMT_dstBufferManager* dstManager; - unsigned frameNumber; - unsigned isLastFrame; -} ZSTDMT_jobDescription; - -typedef struct ZSTDMT_jobAgency_s { - pthread_mutex_t jobAnnounce_mutex; - pthread_mutex_t jobApply_mutex; - ZSTDMT_jobDescription jobAnnounce; -} ZSTDMT_jobAgency; - -/* ZSTDMT_postjob() : - * This function is blocking as long as previous posted job is not taken. - * It could be made non-blocking, with a storage queue. - * But blocking has benefits : on top of memory savings, - * the caller will be able to measure delay, allowing dynamic speed throttle (via compression level). - */ -static void ZSTDMT_postjob(ZSTDMT_jobAgency* jobAgency, ZSTDMT_jobDescription job) -{ - DEBUGLOG(5, "starting job posting "); - PTHREAD_MUTEX_LOCK(&jobAgency->jobApply_mutex); /* wait for a thread to take previous job */ - DEBUGLOG(5, "job posting mutex acquired "); - jobAgency->jobAnnounce = job; /* post job */ - pthread_mutex_unlock(&jobAgency->jobAnnounce_mutex); /* announce */ - DEBUGLOG(5, "job available now "); -} - -static ZSTDMT_jobDescription ZSTDMT_getjob(ZSTDMT_jobAgency* jobAgency) -{ - PTHREAD_MUTEX_LOCK(&jobAgency->jobAnnounce_mutex); /* should check return code */ - ZSTDMT_jobDescription const job = jobAgency->jobAnnounce; - pthread_mutex_unlock(&jobAgency->jobApply_mutex); - return job; -} - - - #define ZSTDMT_NBBUFFERSPOOLED_MAX ZSTDMT_NBTHREADS_MAX typedef struct ZSTDMT_bufferPool_s { pthread_mutex_t bufferPool_mutex; @@ -227,7 +63,7 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize) pool->nbBuffers--; buffer_t const buf = pool->bTable[pool->nbBuffers]; pthread_mutex_unlock(&pool->bufferPool_mutex); - size_t const availBufferSize = buf.bufferSize; + size_t const availBufferSize = buf.size; if ((availBufferSize >= bSize) & (availBufferSize <= 10*bSize)) /* large enough, but not too much */ return buf; free(buf.start); /* size conditions not respected : create a new buffer */ @@ -235,7 +71,7 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize) pthread_mutex_unlock(&pool->bufferPool_mutex); /* create new buffer */ buffer_t buf; - buf.bufferSize = bSize; + buf.size = bSize; buf.start = calloc(1, bSize); return buf; } @@ -255,79 +91,119 @@ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* pool, buffer_t buf) -struct ZSTDMT_CCtx_s { - pthread_t pthread[ZSTDMT_NBTHREADS_MAX]; - unsigned nbThreads; - ZSTDMT_jobAgency jobAgency; - ZSTDMT_bufferPool bufferPool; -}; +typedef struct { + ZSTD_CCtx* cctx; + const void* srcStart; + size_t srcSize; + buffer_t dstBuff; + int compressionLevel; + unsigned frameID; + size_t cSize; + unsigned jobCompleted; + pthread_mutex_t* jobCompleted_mutex; +} ZSTDMT_jobDescription; -static void* ZSTDMT_compressionThread(void* arg) +/* ZSTDMT_compressFrame() : POOL_function type */ +void ZSTDMT_compressFrame(void* jobDescription) { - if (arg==NULL) return NULL; /* error : should not be possible */ - ZSTDMT_CCtx* const mtctx = (ZSTDMT_CCtx*) arg; - ZSTDMT_jobAgency* const jobAgency = &mtctx->jobAgency; - ZSTDMT_bufferPool* const pool = &mtctx->bufferPool; - ZSTD_CCtx* const cctx = ZSTD_createCCtx(); - if (cctx==NULL) return NULL; /* allocation failure : thread not started */ - DEBUGLOG(3, "thread %li created ", (long int)pthread_self()); - for (;;) { - ZSTDMT_jobDescription const job = ZSTDMT_getjob(jobAgency); - if (job.src == NULL) { - DEBUGLOG(4, "thread exit "); - ZSTD_freeCCtx(cctx); - return NULL; - } - ZSTDMT_dstBufferManager* dstBufferManager = job.dstManager; - size_t const dstBufferCapacity = ZSTD_compressBound(job.srcSize); - DEBUGLOG(4, "requesting a dstBuffer for frame %u", job.frameNumber); - buffer_t const dstBuffer = job.frameNumber ? ZSTDMT_getBuffer(pool, dstBufferCapacity) : ZSTDMT_getDstBuffer(dstBufferManager); /* lack params */ - DEBUGLOG(4, "start compressing frame %u", job.frameNumber); - //size_t const cSize = ZSTD_compress(dstBuffer.start, dstBuffer.bufferSize, job.src, job.srcSize, job.compressionLevel); - size_t const cSize = ZSTD_compressCCtx(cctx, dstBuffer.start, dstBuffer.bufferSize, job.src, job.srcSize, job.compressionLevel); - if (ZSTD_isError(cSize)) return (void*)(cSize); /* error - find a better way */ - size_t const writeError = ZSTDMT_tryWriteFrame(dstBufferManager, dstBuffer.start, cSize, job.frameNumber, job.isLastFrame); /* pas clair */ - if (ZSTD_isError(writeError)) return (void*)writeError; - if (job.frameNumber) ZSTDMT_releaseBuffer(pool, dstBuffer); - } + DEBUGLOG(5, "Entering ZSTDMT_compressFrame() "); + ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription; + DEBUGLOG(5, "compressing %u bytes with ZSTD_compressCCtx : ", (unsigned)job->srcSize); + job->cSize = ZSTD_compressCCtx(job->cctx, job->dstBuff.start, job->dstBuff.size, job->srcStart, job->srcSize, job->compressionLevel); + DEBUGLOG(5, "compressed to %u bytes ", (unsigned)job->cSize); + job->jobCompleted = 1; + DEBUGLOG(5, "unlocking mutex jobCompleted_mutex"); + pthread_mutex_unlock(job->jobCompleted_mutex); + DEBUGLOG(5, "ZSTDMT_compressFrame completed"); } + +/* note : calls to CCtxPool only from main thread */ + +typedef struct { + unsigned totalCCtx; + unsigned availCCtx; + ZSTD_CCtx* cctx[1]; /* variable size */ +} ZSTDMT_CCtxPool; + +static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbThreads) +{ + ZSTDMT_CCtxPool* const cctxPool = (ZSTDMT_CCtxPool*) calloc(1, sizeof(ZSTDMT_CCtxPool) + nbThreads*sizeof(ZSTD_CCtx*)); + if (!cctxPool) return NULL; + { unsigned u; + for (u=0; ucctx[u] = ZSTD_createCCtx(); /* check for NULL result ! */ + } + cctxPool->totalCCtx = cctxPool->availCCtx = nbThreads; + return cctxPool; +} + +static ZSTD_CCtx* ZSTDMT_getCCtx(ZSTDMT_CCtxPool* pool) +{ + if (pool->availCCtx) { + pool->availCCtx--; + return pool->cctx[pool->availCCtx]; + } + /* should not be possible, since totalCCtx==nbThreads */ + return ZSTD_createCCtx(); +} + +static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx) +{ + if (pool->availCCtx < pool->totalCCtx) + pool->cctx[pool->availCCtx++] = cctx; + else + /* should not be possible, since totalCCtx==nbThreads */ + ZSTD_freeCCtx(cctx); +} + +static void ZSTDMT_freeCCtxPool(ZSTDMT_CCtxPool* pool) +{ + unsigned u; + for (u=0; utotalCCtx; u++) + ZSTD_freeCCtx(pool->cctx[u]); + free(pool); +} + + +struct ZSTDMT_CCtx_s { + POOL_ctx* factory; + ZSTDMT_bufferPool buffPool; + ZSTDMT_CCtxPool* cctxPool; + unsigned nbThreads; + pthread_mutex_t jobCompleted_mutex; + ZSTDMT_jobDescription jobs[1]; /* variable size */ +}; + ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads) { if ((nbThreads < 1) | (nbThreads > ZSTDMT_NBTHREADS_MAX)) return NULL; - ZSTDMT_CCtx* const cctx = (ZSTDMT_CCtx*) calloc(1, sizeof(ZSTDMT_CCtx)); + ZSTDMT_CCtx* const cctx = (ZSTDMT_CCtx*) calloc(1, sizeof(ZSTDMT_CCtx) + nbThreads*sizeof(ZSTDMT_jobDescription)); if (!cctx) return NULL; - /* init jobAgency */ - pthread_mutex_init(&cctx->jobAgency.jobAnnounce_mutex, NULL); /* check return value ? */ - pthread_mutex_init(&cctx->jobAgency.jobApply_mutex, NULL); - PTHREAD_MUTEX_LOCK(&cctx->jobAgency.jobAnnounce_mutex); /* no job at beginning */ - /* init bufferPool */ - pthread_mutex_init(&cctx->bufferPool.bufferPool_mutex, NULL); - /* start all workers */ cctx->nbThreads = nbThreads; - DEBUGLOG(2, "nbThreads : %u \n", nbThreads); - unsigned t; - for (t = 0; t < nbThreads; t++) { - pthread_create(&cctx->pthread[t], NULL, ZSTDMT_compressionThread, cctx); /* check return value ? */ - } + cctx->factory = POOL_create(nbThreads, 1); + pthread_mutex_init(&cctx->buffPool.bufferPool_mutex, NULL); + cctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads); + pthread_mutex_init(&cctx->jobCompleted_mutex, NULL); return cctx; } -size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* cctx) +size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx) /* incompleted ! */ { - /* free threads */ - /* free mutex (if necessary) */ + POOL_free(mtctx->factory); + /* free mutexes (if necessary) */ /* free bufferPool */ - free(cctx); /* incompleted ! */ + ZSTDMT_freeCCtxPool(mtctx->cctxPool); + free(mtctx); return 0; } + size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, void* dst, size_t dstCapacity, const void* src, size_t srcSize, int compressionLevel) { - ZSTDMT_jobAgency* jobAgency = &mtctx->jobAgency; ZSTD_parameters const params = ZSTD_getParams(compressionLevel, srcSize, 0); size_t const frameSizeTarget = (size_t)1 << (params.cParams.windowLog + 2); unsigned const nbFramesMax = (unsigned)(srcSize / frameSizeTarget) + (srcSize < frameSizeTarget) /* min 1 */; @@ -336,7 +212,7 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, size_t remainingSrcSize = srcSize; const char* const srcStart = (const char*)src; size_t frameStartPos = 0; - ZSTDMT_dstBufferManager dbm = ZSTDMT_createDstBufferManager(dst, dstCapacity); + DEBUGLOG(2, "windowLog : %u => frameSizeTarget : %u ", params.cParams.windowLog, (U32)frameSizeTarget); DEBUGLOG(2, "nbFrames : %u (size : %u bytes) ", nbFrames, (U32)avgFrameSize); @@ -344,15 +220,46 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, { unsigned u; for (u=0; ubuffPool, dstBufferCapacity) : (buffer_t){ dst, dstCapacity }; + ZSTD_CCtx* cctx = ZSTDMT_getCCtx(mtctx->cctxPool); + + mtctx->jobs[u].srcStart = srcStart + frameStartPos; + mtctx->jobs[u].srcSize = frameSize; + mtctx->jobs[u].compressionLevel = compressionLevel; + mtctx->jobs[u].dstBuff = dstBuffer; + mtctx->jobs[u].cctx = cctx; + mtctx->jobs[u].frameID = u; + mtctx->jobs[u].jobCompleted = 0; + mtctx->jobs[u].jobCompleted_mutex = &mtctx->jobCompleted_mutex; + DEBUGLOG(3, "posting job %u (%u bytes)", u, (U32)frameSize); - ZSTDMT_jobDescription const job = { srcStart+frameStartPos, frameSize, compressionLevel, - &dbm, u, u==(nbFrames-1) }; - ZSTDMT_postjob(jobAgency, job); + POOL_add(mtctx->factory, ZSTDMT_compressFrame, &mtctx->jobs[u]); + frameStartPos += frameSize; remainingSrcSize -= frameSize; } } + /* note : since nbFrames <= nbThreads, all jobs should be running immediately in parallel */ + + { unsigned frameID; + size_t dstPos = 0; + for (frameID=0; frameIDjobs[frameID].jobCompleted==0) { + DEBUGLOG(4, "waiting for signal jobCompleted_mutex") + pthread_mutex_lock(&mtctx->jobCompleted_mutex); + } + { size_t const cSize = mtctx->jobs[frameID].cSize; + if (ZSTD_isError(cSize)) return cSize; + if (dstPos + cSize > dstCapacity) return ERROR(dstSize_tooSmall); + if (frameID) memcpy((char*)dst + dstPos, mtctx->jobs[frameID].dstBuff.start, mtctx->jobs[frameID].cSize); + dstPos += cSize ; + } + ZSTDMT_releaseCCtx(mtctx->cctxPool, mtctx->jobs[frameID].cctx); + ZSTDMT_releaseBuffer(&mtctx->buffPool, mtctx->jobs[frameID].dstBuff); + } + DEBUGLOG(4, "compressed size : %u ", (U32)dstPos); + return dstPos; + } - PTHREAD_MUTEX_LOCK(&dbm.allFramesWritten_mutex); - DEBUGLOG(4, "compressed size : %u ", (U32)dbm.out.pos); - return dbm.out.pos; }