diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 922839810..448a3e73f 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -1087,7 +1087,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi * `output` : `pos` will be updated with amount of data flushed . * `blockToFlush` : if >0, the function will block and wait if there is no data available to flush . * @return : amount of data remaining within internal buffer, 0 if no more, 1 if unknown but > 0, or an error code */ -static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush) +static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsigned blockToFlush, ZSTD_EndDirective end) { unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask; DEBUGLOG(5, "ZSTDMT_flushProduced (blocking:%u)", blockToFlush); @@ -1116,7 +1116,7 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, uns if ( job.jobCompleted && job.frameChecksumNeeded ) { U32 const checksum = (U32)XXH64_digest(&zcs->xxhState); - DEBUGLOG(5, "ZSTDMT_flushProduced: writing checksum : %08X \n", checksum); + DEBUGLOG(4, "ZSTDMT_flushProduced: writing checksum : %08X \n", checksum); MEM_writeLE32((char*)job.dstBuff.start + job.cSize, checksum); job.cSize += 4; zcs->jobs[wJobID].cSize += 4; @@ -1150,9 +1150,10 @@ static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, uns if (job.srcSize > job.consumed) return 1; /* current job not completely compressed */ } if (zcs->doneJobID < zcs->nextJobID) return 1; /* some more jobs to flush */ - if (zcs->jobReady) return 1; /* at least one more job to do ! */ + if (zcs->jobReady) return 1; /* one job is ready and queued! */ if (zcs->inBuff.filled > 0) return 1; /* input not empty */ zcs->allJobsCompleted = zcs->frameEnded; /* last frame entirely flushed */ + if (end == ZSTD_e_end) return !zcs->frameEnded; /* for ZSTD_e_end, question becomes : is frame completed ? It may need a last null-block */ return 0; /* everything flushed */ } @@ -1231,7 +1232,7 @@ size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx, } /* check for potential compressed data ready to be flushed */ - { size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardInputProgress); /* block if there was no forward input progress */ + { size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardInputProgress, endOp); /* block if there was no forward input progress */ if (input->pos < input->size) return MAX(remainingToFlush, 1); /* input not consumed : do not flush yet */ return remainingToFlush; } @@ -1247,21 +1248,21 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu } -static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, unsigned endFrame) +static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_EndDirective endFrame) { size_t const srcSize = mtctx->inBuff.filled - mtctx->prefixSize; DEBUGLOG(5, "ZSTDMT_flushStream_internal"); if ( mtctx->jobReady /* one job ready for a worker to pick up */ || (srcSize > 0) /* still some data within input buffer */ - || (endFrame && !mtctx->frameEnded)) { /* need a last 0-size block to end frame */ + || ((endFrame==ZSTD_e_end) && !mtctx->frameEnded)) { /* need a last 0-size block to end frame */ DEBUGLOG(5, "ZSTDMT_flushStream_internal : create a new job (%u bytes, end:%u)", - (U32)srcSize, endFrame); + (U32)srcSize, (U32)endFrame); CHECK_F( ZSTDMT_createCompressionJob(mtctx, srcSize, endFrame) ); } /* check if there is any data available to flush */ - return ZSTDMT_flushProduced(mtctx, output, 1 /* blockToFlush */); + return ZSTDMT_flushProduced(mtctx, output, 1 /* blockToFlush */, endFrame); } @@ -1270,7 +1271,7 @@ size_t ZSTDMT_flushStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output) DEBUGLOG(5, "ZSTDMT_flushStream"); if (mtctx->singleBlockingThread) return ZSTD_flushStream(mtctx->cctxPool->cctx[0], output); - return ZSTDMT_flushStream_internal(mtctx, output, 0 /* endFrame */); + return ZSTDMT_flushStream_internal(mtctx, output, ZSTD_e_flush); } size_t ZSTDMT_endStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output) @@ -1278,5 +1279,5 @@ size_t ZSTDMT_endStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output) DEBUGLOG(4, "ZSTDMT_endStream"); if (mtctx->singleBlockingThread) return ZSTD_endStream(mtctx->cctxPool->cctx[0], output); - return ZSTDMT_flushStream_internal(mtctx, output, 1 /* endFrame */); + return ZSTDMT_flushStream_internal(mtctx, output, ZSTD_e_end); } diff --git a/lib/zstd.h b/lib/zstd.h index aab6be3b0..da9d295cd 100644 --- a/lib/zstd.h +++ b/lib/zstd.h @@ -1136,10 +1136,11 @@ typedef enum { * and then immediately returns, just indicating that there is some data remaining to be flushed. * The function nonetheless guarantees forward progress : it will return only after it reads or write at least 1+ byte. * - Exception : in multi-threading mode, if the first call requests a ZSTD_e_end directive, it is blocking : it will complete compression before giving back control to caller. - * - @return provides the minimum amount of data remaining to be flushed from internal buffers + * - @return provides a minimum amount of data remaining to be flushed from internal buffers * or an error code, which can be tested using ZSTD_isError(). * if @return != 0, flush is not fully completed, there is still some data left within internal buffers. - * This is useful to determine if a ZSTD_e_flush or ZSTD_e_end directive is completed. + * This is useful for ZSTD_e_flush, since in this case more flushes are necessary to empty all buffers. + * For ZSTD_e_end, @return == 0 when internal buffers are fully flushed and frame is completed. * - after a ZSTD_e_end directive, if internal buffer is not fully flushed (@return != 0), * only ZSTD_e_end or ZSTD_e_flush operations are allowed. * Before starting a new compression job, or changing compression parameters, diff --git a/tests/zstreamtest.c b/tests/zstreamtest.c index 3a738c2fc..762224d53 100644 --- a/tests/zstreamtest.c +++ b/tests/zstreamtest.c @@ -1695,8 +1695,9 @@ static int fuzzerTests_newAPI(U32 seed, U32 nbTests, unsigned startTest, double size_t const randomDstSize = FUZ_randomLength(&lseed, maxSampleLog+1); size_t const adjustedDstSize = MIN(cBufferSize - cSize, randomDstSize); outBuff.size = outBuff.pos + adjustedDstSize; - DISPLAYLEVEL(6, "End-flush into dst buffer of size %u \n", (U32)adjustedDstSize); + DISPLAYLEVEL(6, "t%u: End-flush into dst buffer of size %u \n", testNb, (U32)adjustedDstSize); remainingToFlush = ZSTD_compress_generic(zc, &outBuff, &inBuff, ZSTD_e_end); + DISPLAYLEVEL(6, "t%u: Total flushed so far : %u bytes \n", testNb, (U32)outBuff.pos); CHECK( ZSTD_isError(remainingToFlush), "ZSTD_compress_generic w/ ZSTD_e_end error : %s", ZSTD_getErrorName(remainingToFlush) );