diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c
index e04864507..22560bfb1 100644
--- a/lib/compress/zstdmt_compress.c
+++ b/lib/compress/zstdmt_compress.c
@@ -310,8 +310,8 @@ static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
 typedef struct {
     size_t consumed;                   /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */
     size_t cSize;                      /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx, then set0 by mtctx */
-    ZSTD_pthread_mutex_t* mtctx_mutex; /* Thread-safe - used by mtctx and (all) workers */
-    ZSTD_pthread_cond_t* mtctx_cond;   /* Thread-safe - used by mtctx and (all) workers */
+    ZSTD_pthread_mutex_t job_mutex;    /* Thread-safe - used by mtctx and worker */
+    ZSTD_pthread_cond_t job_cond;      /* Thread-safe - used by mtctx and worker */
     ZSTDMT_CCtxPool* cctxPool;         /* Thread-safe - used by mtctx and (all) workers */
     ZSTDMT_bufferPool* bufPool;        /* Thread-safe - used by mtctx and (all) workers */
     buffer_t dstBuff;                  /* set by worker (or mtctx), then read by worker & mtctx, then modified by mtctx => no barrier */
@@ -395,13 +395,13 @@ void ZSTDMT_compressChunk(void* jobDescription)
             ip += blockSize;
             op += cSize; assert(op < oend);
             /* stats */
-            ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);   /* note : it's a mtctx mutex */
+            ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
             job->cSize += cSize;
             job->consumed = blockSize * blockNb;
             DEBUGLOG(5, "ZSTDMT_compressChunk: compress new block : cSize==%u bytes (total: %u)",
                         (U32)cSize, (U32)job->cSize);
-            ZSTD_pthread_cond_signal(job->mtctx_cond);   /* warns some more data is ready to be flushed */
-            ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
+            ZSTD_pthread_cond_signal(&job->job_cond);    /* warns some more data is ready to be flushed */
+            ZSTD_pthread_mutex_unlock(&job->job_mutex);
         }
         /* last block */
         assert(blockSize > 0);  assert((blockSize & (blockSize - 1)) == 0);  /* blockSize must be power of 2 for mask==(blockSize-1) to work */
@@ -413,9 +413,9 @@ void ZSTDMT_compressChunk(void* jobDescription)
              ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize);
         if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
         /* stats */
-        ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);
+        ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
         job->cSize += cSize;
-        ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
+        ZSTD_pthread_mutex_unlock(&job->job_mutex);
     }   }

 _endJob:
@@ -424,10 +424,10 @@
     ZSTDMT_releaseBuffer(job->bufPool, job->srcBuff);
     job->srcBuff = g_nullBuffer;
     job->prefixStart = NULL;
     /* report */
-    ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);
+    ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
     job->consumed = job->srcSize;
-    ZSTD_pthread_cond_signal(job->mtctx_cond);
-    ZSTD_pthread_mutex_unlock(job->mtctx_mutex);
+    ZSTD_pthread_cond_signal(&job->job_cond);
+    ZSTD_pthread_mutex_unlock(&job->job_mutex);
 }
@@ -447,8 +447,6 @@ struct ZSTDMT_CCtx_s {
     ZSTDMT_jobDescription* jobs;
     ZSTDMT_bufferPool* bufPool;
     ZSTDMT_CCtxPool* cctxPool;
-    ZSTD_pthread_mutex_t mtctx_mutex;
-    ZSTD_pthread_cond_t mtctx_cond;
     ZSTD_CCtx_params params;
     size_t targetSectionSize;
     size_t targetPrefixSize;
@@ -470,16 +468,34 @@
 };

 /* ZSTDMT_allocJobsTable()
- * allocate, and just init to zero, a job table.
+ * allocate and init a job table.
  * update *nbJobsPtr to next power of 2 value, as size of table
  * No reverse free() function is provided : just use ZSTD_free() */
-static ZSTDMT_jobDescription* ZSTDMT_allocJobsTable(U32* nbJobsPtr, ZSTD_customMem cMem)
+static ZSTDMT_jobDescription* ZSTDMT_createJobsTable(U32* nbJobsPtr, ZSTD_customMem cMem)
 {
     U32 const nbJobsLog2 = ZSTD_highbit32(*nbJobsPtr) + 1;
     U32 const nbJobs = 1 << nbJobsLog2;
-    *nbJobsPtr = nbJobs;
-    return (ZSTDMT_jobDescription*) ZSTD_calloc(
+    ZSTDMT_jobDescription* const jobTable = ZSTD_calloc(
                 nbJobs * sizeof(ZSTDMT_jobDescription), cMem);
+    U32 jobNb;
+    if (jobTable==NULL) return NULL;
+    *nbJobsPtr = nbJobs;
+    for (jobNb=0; jobNb<nbJobs; jobNb++) {
+        ZSTD_pthread_mutex_init(&jobTable[jobNb].job_mutex, NULL);
+        ZSTD_pthread_cond_init(&jobTable[jobNb].job_cond, NULL);
+    }
+    return jobTable;
 }

+static void ZSTDMT_freeJobsTable(ZSTDMT_jobDescription* jobTable, U32 nbJobs, ZSTD_customMem cMem)
+{
+    U32 jobNb;
+    if (jobTable == NULL) return;
+    for (jobNb=0; jobNb<nbJobs; jobNb++) {
+        ZSTD_pthread_mutex_destroy(&jobTable[jobNb].job_mutex);
+        ZSTD_pthread_cond_destroy(&jobTable[jobNb].job_cond);
+    }
+    ZSTD_free(jobTable, cMem);
+}
+
@@ ... @@ ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem)
     mtctx->cMem = cMem;
     mtctx->allJobsCompleted = 1;
     mtctx->factory = POOL_create_advanced(nbThreads, 0, cMem);
-    mtctx->jobs = ZSTDMT_allocJobsTable(&nbJobs, cMem);
+    mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, cMem);
     assert(nbJobs > 0); assert((nbJobs & (nbJobs - 1)) == 0);  /* ensure nbJobs is a power of 2 */
     mtctx->jobIDMask = nbJobs - 1;
     mtctx->bufPool = ZSTDMT_createBufferPool(nbThreads, cMem);
@@ -521,14 +537,6 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem)
         ZSTDMT_freeCCtx(mtctx);
         return NULL;
     }
-    if (ZSTD_pthread_mutex_init(&mtctx->mtctx_mutex, NULL)) {
-        ZSTDMT_freeCCtx(mtctx);
-        return NULL;
-    }
-    if (ZSTD_pthread_cond_init(&mtctx->mtctx_cond, NULL)) {
-        ZSTDMT_freeCCtx(mtctx);
-        return NULL;
-    }
     DEBUGLOG(3, "mt_cctx created, for %u threads", nbThreads);
     return mtctx;
 }
@@ -566,12 +574,12 @@ static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* mtctx)
     DEBUGLOG(4, "ZSTDMT_waitForAllJobsCompleted");
     while (mtctx->doneJobID < mtctx->nextJobID) {
         unsigned const jobID = mtctx->doneJobID & mtctx->jobIDMask;
-        ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
+        ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[jobID].job_mutex);
         while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].srcSize) {
             DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", mtctx->doneJobID);   /* we want to block when waiting for data to flush */
-            ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex);
+            ZSTD_pthread_cond_wait(&mtctx->jobs[jobID].job_cond, &mtctx->jobs[jobID].job_mutex);
         }
-        ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
+        ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex);
         mtctx->doneJobID++;
     }
 }
@@ -581,12 +589,10 @@ size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
     if (mtctx==NULL) return 0;   /* compatible with free on NULL */
     POOL_free(mtctx->factory);   /* stop and free worker threads */
     ZSTDMT_releaseAllJobResources(mtctx);  /* release job resources into pools first */
-    ZSTD_free(mtctx->jobs, mtctx->cMem);
+    ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem);
     ZSTDMT_freeBufferPool(mtctx->bufPool);
     ZSTDMT_freeCCtxPool(mtctx->cctxPool);
     ZSTD_freeCDict(mtctx->cdictLocal);
-    ZSTD_pthread_mutex_destroy(&mtctx->mtctx_mutex);
-    ZSTD_pthread_cond_destroy(&mtctx->mtctx_cond);
     ZSTD_free(mtctx, mtctx->cMem);
     return 0;
 }
@@ -671,7 +677,6 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
 {
     ZSTD_frameProgression fps;
     DEBUGLOG(6, "ZSTDMT_getFrameProgression");
-    ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
     fps.consumed = mtctx->consumed;
     fps.produced = mtctx->produced;
     assert(mtctx->inBuff.filled >= mtctx->inBuff.prefixSize);
@@ -682,14 +687,16 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
                     mtctx->doneJobID, lastJobNb, mtctx->jobReady)
         for (jobNb = mtctx->doneJobID ; jobNb < lastJobNb ; jobNb++) {
             unsigned const wJobID = jobNb & mtctx->jobIDMask;
-            size_t const cResult = mtctx->jobs[wJobID].cSize;
-            size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
-            fps.consumed += mtctx->jobs[wJobID].consumed;
-            fps.ingested += mtctx->jobs[wJobID].srcSize;
-            fps.produced += produced;
+            ZSTD_pthread_mutex_lock(&mtctx->jobs[wJobID].job_mutex);
+            {   size_t const cResult = mtctx->jobs[wJobID].cSize;
+                size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
+                fps.consumed += mtctx->jobs[wJobID].consumed;
+                fps.ingested += mtctx->jobs[wJobID].srcSize;
+                fps.produced += produced;
+            }
+            ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
         }
     }
-    ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
     return fps;
 }
@@ -749,9 +756,9 @@ static size_t ZSTDMT_compress_advanced_internal(
     if (nbChunks > mtctx->jobIDMask+1) {  /* enlarge job table */
         U32 nbJobs = nbChunks;
-        ZSTD_free(mtctx->jobs, mtctx->cMem);
+        ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem);
         mtctx->jobIDMask = 0;
-        mtctx->jobs = ZSTDMT_allocJobsTable(&nbJobs, mtctx->cMem);
+        mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, mtctx->cMem);
         if (mtctx->jobs==NULL) return ERROR(memory_allocation);
         assert((nbJobs != 0) && ((nbJobs & (nbJobs - 1)) == 0));  /* ensure nbJobs is a power of 2 */
         mtctx->jobIDMask = nbJobs - 1;
@@ -781,8 +788,6 @@ static size_t ZSTDMT_compress_advanced_internal(
             mtctx->jobs[u].bufPool = mtctx->bufPool;
             mtctx->jobs[u].firstChunk = (u==0);
             mtctx->jobs[u].lastChunk = (u==nbChunks-1);
-            mtctx->jobs[u].mtctx_mutex = &mtctx->mtctx_mutex;
-            mtctx->jobs[u].mtctx_cond = &mtctx->mtctx_cond;

             if (params.fParams.checksumFlag) {
                 XXH64_update(&xxh64, srcStart + frameStartPos, chunkSize);
@@ -802,12 +807,12 @@ static size_t ZSTDMT_compress_advanced_internal(
         unsigned chunkID;
         for (chunkID=0; chunkID<nbChunks; chunkID++) {
            DEBUGLOG(5, "waiting for chunk %u ", chunkID);
-            ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
+            ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[chunkID].job_mutex);
             while (mtctx->jobs[chunkID].consumed < mtctx->jobs[chunkID].srcSize) {
                 DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", chunkID);
-                ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex);
+                ZSTD_pthread_cond_wait(&mtctx->jobs[chunkID].job_cond, &mtctx->jobs[chunkID].job_mutex);
             }
-            ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
+            ZSTD_pthread_mutex_unlock(&mtctx->jobs[chunkID].job_mutex);
             DEBUGLOG(5, "ready to write chunk %u ", chunkID);

             mtctx->jobs[chunkID].prefixStart = NULL;
@@ -1058,8 +1063,6 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
     mtctx->jobs[jobID].lastChunk = endFrame;
     mtctx->jobs[jobID].frameChecksumNeeded = endFrame && (mtctx->nextJobID>0) && mtctx->params.fParams.checksumFlag;
     mtctx->jobs[jobID].dstFlushed = 0;
-    mtctx->jobs[jobID].mtctx_mutex = &mtctx->mtctx_mutex;
-    mtctx->jobs[jobID].mtctx_cond = &mtctx->mtctx_cond;
     if (mtctx->params.fParams.checksumFlag)
         XXH64_update(&mtctx->xxhState, (const char*)mtctx->inBuff.buffer.start + mtctx->inBuff.prefixSize, srcSize);
@@ -1128,7 +1131,7 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u
                 blockToFlush, mtctx->doneJobID, mtctx->nextJobID);
     assert(output->size >= output->pos);

-    ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->mtctx_mutex);
+    ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[wJobID].job_mutex);
     if (  blockToFlush
       && (mtctx->doneJobID < mtctx->nextJobID) ) {
         assert(mtctx->jobs[wJobID].dstFlushed <= mtctx->jobs[wJobID].cSize);
@@ -1140,14 +1143,14 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u
             }
             DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)",
                         mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed);
-            ZSTD_pthread_cond_wait(&mtctx->mtctx_cond, &mtctx->mtctx_mutex);  /* block when nothing available to flush but more to come */
+            ZSTD_pthread_cond_wait(&mtctx->jobs[wJobID].job_cond, &mtctx->jobs[wJobID].job_mutex);  /* block when nothing to flush but some to come */
     }   }

     /* try to flush something */
     {   size_t cSize = mtctx->jobs[wJobID].cSize;                  /* shared */
         size_t const srcConsumed = mtctx->jobs[wJobID].consumed;   /* shared */
         size_t const srcSize = mtctx->jobs[wJobID].srcSize;        /* read-only, could be done after mutex lock, but no-declaration-after-statement */
-        ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
+        ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
         if (ZSTD_isError(cSize)) {
             DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s",
                         mtctx->doneJobID, ZSTD_getErrorName(cSize));
diff --git a/tests/zstreamtest.c b/tests/zstreamtest.c
index 16dcba73a..ecc477c71 100644
--- a/tests/zstreamtest.c
+++ b/tests/zstreamtest.c
@@ -969,9 +969,9 @@ static int fuzzerTests(U32 seed, U32 nbTests, unsigned startTest, double compres
         FUZ_rand(&coreSeed);
         lseed = coreSeed ^ prime32;
         if (nbTests >= testNb) {
-            DISPLAYUPDATE(2, "\r%6u/%6u (%08X) ", testNb, nbTests, lseed);
+            DISPLAYUPDATE(2, "\r%6u/%6u ", testNb, nbTests);
         } else {
-            DISPLAYUPDATE(2, "\r%6u (%08X) ", testNb, lseed);
+            DISPLAYUPDATE(2, "\r%6u ", testNb);
         }

         /* states full reset (deliberately not synchronized) */
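
Not part of the patch: a standalone sketch of the synchronization pattern the change moves to. Each job owns its own mutex/condition pair; the worker publishes progress and signals under that job's lock, and the consumer waits on the same pair, as ZSTDMT_waitForAllJobsCompleted() and ZSTDMT_flushProduced() do above. The sketch is illustrative only: it uses plain pthreads rather than zstd's ZSTD_pthread_* wrappers, and demo_job_t / demo_worker are hypothetical stand-ins for ZSTDMT_jobDescription and ZSTDMT_compressChunk. Build with -pthread.

    /* Hedged sketch, not zstd code: plain pthreads stand in for ZSTD_pthread_*. */
    #include <pthread.h>
    #include <stddef.h>
    #include <stdio.h>

    typedef struct {
        pthread_mutex_t job_mutex;   /* one pair per job, not one per compression context */
        pthread_cond_t  job_cond;
        size_t consumed;             /* progress published by the worker */
        size_t srcSize;
    } demo_job_t;

    /* Worker side: update shared progress under the job's own lock, then signal,
     * same shape as the end of ZSTDMT_compressChunk() in the patch above. */
    static void* demo_worker(void* opaque)
    {
        demo_job_t* const job = (demo_job_t*)opaque;
        size_t pos = 0;
        while (pos < job->srcSize) {
            size_t const next = (pos + 1024 < job->srcSize) ? pos + 1024 : job->srcSize;
            /* ... compress one block here ... */
            pthread_mutex_lock(&job->job_mutex);
            job->consumed = next;
            pthread_cond_signal(&job->job_cond);   /* more data is ready to be flushed */
            pthread_mutex_unlock(&job->job_mutex);
            pos = next;
        }
        return NULL;
    }

    int main(void)
    {
        demo_job_t job;
        pthread_t th;
        job.consumed = 0;
        job.srcSize  = 4096;
        pthread_mutex_init(&job.job_mutex, NULL);   /* done per entry when the job table is created */
        pthread_cond_init(&job.job_cond, NULL);

        pthread_create(&th, NULL, demo_worker, &job);

        /* Consumer side: same wait-loop shape as ZSTDMT_waitForAllJobsCompleted(),
         * but on this job's own mutex/cond instead of a context-wide pair. */
        pthread_mutex_lock(&job.job_mutex);
        while (job.consumed < job.srcSize)
            pthread_cond_wait(&job.job_cond, &job.job_mutex);
        pthread_mutex_unlock(&job.job_mutex);

        pthread_join(th, NULL);
        pthread_mutex_destroy(&job.job_mutex);      /* destroyed per entry when the job table is freed */
        pthread_cond_destroy(&job.job_cond);
        printf("job fully consumed: %u bytes\n", (unsigned)job.consumed);
        return 0;
    }

Applied per job rather than through a single context-wide mutex, this lets a reader such as ZSTDMT_getFrameProgression() sample one job's counters without serializing every worker behind the same lock.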