diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index a5b6e0143..4b2369007 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -309,6 +309,13 @@ static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx) /* ===== Worker thread ===== */ /* ------------------------------------------ */ +typedef struct { + void const* start; + size_t size; +} range_t; + +static const range_t kNullRange = { NULL, 0 }; + typedef struct { size_t consumed; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */ size_t cSize; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx, then set0 by mtctx */ @@ -317,10 +324,8 @@ typedef struct { ZSTDMT_CCtxPool* cctxPool; /* Thread-safe - used by mtctx and (all) workers */ ZSTDMT_bufferPool* bufPool; /* Thread-safe - used by mtctx and (all) workers */ buffer_t dstBuff; /* set by worker (or mtctx), then read by worker & mtctx, then modified by mtctx => no barrier */ - buffer_t srcBuff; /* set by mtctx, then released by worker => no barrier */ - const void* prefixStart; /* set by mtctx, then read and set0 by worker => no barrier */ - size_t prefixSize; /* set by mtctx, then read by worker => no barrier */ - size_t srcSize; /* set by mtctx, then read by worker & mtctx => no barrier */ + range_t prefix; /* set by mtctx, then read by worker & mtctx => no barrier */ + range_t src; /* set by mtctx, then read by worker & mtctx => no barrier */ unsigned firstJob; /* set by mtctx, then read by worker => no barrier */ unsigned lastJob; /* set by mtctx, then read by worker => no barrier */ ZSTD_CCtx_params params; /* set by mtctx, then read by worker => no barrier */ @@ -335,7 +340,6 @@ void ZSTDMT_compressionJob(void* jobDescription) { ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription; ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool); - const void* const src = (const char*)job->prefixStart + job->prefixSize; buffer_t dstBuff = job->dstBuff; /* ressources */ @@ -358,7 +362,7 @@ void ZSTDMT_compressionJob(void* jobDescription) assert(job->firstJob); /* only allowed for first job */ if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; } } else { /* srcStart points at reloaded section */ - U64 const pledgedSrcSize = job->firstJob ? job->fullFrameSize : job->srcSize; + U64 const pledgedSrcSize = job->firstJob ? job->fullFrameSize : job->src.size; ZSTD_CCtx_params jobParams = job->params; /* do not modify job->params ! copy it, modify the copy */ { size_t const forceWindowError = ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_p_forceMaxWindow, !job->firstJob); if (ZSTD_isError(forceWindowError)) { @@ -366,7 +370,7 @@ void ZSTDMT_compressionJob(void* jobDescription) goto _endJob; } } { size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, - job->prefixStart, job->prefixSize, ZSTD_dm_rawContent, /* load dictionary in "content-only" mode (no header analysis) */ + job->prefix.start, job->prefix.size, ZSTD_dm_rawContent, /* load dictionary in "content-only" mode (no header analysis) */ NULL, /*cdict*/ jobParams, pledgedSrcSize); if (ZSTD_isError(initError)) { @@ -374,7 +378,7 @@ void ZSTDMT_compressionJob(void* jobDescription) goto _endJob; } } } if (!job->firstJob) { /* flush and overwrite frame header when it's not first job */ - size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.capacity, src, 0); + size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.capacity, job->src.start, 0); if (ZSTD_isError(hSize)) { job->cSize = hSize; /* save error code */ goto _endJob; } DEBUGLOG(5, "ZSTDMT_compressionJob: flush and overwrite %u bytes of frame header (not first job)", (U32)hSize); ZSTD_invalidateRepCodes(cctx); @@ -382,14 +386,14 @@ void ZSTDMT_compressionJob(void* jobDescription) /* compress */ { size_t const chunkSize = 4*ZSTD_BLOCKSIZE_MAX; - int const nbChunks = (int)((job->srcSize + (chunkSize-1)) / chunkSize); - const BYTE* ip = (const BYTE*) src; + int const nbChunks = (int)((job->src.size + (chunkSize-1)) / chunkSize); + const BYTE* ip = (const BYTE*) job->src.start; BYTE* const ostart = (BYTE*)dstBuff.start; BYTE* op = ostart; BYTE* oend = op + dstBuff.capacity; int chunkNb; - if (sizeof(size_t) > sizeof(int)) assert(job->srcSize < ((size_t)INT_MAX) * chunkSize); /* check overflow */ - DEBUGLOG(5, "ZSTDMT_compressionJob: compress %u bytes in %i blocks", (U32)job->srcSize, nbChunks); + if (sizeof(size_t) > sizeof(int)) assert(job->src.size < ((size_t)INT_MAX) * chunkSize); /* check overflow */ + DEBUGLOG(5, "ZSTDMT_compressionJob: compress %u bytes in %i blocks", (U32)job->src.size, nbChunks); assert(job->cSize == 0); for (chunkNb = 1; chunkNb < nbChunks; chunkNb++) { size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, chunkSize); @@ -408,8 +412,8 @@ void ZSTDMT_compressionJob(void* jobDescription) /* last block */ assert(chunkSize > 0); assert((chunkSize & (chunkSize - 1)) == 0); /* chunkSize must be power of 2 for mask==(chunkSize-1) to work */ if ((nbChunks > 0) | job->lastJob /*must output a "last block" flag*/ ) { - size_t const lastBlockSize1 = job->srcSize & (chunkSize-1); - size_t const lastBlockSize = ((lastBlockSize1==0) & (job->srcSize>=chunkSize)) ? chunkSize : lastBlockSize1; + size_t const lastBlockSize1 = job->src.size & (chunkSize-1); + size_t const lastBlockSize = ((lastBlockSize1==0) & (job->src.size>=chunkSize)) ? chunkSize : lastBlockSize1; size_t const cSize = (job->lastJob) ? ZSTD_compressEnd (cctx, op, oend-op, ip, lastBlockSize) : ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize); @@ -421,13 +425,14 @@ void ZSTDMT_compressionJob(void* jobDescription) } } _endJob: + if (job->prefix.size > 0) + DEBUGLOG(5, "Finished with prefix: %zx", (size_t)job->prefix.start); + DEBUGLOG(5, "Finished with source: %zx", (size_t)job->src.start); /* release resources */ ZSTDMT_releaseCCtx(job->cctxPool, cctx); - ZSTDMT_releaseBuffer(job->bufPool, job->srcBuff); - job->srcBuff = g_nullBuffer; job->prefixStart = NULL; /* report */ ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex); - job->consumed = job->srcSize; + job->consumed = job->src.size; ZSTD_pthread_cond_signal(&job->job_cond); ZSTD_pthread_mutex_unlock(&job->job_mutex); } @@ -438,12 +443,27 @@ _endJob: /* ------------------------------------------ */ typedef struct { + range_t prefix; /* read-only non-owned prefix buffer */ buffer_t buffer; - size_t targetCapacity; /* note : buffers provided by the pool may be larger than target capacity */ - size_t prefixSize; size_t filled; } inBuff_t; +typedef struct { + BYTE* buffer; /* The round input buffer. All jobs get references + * to pieces of the buffer. ZSTDMT_tryGetInputRange() + * handles handing out job input buffers, and makes + * sure it doesn't overlap with any pieces still in use. + */ + size_t capacity; /* The capacity of buffer. */ + size_t pos; /* The position of the current inBuff in the round + * buffer. Updated past the end if the inBuff once + * the inBuff is sent to the worker thread. + * pos <= capacity. + */ +} roundBuff_t; + +static const roundBuff_t kNullRoundBuff = {NULL, 0, 0}; + struct ZSTDMT_CCtx_s { POOL_ctx* factory; ZSTDMT_jobDescription* jobs; @@ -452,6 +472,7 @@ struct ZSTDMT_CCtx_s { ZSTD_CCtx_params params; size_t targetSectionSize; size_t targetPrefixSize; + roundBuff_t roundBuff; inBuff_t inBuff; int jobReady; /* 1 => one job is already prepared, but pool has shortage of workers. Don't create another one. */ XXH64_state_t xxhState; @@ -538,6 +559,7 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers, ZSTD_customMem cMem) mtctx->jobIDMask = nbJobs - 1; mtctx->bufPool = ZSTDMT_createBufferPool(nbWorkers, cMem); mtctx->cctxPool = ZSTDMT_createCCtxPool(nbWorkers, cMem); + mtctx->roundBuff = kNullRoundBuff; if (!mtctx->factory | !mtctx->jobs | !mtctx->bufPool | !mtctx->cctxPool) { ZSTDMT_freeCCtx(mtctx); return NULL; @@ -563,14 +585,10 @@ static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx) ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].dstBuff); mtctx->jobs[jobID].dstBuff = g_nullBuffer; mtctx->jobs[jobID].cSize = 0; - DEBUGLOG(4, "job%02u: release src address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].srcBuff.start); - ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].srcBuff); - mtctx->jobs[jobID].srcBuff = g_nullBuffer; } memset(mtctx->jobs, 0, (mtctx->jobIDMask+1)*sizeof(ZSTDMT_jobDescription)); - DEBUGLOG(4, "input: release address %08X", (U32)(size_t)mtctx->inBuff.buffer.start); - ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->inBuff.buffer); mtctx->inBuff.buffer = g_nullBuffer; + mtctx->inBuff.filled = 0; mtctx->allJobsCompleted = 1; } @@ -580,7 +598,7 @@ static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* mtctx) while (mtctx->doneJobID < mtctx->nextJobID) { unsigned const jobID = mtctx->doneJobID & mtctx->jobIDMask; ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[jobID].job_mutex); - while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].srcSize) { + while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].src.size) { DEBUGLOG(5, "waiting for jobCompleted signal from job %u", mtctx->doneJobID); /* we want to block when waiting for data to flush */ ZSTD_pthread_cond_wait(&mtctx->jobs[jobID].job_cond, &mtctx->jobs[jobID].job_mutex); } @@ -598,6 +616,8 @@ size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx) ZSTDMT_freeBufferPool(mtctx->bufPool); ZSTDMT_freeCCtxPool(mtctx->cctxPool); ZSTD_freeCDict(mtctx->cdictLocal); + if (mtctx->roundBuff.buffer) + ZSTD_free(mtctx->roundBuff.buffer, mtctx->cMem); ZSTD_free(mtctx, mtctx->cMem); return 0; } @@ -610,7 +630,8 @@ size_t ZSTDMT_sizeof_CCtx(ZSTDMT_CCtx* mtctx) + ZSTDMT_sizeof_bufferPool(mtctx->bufPool) + (mtctx->jobIDMask+1) * sizeof(ZSTDMT_jobDescription) + ZSTDMT_sizeof_CCtxPool(mtctx->cctxPool) - + ZSTD_sizeof_CDict(mtctx->cdictLocal); + + ZSTD_sizeof_CDict(mtctx->cdictLocal) + + mtctx->roundBuff.capacity; } /* Internal only */ @@ -701,8 +722,7 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx) DEBUGLOG(6, "ZSTDMT_getFrameProgression"); fps.consumed = mtctx->consumed; fps.produced = mtctx->produced; - assert(mtctx->inBuff.filled >= mtctx->inBuff.prefixSize); - fps.ingested = mtctx->consumed + (mtctx->inBuff.filled - mtctx->inBuff.prefixSize); + fps.ingested = mtctx->consumed + mtctx->inBuff.filled; { unsigned jobNb; unsigned lastJobNb = mtctx->nextJobID + mtctx->jobReady; assert(mtctx->jobReady <= 1); DEBUGLOG(6, "ZSTDMT_getFrameProgression: jobs: from %u to <%u (jobReady:%u)", @@ -713,7 +733,7 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx) { size_t const cResult = mtctx->jobs[wJobID].cSize; size_t const produced = ZSTD_isError(cResult) ? 0 : cResult; fps.consumed += mtctx->jobs[wJobID].consumed; - fps.ingested += mtctx->jobs[wJobID].srcSize; + fps.ingested += mtctx->jobs[wJobID].src.size; fps.produced += produced; } ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex); @@ -795,10 +815,10 @@ static size_t ZSTDMT_compress_advanced_internal( buffer_t const dstBuffer = u < compressWithinDst ? dstAsBuffer : g_nullBuffer; size_t dictSize = u ? overlapSize : 0; - mtctx->jobs[u].srcBuff = g_nullBuffer; - mtctx->jobs[u].prefixStart = srcStart + frameStartPos - dictSize; - mtctx->jobs[u].prefixSize = dictSize; - mtctx->jobs[u].srcSize = jobSize; assert(jobSize > 0); /* avoid job.srcSize == 0 */ + mtctx->jobs[u].prefix.start = srcStart + frameStartPos - dictSize; + mtctx->jobs[u].prefix.size = dictSize; + mtctx->jobs[u].src.start = srcStart + frameStartPos; + mtctx->jobs[u].src.size = jobSize; assert(jobSize > 0); /* avoid job.src.size == 0 */ mtctx->jobs[u].consumed = 0; mtctx->jobs[u].cSize = 0; mtctx->jobs[u].cdict = (u==0) ? cdict : NULL; @@ -817,7 +837,7 @@ static size_t ZSTDMT_compress_advanced_internal( } DEBUGLOG(5, "ZSTDMT_compress_advanced_internal: posting job %u (%u bytes)", u, (U32)jobSize); - DEBUG_PRINTHEX(6, mtctx->jobs[u].prefixStart, 12); + DEBUG_PRINTHEX(6, mtctx->jobs[u].prefix.start, 12); POOL_add(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[u]); frameStartPos += jobSize; @@ -831,14 +851,13 @@ static size_t ZSTDMT_compress_advanced_internal( for (jobID=0; jobIDjobs[jobID].job_mutex); - while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].srcSize) { + while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].src.size) { DEBUGLOG(5, "waiting for jobCompleted signal from job %u", jobID); ZSTD_pthread_cond_wait(&mtctx->jobs[jobID].job_cond, &mtctx->jobs[jobID].job_mutex); } ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex); DEBUGLOG(5, "ready to write job %u ", jobID); - mtctx->jobs[jobID].prefixStart = NULL; { size_t const cSize = mtctx->jobs[jobID].cSize; if (ZSTD_isError(cSize)) error = cSize; if ((!error) && (dstPos + cSize > dstCapacity)) error = ERROR(dstSize_tooSmall); @@ -967,11 +986,30 @@ size_t ZSTDMT_initCStream_internal( if (mtctx->targetSectionSize < ZSTDMT_JOBSIZE_MIN) mtctx->targetSectionSize = ZSTDMT_JOBSIZE_MIN; if (mtctx->targetSectionSize < mtctx->targetPrefixSize) mtctx->targetSectionSize = mtctx->targetPrefixSize; /* job size must be >= overlap size */ DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetSectionSize>>10), params.jobSize); - mtctx->inBuff.targetCapacity = mtctx->targetPrefixSize + mtctx->targetSectionSize; - DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->inBuff.targetCapacity>>10)); - ZSTDMT_setBufferSize(mtctx->bufPool, MAX(mtctx->inBuff.targetCapacity, ZSTD_compressBound(mtctx->targetSectionSize)) ); + DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->targetSectionSize>>10)); + ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(mtctx->targetSectionSize)); + { + /* Two buffers of slack, plus extra space for the overlap */ + size_t const nbWorkers = MAX(mtctx->params.nbWorkers, 1); + size_t const nbSlackBuffers = MIN(nbWorkers, 2) + (mtctx->targetPrefixSize > 0); + size_t const nbSections = nbWorkers + nbSlackBuffers; + size_t const capacity = mtctx->targetSectionSize * nbSections; + if (mtctx->roundBuff.capacity < capacity) { + if (mtctx->roundBuff.buffer) + ZSTD_free(mtctx->roundBuff.buffer, mtctx->cMem); + mtctx->roundBuff.buffer = (BYTE*)ZSTD_malloc(capacity, mtctx->cMem); + if (mtctx->roundBuff.buffer == NULL) { + mtctx->roundBuff.capacity = 0; + return ERROR(memory_allocation); + } + mtctx->roundBuff.capacity = capacity; + } + } + DEBUGLOG(4, "roundBuff capacity : %u KB", (U32)(mtctx->roundBuff.capacity>>10)); + mtctx->roundBuff.pos = 0; mtctx->inBuff.buffer = g_nullBuffer; - mtctx->inBuff.prefixSize = 0; + mtctx->inBuff.filled = 0; + mtctx->inBuff.prefix = kNullRange; mtctx->doneJobID = 0; mtctx->nextJobID = 0; mtctx->frameEnded = 0; @@ -1038,17 +1076,16 @@ size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel) { static void ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job) { assert(job->lastJob == 1); - assert(job->srcSize == 0); /* last job is empty -> will be simplified into a last empty block */ + assert(job->src.size == 0); /* last job is empty -> will be simplified into a last empty block */ assert(job->firstJob == 0); /* cannot be first job, as it also needs to create frame header */ - /* A job created by streaming variant starts with a src buffer, but no dst buffer. - * It summons a dstBuffer itself, compresses into it, then releases srcBuffer, and gives result to mtctx. - * When done, srcBuffer is empty, while dstBuffer is filled, and will be released by mtctx. - * This shortcut will simply switch srcBuffer for dstBuffer, providing same outcome as a normal job */ assert(job->dstBuff.start == NULL); /* invoked from streaming variant only (otherwise, dstBuff might be user's output) */ - assert(job->srcBuff.start != NULL); /* invoked from streaming variant only (otherwise, srcBuff might be user's input) */ - assert(job->srcBuff.capacity >= ZSTD_blockHeaderSize); /* no buffer should ever be that small */ - job->dstBuff = job->srcBuff; - job->srcBuff = g_nullBuffer; + job->dstBuff = ZSTDMT_getBuffer(job->bufPool); + if (job->dstBuff.start == NULL) { + job->cSize = ERROR(memory_allocation); + return; + } + assert(job->dstBuff.capacity >= ZSTD_blockHeaderSize); /* no buffer should ever be that small */ + job->src = kNullRange; job->cSize = ZSTD_writeLastEmptyBlock(job->dstBuff.start, job->dstBuff.capacity); assert(!ZSTD_isError(job->cSize)); assert(job->consumed == 0); @@ -1066,14 +1103,13 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS } if (!mtctx->jobReady) { + BYTE const* src = (BYTE const*)mtctx->inBuff.buffer.start; DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ", - mtctx->nextJobID, (U32)srcSize, (U32)mtctx->inBuff.prefixSize); - assert(mtctx->jobs[jobID].srcBuff.start == NULL); /* no buffer left : supposed already released */ - mtctx->jobs[jobID].srcBuff = mtctx->inBuff.buffer; - mtctx->jobs[jobID].prefixStart = mtctx->inBuff.buffer.start; - mtctx->jobs[jobID].prefixSize = mtctx->inBuff.prefixSize; - mtctx->jobs[jobID].srcSize = srcSize; - assert(mtctx->inBuff.filled >= srcSize + mtctx->inBuff.prefixSize); + mtctx->nextJobID, (U32)srcSize, (U32)mtctx->inBuff.prefix.size); + mtctx->jobs[jobID].src.start = src; + mtctx->jobs[jobID].src.size = srcSize; + assert(mtctx->inBuff.filled >= srcSize); + mtctx->jobs[jobID].prefix = mtctx->inBuff.prefix; mtctx->jobs[jobID].consumed = 0; mtctx->jobs[jobID].cSize = 0; mtctx->jobs[jobID].params = mtctx->params; @@ -1089,29 +1125,20 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS mtctx->jobs[jobID].frameChecksumNeeded = endFrame && (mtctx->nextJobID>0) && mtctx->params.fParams.checksumFlag; mtctx->jobs[jobID].dstFlushed = 0; - if (mtctx->params.fParams.checksumFlag) - XXH64_update(&mtctx->xxhState, (const char*)mtctx->inBuff.buffer.start + mtctx->inBuff.prefixSize, srcSize); + if (mtctx->params.fParams.checksumFlag && srcSize > 0) + XXH64_update(&mtctx->xxhState, src, srcSize); - /* get a new buffer for next input */ + /* Update the round buffer pos and clear the input buffer to be reset */ + mtctx->roundBuff.pos += srcSize; + mtctx->inBuff.buffer = g_nullBuffer; + mtctx->inBuff.filled = 0; + /* Set the prefix */ if (!endFrame) { - size_t const newPrefixSize = MIN(mtctx->inBuff.filled, mtctx->targetPrefixSize); - mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool); - if (mtctx->inBuff.buffer.start == NULL) { /* not enough memory to allocate a new input buffer */ - mtctx->jobs[jobID].srcSize = mtctx->jobs[jobID].consumed = 0; - mtctx->nextJobID++; - ZSTDMT_waitForAllJobsCompleted(mtctx); - ZSTDMT_releaseAllJobResources(mtctx); - return ERROR(memory_allocation); - } - mtctx->inBuff.filled -= (mtctx->inBuff.prefixSize + srcSize) - newPrefixSize; - memmove(mtctx->inBuff.buffer.start, /* copy end of current job into next job, as "prefix" */ - (const char*)mtctx->jobs[jobID].prefixStart + mtctx->inBuff.prefixSize + srcSize - newPrefixSize, - mtctx->inBuff.filled); - mtctx->inBuff.prefixSize = newPrefixSize; + size_t const newPrefixSize = MIN(srcSize, mtctx->targetPrefixSize); + mtctx->inBuff.prefix.start = src + srcSize - newPrefixSize; + mtctx->inBuff.prefix.size = newPrefixSize; } else { /* endFrame==1 => no need for another input buffer */ - mtctx->inBuff.buffer = g_nullBuffer; - mtctx->inBuff.filled = 0; - mtctx->inBuff.prefixSize = 0; + mtctx->inBuff.prefix = kNullRange; mtctx->frameEnded = endFrame; if (mtctx->nextJobID == 0) { /* single job exception : checksum is already calculated directly within worker thread */ @@ -1130,7 +1157,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS DEBUGLOG(5, "ZSTDMT_createCompressionJob: posting job %u : %u bytes (end:%u, jobNb == %u (mod:%u))", mtctx->nextJobID, - (U32)mtctx->jobs[jobID].srcSize, + (U32)mtctx->jobs[jobID].src.size, mtctx->jobs[jobID].lastJob, mtctx->nextJobID, jobID); @@ -1161,9 +1188,9 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u && (mtctx->doneJobID < mtctx->nextJobID) ) { assert(mtctx->jobs[wJobID].dstFlushed <= mtctx->jobs[wJobID].cSize); while (mtctx->jobs[wJobID].dstFlushed == mtctx->jobs[wJobID].cSize) { /* nothing to flush */ - if (mtctx->jobs[wJobID].consumed == mtctx->jobs[wJobID].srcSize) { + if (mtctx->jobs[wJobID].consumed == mtctx->jobs[wJobID].src.size) { DEBUGLOG(5, "job %u is completely consumed (%u == %u) => don't wait for cond, there will be none", - mtctx->doneJobID, (U32)mtctx->jobs[wJobID].consumed, (U32)mtctx->jobs[wJobID].srcSize); + mtctx->doneJobID, (U32)mtctx->jobs[wJobID].consumed, (U32)mtctx->jobs[wJobID].src.size); break; } DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)", @@ -1174,7 +1201,7 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u /* try to flush something */ { size_t cSize = mtctx->jobs[wJobID].cSize; /* shared */ size_t const srcConsumed = mtctx->jobs[wJobID].consumed; /* shared */ - size_t const srcSize = mtctx->jobs[wJobID].srcSize; /* read-only, could be done after mutex lock, but no-declaration-after-statement */ + size_t const srcSize = mtctx->jobs[wJobID].src.size; /* read-only, could be done after mutex lock, but no-declaration-after-statement */ ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex); if (ZSTD_isError(cSize)) { DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s", @@ -1211,7 +1238,6 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u && (mtctx->jobs[wJobID].dstFlushed == cSize) ) { /* output buffer fully flushed => free this job position */ DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one", mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed); - assert(mtctx->jobs[wJobID].srcBuff.start == NULL); /* srcBuff supposed already released */ ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[wJobID].dstBuff); mtctx->jobs[wJobID].dstBuff = g_nullBuffer; mtctx->jobs[wJobID].cSize = 0; /* ensure this job slot is considered "not started" in future check */ @@ -1232,6 +1258,109 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, u return 0; /* internal buffers fully flushed */ } +/** + * Returns the range of data used by the earliest job that is not yet complete. + * If the data of the first job is broken up into two segments, we cover both + * sections. + */ +static range_t ZSTDMT_getInputDataInUse(ZSTDMT_CCtx* mtctx) +{ + unsigned const firstJobID = mtctx->doneJobID; + unsigned const lastJobID = mtctx->nextJobID; + unsigned jobID; + + for (jobID = firstJobID; jobID < lastJobID; ++jobID) { + unsigned const wJobID = jobID & mtctx->jobIDMask; + size_t consumed; + + ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[wJobID].job_mutex); + consumed = mtctx->jobs[wJobID].consumed; + ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex); + + if (consumed < mtctx->jobs[wJobID].src.size) { + range_t range = mtctx->jobs[wJobID].prefix; + if (range.size == 0) { + /* Empty prefix */ + range = mtctx->jobs[wJobID].src; + } + /* Job source in multiple segments not supported yet */ + assert(range.start <= mtctx->jobs[wJobID].src.start); + return range; + } + } + return kNullRange; +} + +/** + * Returns non-zero iff buffer and range overlap. + */ +static int ZSTDMT_isOverlapped(buffer_t buffer, range_t range) +{ + BYTE const* const bufferStart = (BYTE const*)buffer.start; + BYTE const* const bufferEnd = bufferStart + buffer.capacity; + BYTE const* const rangeStart = (BYTE const*)range.start; + BYTE const* const rangeEnd = rangeStart + range.size; + + if (rangeStart == NULL || bufferStart == NULL) + return 0; + + return bufferStart < rangeEnd && rangeStart < bufferEnd; +} + +/** + * Attempts to set the inBuff to the next section to fill. + * If any part of the new section is still in use we give up. + * Returns non-zero if the buffer is filled. + */ +static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx) +{ + range_t const inUse = ZSTDMT_getInputDataInUse(mtctx); + size_t const spaceLeft = mtctx->roundBuff.capacity - mtctx->roundBuff.pos; + size_t const target = mtctx->targetSectionSize; + buffer_t buffer; + + assert(mtctx->inBuff.buffer.start == NULL); + assert(mtctx->roundBuff.capacity >= target); + + if (spaceLeft < target) { + /* ZSTD_invalidateRepCodes() doesn't work for extDict variants. + * Simply copy the prefix to the beginning in that case. + */ + BYTE* const start = (BYTE*)mtctx->roundBuff.buffer; + size_t const prefixSize = mtctx->inBuff.prefix.size; + + buffer.start = start; + buffer.capacity = prefixSize; + if (ZSTDMT_isOverlapped(buffer, inUse)) { + DEBUGLOG(6, "Waiting for buffer..."); + return 0; + } + memmove(start, mtctx->inBuff.prefix.start, prefixSize); + mtctx->inBuff.prefix.start = start; + mtctx->roundBuff.pos = prefixSize; + } + buffer.start = mtctx->roundBuff.buffer + mtctx->roundBuff.pos; + buffer.capacity = target; + + if (ZSTDMT_isOverlapped(buffer, inUse)) { + DEBUGLOG(6, "Waiting for buffer..."); + return 0; + } + assert(!ZSTDMT_isOverlapped(buffer, mtctx->inBuff.prefix)); + DEBUGLOG(5, "Using prefix range [%zx, %zx)", + (size_t)mtctx->inBuff.prefix.start, + (size_t)mtctx->inBuff.prefix.start + mtctx->inBuff.prefix.size); + DEBUGLOG(5, "Using source range [%zx, %zx)", + (size_t)buffer.start, + (size_t)buffer.start + buffer.capacity); + + + mtctx->inBuff.buffer = buffer; + mtctx->inBuff.filled = 0; + assert(mtctx->roundBuff.pos + buffer.capacity <= mtctx->roundBuff.capacity); + return 1; +} + /** ZSTDMT_compressStream_generic() : * internal use only - exposed to be invoked from zstd_compress.c @@ -1242,7 +1371,6 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, ZSTD_inBuffer* input, ZSTD_EndDirective endOp) { - size_t const newJobThreshold = mtctx->inBuff.prefixSize + mtctx->targetSectionSize; unsigned forwardInputProgress = 0; DEBUGLOG(5, "ZSTDMT_compressStream_generic (endOp=%u, srcSize=%u)", (U32)endOp, (U32)(input->size - input->pos)); @@ -1271,7 +1399,6 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, if (ZSTD_isError(cSize)) return cSize; input->pos = input->size; output->pos += cSize; - ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->inBuff.buffer); /* was allocated in initStream */ mtctx->allJobsCompleted = 1; mtctx->frameEnded = 1; return 0; @@ -1281,18 +1408,19 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, if ( (!mtctx->jobReady) && (input->size > input->pos) ) { /* support NULL input */ if (mtctx->inBuff.buffer.start == NULL) { - mtctx->inBuff.buffer = ZSTDMT_getBuffer(mtctx->bufPool); /* note : allocation can fail, in which case, buffer.start==NULL */ - mtctx->inBuff.filled = 0; - if ( (mtctx->inBuff.buffer.start == NULL) /* allocation failure */ - && (mtctx->doneJobID == mtctx->nextJobID) ) { /* and nothing to flush */ - return ERROR(memory_allocation); /* no forward progress possible => output an error */ + assert(mtctx->inBuff.filled == 0); /* Can't fill an empty buffer */ + if (!ZSTDMT_tryGetInputRange(mtctx)) { + /* It is only possible for this operation to fail if there are + * still compression jobs ongoing. + */ + assert(mtctx->doneJobID != mtctx->nextJobID); } - assert(mtctx->inBuff.buffer.capacity >= mtctx->inBuff.targetCapacity); /* pool must provide a buffer >= targetCapacity */ } - if (mtctx->inBuff.buffer.start != NULL) { /* no buffer for input, but it's possible to flush, and then reclaim the buffer */ - size_t const toLoad = MIN(input->size - input->pos, mtctx->inBuff.targetCapacity - mtctx->inBuff.filled); + if (mtctx->inBuff.buffer.start != NULL) { + size_t const toLoad = MIN(input->size - input->pos, mtctx->targetSectionSize - mtctx->inBuff.filled); + assert(mtctx->inBuff.buffer.capacity >= mtctx->targetSectionSize); DEBUGLOG(5, "ZSTDMT_compressStream_generic: adding %u bytes on top of %u to buffer of size %u", - (U32)toLoad, (U32)mtctx->inBuff.filled, (U32)mtctx->inBuff.targetCapacity); + (U32)toLoad, (U32)mtctx->inBuff.filled, (U32)mtctx->targetSectionSize); memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, toLoad); input->pos += toLoad; mtctx->inBuff.filled += toLoad; @@ -1303,10 +1431,11 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, } if ( (mtctx->jobReady) - || (mtctx->inBuff.filled >= newJobThreshold) /* filled enough : let's compress */ + || (mtctx->inBuff.filled >= mtctx->targetSectionSize) /* filled enough : let's compress */ || ((endOp != ZSTD_e_continue) && (mtctx->inBuff.filled > 0)) /* something to flush : let's go */ || ((endOp == ZSTD_e_end) && (!mtctx->frameEnded)) ) { /* must finish the frame with a zero-size block */ - size_t const jobSize = MIN(mtctx->inBuff.filled - mtctx->inBuff.prefixSize, mtctx->targetSectionSize); + size_t const jobSize = mtctx->inBuff.filled; + assert(mtctx->inBuff.filled <= mtctx->targetSectionSize); CHECK_F( ZSTDMT_createCompressionJob(mtctx, jobSize, endOp) ); } @@ -1323,13 +1452,13 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_in CHECK_F( ZSTDMT_compressStream_generic(mtctx, output, input, ZSTD_e_continue) ); /* recommended next input size : fill current input buffer */ - return mtctx->inBuff.targetCapacity - mtctx->inBuff.filled; /* note : could be zero when input buffer is fully filled and no more availability to create new job */ + return mtctx->targetSectionSize - mtctx->inBuff.filled; /* note : could be zero when input buffer is fully filled and no more availability to create new job */ } static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_EndDirective endFrame) { - size_t const srcSize = mtctx->inBuff.filled - mtctx->inBuff.prefixSize; + size_t const srcSize = mtctx->inBuff.filled; DEBUGLOG(5, "ZSTDMT_flushStream_internal"); if ( mtctx->jobReady /* one job ready for a worker to pick up */ diff --git a/tests/playTests.sh b/tests/playTests.sh index c170ba9f4..1847f57d7 100755 --- a/tests/playTests.sh +++ b/tests/playTests.sh @@ -213,8 +213,8 @@ roundTripTest -g512K roundTripTest -g512K " --zstd=slen=3,tlen=48,strat=6" roundTripTest -g512K " --zstd=strat=6,wlog=23,clog=23,hlog=22,slog=6" roundTripTest -g512K " --zstd=windowLog=23,chainLog=23,hashLog=22,searchLog=6,searchLength=3,targetLength=48,strategy=6" -roundTripTest -g512K " --long --zstd=ldmHashLog=20,ldmSearchLength=64,ldmBucketSizeLog=1,ldmHashEveryLog=7" -roundTripTest -g512K " --long --zstd=ldmhlog=20,ldmslen=64,ldmblog=1,ldmhevery=7" +roundTripTest -g512K " --single-thread --long --zstd=ldmHashLog=20,ldmSearchLength=64,ldmBucketSizeLog=1,ldmHashEveryLog=7" +roundTripTest -g512K " --single-thread --long --zstd=ldmhlog=20,ldmslen=64,ldmblog=1,ldmhevery=7" roundTripTest -g512K 19 @@ -630,12 +630,12 @@ roundTripTest -g516K 19 # btopt fileRoundTripTest -g500K $ECHO "\n===> zstd long distance matching round-trip tests " -roundTripTest -g0 "2 --long" -roundTripTest -g1000K "1 --long" -roundTripTest -g517K "6 --long" -roundTripTest -g516K "16 --long" -roundTripTest -g518K "19 --long" -fileRoundTripTest -g5M "3 --long" +roundTripTest -g0 "2 --single-thread --long" +roundTripTest -g1000K "1 --single-thread --long" +roundTripTest -g517K "6 --single-thread --long" +roundTripTest -g516K "16 --single-thread --long" +roundTripTest -g518K "19 --single-thread --long" +fileRoundTripTest -g5M "3 --single-thread --long" roundTripTest -g96K "5 --single-thread" @@ -648,7 +648,7 @@ then fileRoundTripTest -g4M "19 -T2 -B1M" $ECHO "\n===> zstdmt long distance matching round-trip tests " - roundTripTest -g8M "3 --long -T2" + roundTripTest -g8M "3 --long=24 -T2" else $ECHO "\n===> no multithreading, skipping zstdmt tests " fi @@ -699,13 +699,13 @@ rm tmp* $ECHO "\n===> zstd long distance matching tests " -roundTripTest -g0 " --long" -roundTripTest -g9M "2 --long" +roundTripTest -g0 " --single-thread --long" +roundTripTest -g9M "2 --single-thread --long" # Test parameter parsing -roundTripTest -g1M -P50 "1 --long=29" " --memory=512MB" -roundTripTest -g1M -P50 "1 --long=29 --zstd=wlog=28" " --memory=256MB" -roundTripTest -g1M -P50 "1 --long=29" " --long=28 --memory=512MB" -roundTripTest -g1M -P50 "1 --long=29" " --zstd=wlog=28 --memory=512MB" +roundTripTest -g1M -P50 "1 --single-thread --long=29" " --memory=512MB" +roundTripTest -g1M -P50 "1 --single-thread --long=29 --zstd=wlog=28" " --memory=256MB" +roundTripTest -g1M -P50 "1 --single-thread --long=29" " --long=28 --memory=512MB" +roundTripTest -g1M -P50 "1 --single-thread --long=29" " --zstd=wlog=28 --memory=512MB" if [ "$1" != "--test-large-data" ]; then @@ -746,13 +746,13 @@ fileRoundTripTest -g4193M -P99 1 $ECHO "\n===> zstd long, long distance matching round-trip tests " -roundTripTest -g270000000 "1 --long" -roundTripTest -g130000000 -P60 "5 --long" -roundTripTest -g35000000 -P70 "8 --long" -roundTripTest -g18000001 -P80 "18 --long" +roundTripTest -g270000000 "1 --single-thread --long" +roundTripTest -g130000000 -P60 "5 --single-thread --long" +roundTripTest -g35000000 -P70 "8 --single-thread --long" +roundTripTest -g18000001 -P80 "18 --single-thread --long" # Test large window logs -roundTripTest -g700M -P50 "1 --long=29" -roundTripTest -g600M -P50 "1 --long --zstd=wlog=29,clog=28" +roundTripTest -g700M -P50 "1 --single-thread --long=29" +roundTripTest -g600M -P50 "1 --single-thread --long --zstd=wlog=29,clog=28" if [ -n "$hasMT" ]