Merge pull request #1010 from facebook/flexibleLevel

Updatable compression parameters
Yann Collet 2018-02-10 14:19:54 -08:00 committed by GitHub
commit 992c2370f6
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 424 additions and 316 deletions


@ -416,7 +416,7 @@ size_t ZSTD_estimateDCtxSize(void);
It will also consider src size to be arbitrarily "large", which is worst case. It will also consider src size to be arbitrarily "large", which is worst case.
If srcSize is known to always be small, ZSTD_estimateCCtxSize_usingCParams() can provide a tighter estimation. If srcSize is known to always be small, ZSTD_estimateCCtxSize_usingCParams() can provide a tighter estimation.
ZSTD_estimateCCtxSize_usingCParams() can be used in tandem with ZSTD_getCParams() to create cParams from compressionLevel. ZSTD_estimateCCtxSize_usingCParams() can be used in tandem with ZSTD_getCParams() to create cParams from compressionLevel.
ZSTD_estimateCCtxSize_usingCCtxParams() can be used in tandem with ZSTD_CCtxParam_setParameter(). Only single-threaded compression is supported. This function will return an error code if ZSTD_p_nbThreads is > 1. ZSTD_estimateCCtxSize_usingCCtxParams() can be used in tandem with ZSTD_CCtxParam_setParameter(). Only single-threaded compression is supported. This function will return an error code if ZSTD_p_nbWorkers is >= 1.
Note : CCtx size estimation is only correct for single-threaded compression. Note : CCtx size estimation is only correct for single-threaded compression.
</p></pre><BR> </p></pre><BR>
@ -429,7 +429,7 @@ size_t ZSTD_estimateDStreamSize_fromFrame(const void* src, size_t srcSize);
It will also consider src size to be arbitrarily "large", which is worst case. It will also consider src size to be arbitrarily "large", which is worst case.
If srcSize is known to always be small, ZSTD_estimateCStreamSize_usingCParams() can provide a tighter estimation. If srcSize is known to always be small, ZSTD_estimateCStreamSize_usingCParams() can provide a tighter estimation.
ZSTD_estimateCStreamSize_usingCParams() can be used in tandem with ZSTD_getCParams() to create cParams from compressionLevel. ZSTD_estimateCStreamSize_usingCParams() can be used in tandem with ZSTD_getCParams() to create cParams from compressionLevel.
ZSTD_estimateCStreamSize_usingCCtxParams() can be used in tandem with ZSTD_CCtxParam_setParameter(). Only single-threaded compression is supported. This function will return an error code if ZSTD_p_nbThreads is set to a value > 1. ZSTD_estimateCStreamSize_usingCCtxParams() can be used in tandem with ZSTD_CCtxParam_setParameter(). Only single-threaded compression is supported. This function will return an error code if ZSTD_p_nbWorkers is >= 1.
Note : CStream size estimation is only correct for single-threaded compression. Note : CStream size estimation is only correct for single-threaded compression.
ZSTD_DStream memory budget depends on window Size. ZSTD_DStream memory budget depends on window Size.
This information can be passed manually, using ZSTD_estimateDStreamSize, This information can be passed manually, using ZSTD_estimateDStreamSize,
@ -800,18 +800,13 @@ size_t ZSTD_decodingBufferSize_min(unsigned long long windowSize, unsigned long
</b>/* multi-threading parameters */<b> </b>/* multi-threading parameters */<b>
</b>/* These parameters are only useful if multi-threading is enabled (ZSTD_MULTITHREAD).<b> </b>/* These parameters are only useful if multi-threading is enabled (ZSTD_MULTITHREAD).<b>
* They return an error otherwise. */ * They return an error otherwise. */
ZSTD_p_nbThreads=400, </b>/* Select how many threads a compression job can spawn (default:1)<b> ZSTD_p_nbWorkers=400, </b>/* Select how many threads will be spawned to compress in parallel.<b>
* More threads improve speed, but also increase memory usage. * When nbWorkers >= 1, triggers asynchronous mode :
* Can only receive a value > 1 if ZSTD_MULTITHREAD is enabled. * ZSTD_compress_generic() consumes some input, flush some output if possible, and immediately gives back control to caller,
* Special: value 0 means "do not change nbThreads" */ * while compression work is performed in parallel, within worker threads.
ZSTD_p_nonBlockingMode, </b>/* Single thread mode is by default "blocking" :<b> * (note : a strong exception to this rule is when first invocation sets ZSTD_e_end : it becomes a blocking call).
* it finishes its job as much as possible, and only then gives back control to caller. * More workers improve speed, but also increase memory usage.
* In contrast, multi-thread is by default "non-blocking" : * Default value is `0`, aka "single-threaded mode" : no worker is spawned, compression is performed inside Caller's thread, all invocations are blocking */
* it takes some input, flush some output if available, and immediately gives back control to caller.
* Compression work is performed in parallel, within worker threads.
* (note : a strong exception to this rule is when first job is called with ZSTD_e_end : it becomes blocking)
* Setting this parameter to 1 will enforce non-blocking mode even when only 1 thread is selected.
* It allows the caller to do other tasks while the worker thread compresses in parallel. */
ZSTD_p_jobSize, </b>/* Size of a compression job. This value is only enforced in streaming (non-blocking) mode.<b> ZSTD_p_jobSize, </b>/* Size of a compression job. This value is only enforced in streaming (non-blocking) mode.<b>
* Each compression job is completed in parallel, so indirectly controls the nb of active threads. * Each compression job is completed in parallel, so indirectly controls the nb of active threads.
* 0 means default, which is dynamically determined based on compression parameters. * 0 means default, which is dynamically determined based on compression parameters.
@ -823,7 +818,7 @@ size_t ZSTD_decodingBufferSize_min(unsigned long long windowSize, unsigned long
</b>/* advanced parameters - may not remain available after API update */<b> </b>/* advanced parameters - may not remain available after API update */<b>
ZSTD_p_forceMaxWindow=1100, </b>/* Force back-reference distances to remain < windowSize,<b> ZSTD_p_forceMaxWindow=1100, </b>/* Force back-reference distances to remain < windowSize,<b>
* even when referencing into Dictionary content (default:0) */ * even when referencing into Dictionary content (default:0) */
ZSTD_p_enableLongDistanceMatching=1200, </b>/* Enable long distance matching.<b> ZSTD_p_enableLongDistanceMatching=1200, </b>/* Enable long distance matching.<b>
* This parameter is designed to improve the compression * This parameter is designed to improve the compression
* ratio for large inputs with long distance matches. * ratio for large inputs with long distance matches.
* This increases the memory usage as well as window size. * This increases the memory usage as well as window size.
@ -833,32 +828,38 @@ size_t ZSTD_decodingBufferSize_min(unsigned long long windowSize, unsigned long
* other LDM parameters. Setting the compression level * other LDM parameters. Setting the compression level
* after this parameter overrides the window log, though LDM * after this parameter overrides the window log, though LDM
* will remain enabled until explicitly disabled. */ * will remain enabled until explicitly disabled. */
ZSTD_p_ldmHashLog, </b>/* Size of the table for long distance matching, as a power of 2.<b> ZSTD_p_ldmHashLog, </b>/* Size of the table for long distance matching, as a power of 2.<b>
* Larger values increase memory usage and compression ratio, but decrease * Larger values increase memory usage and compression ratio, but decrease
* compression speed. * compression speed.
* Must be clamped between ZSTD_HASHLOG_MIN and ZSTD_HASHLOG_MAX * Must be clamped between ZSTD_HASHLOG_MIN and ZSTD_HASHLOG_MAX
* (default: windowlog - 7). */ * (default: windowlog - 7).
ZSTD_p_ldmMinMatch, </b>/* Minimum size of searched matches for long distance matcher.<b> * Special: value 0 means "do not change ldmHashLog". */
* Larger/too small values usually decrease compression ratio. ZSTD_p_ldmMinMatch, </b>/* Minimum size of searched matches for long distance matcher.<b>
* Must be clamped between ZSTD_LDM_MINMATCH_MIN * Larger/too small values usually decrease compression ratio.
* and ZSTD_LDM_MINMATCH_MAX (default: 64). */ * Must be clamped between ZSTD_LDM_MINMATCH_MIN
ZSTD_p_ldmBucketSizeLog, </b>/* Log size of each bucket in the LDM hash table for collision resolution.<b> * and ZSTD_LDM_MINMATCH_MAX (default: 64).
* Larger values usually improve collision resolution but may decrease * Special: value 0 means "do not change ldmMinMatch". */
* compression speed. ZSTD_p_ldmBucketSizeLog, </b>/* Log size of each bucket in the LDM hash table for collision resolution.<b>
* The maximum value is ZSTD_LDM_BUCKETSIZELOG_MAX (default: 3). */ * Larger values usually improve collision resolution but may decrease
* compression speed.
* The maximum value is ZSTD_LDM_BUCKETSIZELOG_MAX (default: 3).
* note : 0 is a valid value */
ZSTD_p_ldmHashEveryLog, </b>/* Frequency of inserting/looking up entries in the LDM hash table.<b> ZSTD_p_ldmHashEveryLog, </b>/* Frequency of inserting/looking up entries in the LDM hash table.<b>
* The default is MAX(0, (windowLog - ldmHashLog)) to * The default is MAX(0, (windowLog - ldmHashLog)) to
* optimize hash table usage. * optimize hash table usage.
* Larger values improve compression speed. Deviating far from the * Larger values improve compression speed. Deviating far from the
* default value will likely result in a decrease in compression ratio. * default value will likely result in a decrease in compression ratio.
* Must be clamped between 0 and ZSTD_WINDOWLOG_MAX - ZSTD_HASHLOG_MIN. */ * Must be clamped between 0 and ZSTD_WINDOWLOG_MAX - ZSTD_HASHLOG_MIN.
* note : 0 is a valid value */
} ZSTD_cParameter; } ZSTD_cParameter;
</b></pre><BR> </b></pre><BR>
<pre><b>size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned value); <pre><b>size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned value);
</b><p> Set one compression parameter, selected by enum ZSTD_cParameter. </b><p> Set one compression parameter, selected by enum ZSTD_cParameter.
Setting a parameter is generally only possible during frame initialization (before starting compression),
except for a few exceptions which can be updated during compression: compressionLevel, hashLog, chainLog, searchLog, minMatch, targetLength and strategy.
Note : when `value` is an enum, cast it to unsigned for proper type checking. Note : when `value` is an enum, cast it to unsigned for proper type checking.
@result : informational value (typically, the one being set, possibly corrected), @result : informational value (typically, value being set clamped correctly),
or an error code (which can be tested with ZSTD_isError()). or an error code (which can be tested with ZSTD_isError()).
</p></pre><BR> </p></pre><BR>
@ -1000,7 +1001,7 @@ size_t ZSTD_CCtx_refPrefix_advanced(ZSTD_CCtx* cctx, const void* prefix, size_t
</p></pre><BR> </p></pre><BR>
<pre><b>size_t ZSTD_resetCCtxParams(ZSTD_CCtx_params* params); <pre><b>size_t ZSTD_resetCCtxParams(ZSTD_CCtx_params* params);
</b><p> Reset params to default, with the default compression level. </b><p> Reset params to default values.
</p></pre><BR> </p></pre><BR>
@ -1028,9 +1029,10 @@ size_t ZSTD_CCtx_refPrefix_advanced(ZSTD_CCtx* cctx, const void* prefix, size_t
<pre><b>size_t ZSTD_CCtx_setParametersUsingCCtxParams( <pre><b>size_t ZSTD_CCtx_setParametersUsingCCtxParams(
ZSTD_CCtx* cctx, const ZSTD_CCtx_params* params); ZSTD_CCtx* cctx, const ZSTD_CCtx_params* params);
</b><p> Apply a set of ZSTD_CCtx_params to the compression context. </b><p> Apply a set of ZSTD_CCtx_params to the compression context.
This must be done before the dictionary is loaded. This can be done even after compression is started,
The pledgedSrcSize is treated as unknown. if nbWorkers==0, this will have no impact until a new compression is started.
Multithreading parameters are applied only if nbThreads > 1. if nbWorkers>=1, new parameters will be picked up at next job,
with a few restrictions (windowLog, pledgedSrcSize, nbWorkers, jobSize, and overlapLog are not updated).
</p></pre><BR> </p></pre><BR>
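The documentation hunks above replace the `ZSTD_p_nbThreads` / `ZSTD_p_nonBlockingMode` pair with a single `ZSTD_p_nbWorkers` value, where `0` means blocking single-threaded compression and `>= 1` means asynchronous compression in worker threads. A minimal sketch of how a caller might translate old settings into the new one is shown below. The function names are hypothetical and not part of zstd, and treating an old `nbThreads` of `0` ("do not change") as the default of `1` is an assumption.

```c
#include <assert.h>

/* Hypothetical helper, not part of zstd: derive an nbWorkers value from the
 * legacy (nbThreads, nonBlockingMode) pair described in the old comments. */
static unsigned sketch_nb_workers_from_legacy(unsigned nbThreads, int nonBlockingMode)
{
    /* Assumption: legacy value 0 meant "do not change", so fall back to the default of 1. */
    unsigned const threads = (nbThreads == 0) ? 1 : nbThreads;
    /* Old predicate for taking the multi-threaded path. */
    int const usedMT = (threads > 1) || (nonBlockingMode != 0);
    unsigned const workers = usedMT ? threads : 0;
    /* The new predicate (nbWorkers > 0) should select the same path. */
    assert(usedMT == (workers > 0));
    return workers;
}

/* With the new parameter, a call is blocking exactly when no worker is requested. */
static int sketch_is_blocking(unsigned nbWorkers)
{
    return nbWorkers == 0;
}
```

Under these assumptions, the old "one thread, non-blocking" configuration maps to `nbWorkers == 1`, which is the case the removed `ZSTD_p_nonBlockingMode` comment described.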


@ -140,8 +140,6 @@ size_t ZSTD_sizeof_CStream(const ZSTD_CStream* zcs)
/* private API call, for dictBuilder only */ /* private API call, for dictBuilder only */
const seqStore_t* ZSTD_getSeqStore(const ZSTD_CCtx* ctx) { return &(ctx->seqStore); } const seqStore_t* ZSTD_getSeqStore(const ZSTD_CCtx* ctx) { return &(ctx->seqStore); }
#define ZSTD_CLEVEL_CUSTOM 999
static ZSTD_compressionParameters ZSTD_getCParamsFromCCtxParams( static ZSTD_compressionParameters ZSTD_getCParamsFromCCtxParams(
ZSTD_CCtx_params CCtxParams, U64 srcSizeHint, size_t dictSize) ZSTD_CCtx_params CCtxParams, U64 srcSizeHint, size_t dictSize)
{ {
@ -160,13 +158,6 @@ static void ZSTD_cLevelToCCtxParams_srcSize(ZSTD_CCtx_params* CCtxParams, U64 sr
CCtxParams->compressionLevel = ZSTD_CLEVEL_CUSTOM; CCtxParams->compressionLevel = ZSTD_CLEVEL_CUSTOM;
} }
static void ZSTD_cLevelToCParams(ZSTD_CCtx* cctx)
{
DEBUGLOG(4, "ZSTD_cLevelToCParams: level=%i", cctx->requestedParams.compressionLevel);
ZSTD_cLevelToCCtxParams_srcSize(
&cctx->requestedParams, cctx->pledgedSrcSizePlusOne-1);
}
static void ZSTD_cLevelToCCtxParams(ZSTD_CCtx_params* CCtxParams) static void ZSTD_cLevelToCCtxParams(ZSTD_CCtx_params* CCtxParams)
{ {
DEBUGLOG(4, "ZSTD_cLevelToCCtxParams"); DEBUGLOG(4, "ZSTD_cLevelToCCtxParams");
@ -246,10 +237,48 @@ static ZSTD_CCtx_params ZSTD_assignParamsToCCtxParams(
return ERROR(parameter_outOfBound); \ return ERROR(parameter_outOfBound); \
} } } }
static int ZSTD_isUpdateAuthorized(ZSTD_cParameter param)
{
switch(param)
{
case ZSTD_p_compressionLevel:
case ZSTD_p_hashLog:
case ZSTD_p_chainLog:
case ZSTD_p_searchLog:
case ZSTD_p_minMatch:
case ZSTD_p_targetLength:
case ZSTD_p_compressionStrategy:
return 1;
case ZSTD_p_format:
case ZSTD_p_windowLog:
case ZSTD_p_contentSizeFlag:
case ZSTD_p_checksumFlag:
case ZSTD_p_dictIDFlag:
case ZSTD_p_forceMaxWindow :
case ZSTD_p_nbWorkers:
case ZSTD_p_jobSize:
case ZSTD_p_overlapSizeLog:
case ZSTD_p_enableLongDistanceMatching:
case ZSTD_p_ldmHashLog:
case ZSTD_p_ldmMinMatch:
case ZSTD_p_ldmBucketSizeLog:
case ZSTD_p_ldmHashEveryLog:
default:
return 0;
}
}
size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned value) size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned value)
{ {
DEBUGLOG(4, "ZSTD_CCtx_setParameter (%u, %u)", (U32)param, value); DEBUGLOG(4, "ZSTD_CCtx_setParameter (%u, %u)", (U32)param, value);
if (cctx->streamStage != zcss_init) return ERROR(stage_wrong); if (cctx->streamStage != zcss_init) {
if (ZSTD_isUpdateAuthorized(param)) {
cctx->cParamsChanged = 1;
} else {
return ERROR(stage_wrong);
} }
switch(param) switch(param)
{ {
@ -268,7 +297,9 @@ size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned v
case ZSTD_p_targetLength: case ZSTD_p_targetLength:
case ZSTD_p_compressionStrategy: case ZSTD_p_compressionStrategy:
if (cctx->cdict) return ERROR(stage_wrong); if (cctx->cdict) return ERROR(stage_wrong);
if (value>0) ZSTD_cLevelToCParams(cctx); /* Can optimize if srcSize is known */ if (value>0) {
ZSTD_cLevelToCCtxParams_srcSize(&cctx->requestedParams, cctx->pledgedSrcSizePlusOne-1); /* Optimize cParams when srcSize is known */
}
return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
case ZSTD_p_contentSizeFlag: case ZSTD_p_contentSizeFlag:
@ -281,20 +312,20 @@ size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned v
* default : 0 when using a CDict, 1 when using a Prefix */ * default : 0 when using a CDict, 1 when using a Prefix */
return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
case ZSTD_p_nbThreads: case ZSTD_p_nbWorkers:
if ((value > 1) && cctx->staticSize) { if ((value>0) && cctx->staticSize) {
return ERROR(parameter_unsupported); /* MT not compatible with static alloc */ return ERROR(parameter_unsupported); /* MT not compatible with static alloc */
} }
return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
case ZSTD_p_nonBlockingMode:
case ZSTD_p_jobSize: case ZSTD_p_jobSize:
case ZSTD_p_overlapSizeLog: case ZSTD_p_overlapSizeLog:
return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
case ZSTD_p_enableLongDistanceMatching: case ZSTD_p_enableLongDistanceMatching:
if (cctx->cdict) return ERROR(stage_wrong); if (cctx->cdict) return ERROR(stage_wrong);
if (value>0) ZSTD_cLevelToCParams(cctx); if (value>0)
ZSTD_cLevelToCCtxParams_srcSize(&cctx->requestedParams, cctx->pledgedSrcSizePlusOne-1); /* Optimize cParams when srcSize is known */
return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value); return ZSTD_CCtxParam_setParameter(&cctx->requestedParams, param, value);
case ZSTD_p_ldmHashLog: case ZSTD_p_ldmHashLog:
@ -403,21 +434,12 @@ size_t ZSTD_CCtxParam_setParameter(
CCtxParams->forceWindow = (value > 0); CCtxParams->forceWindow = (value > 0);
return CCtxParams->forceWindow; return CCtxParams->forceWindow;
case ZSTD_p_nbThreads : case ZSTD_p_nbWorkers :
if (value == 0) return CCtxParams->nbThreads;
#ifndef ZSTD_MULTITHREAD #ifndef ZSTD_MULTITHREAD
if (value > 1) return ERROR(parameter_unsupported); if (value > 0) return ERROR(parameter_unsupported);
return 1; return 0;
#else #else
return ZSTDMT_CCtxParam_setNbThreads(CCtxParams, value); return ZSTDMT_CCtxParam_setNbWorkers(CCtxParams, value);
#endif
case ZSTD_p_nonBlockingMode :
#ifndef ZSTD_MULTITHREAD
return ERROR(parameter_unsupported);
#else
CCtxParams->nonBlockingMode = (value>0);
return CCtxParams->nonBlockingMode;
#endif #endif
case ZSTD_p_jobSize : case ZSTD_p_jobSize :
@ -476,6 +498,9 @@ size_t ZSTD_CCtxParam_setParameter(
/** ZSTD_CCtx_setParametersUsingCCtxParams() : /** ZSTD_CCtx_setParametersUsingCCtxParams() :
* just applies `params` into `cctx` * just applies `params` into `cctx`
* no action is performed, parameters are merely stored. * no action is performed, parameters are merely stored.
* If ZSTDMT is enabled, parameters are pushed to cctx->mtctx.
* This is possible even if a compression is ongoing.
* In which case, new parameters will be applied on the fly, starting with next compression job.
*/ */
size_t ZSTD_CCtx_setParametersUsingCCtxParams( size_t ZSTD_CCtx_setParametersUsingCCtxParams(
ZSTD_CCtx* cctx, const ZSTD_CCtx_params* params) ZSTD_CCtx* cctx, const ZSTD_CCtx_params* params)
@ -484,7 +509,6 @@ size_t ZSTD_CCtx_setParametersUsingCCtxParams(
if (cctx->cdict) return ERROR(stage_wrong); if (cctx->cdict) return ERROR(stage_wrong);
cctx->requestedParams = *params; cctx->requestedParams = *params;
return 0; return 0;
} }
@ -680,7 +704,7 @@ static size_t ZSTD_sizeof_matchState(ZSTD_compressionParameters const* cParams,
size_t ZSTD_estimateCCtxSize_usingCCtxParams(const ZSTD_CCtx_params* params) size_t ZSTD_estimateCCtxSize_usingCCtxParams(const ZSTD_CCtx_params* params)
{ {
/* Estimate CCtx size is supported for single-threaded compression only. */ /* Estimate CCtx size is supported for single-threaded compression only. */
if (params->nbThreads > 1) { return ERROR(GENERIC); } if (params->nbWorkers > 0) { return ERROR(GENERIC); }
{ ZSTD_compressionParameters const cParams = { ZSTD_compressionParameters const cParams =
ZSTD_getCParamsFromCCtxParams(*params, 0, 0); ZSTD_getCParamsFromCCtxParams(*params, 0, 0);
size_t const blockSize = MIN(ZSTD_BLOCKSIZE_MAX, (size_t)1 << cParams.windowLog); size_t const blockSize = MIN(ZSTD_BLOCKSIZE_MAX, (size_t)1 << cParams.windowLog);
@ -729,7 +753,7 @@ size_t ZSTD_estimateCCtxSize(int compressionLevel)
size_t ZSTD_estimateCStreamSize_usingCCtxParams(const ZSTD_CCtx_params* params) size_t ZSTD_estimateCStreamSize_usingCCtxParams(const ZSTD_CCtx_params* params)
{ {
if (params->nbThreads > 1) { return ERROR(GENERIC); } if (params->nbWorkers > 0) { return ERROR(GENERIC); }
{ size_t const CCtxSize = ZSTD_estimateCCtxSize_usingCCtxParams(params); { size_t const CCtxSize = ZSTD_estimateCCtxSize_usingCCtxParams(params);
size_t const blockSize = MIN(ZSTD_BLOCKSIZE_MAX, (size_t)1 << params->cParams.windowLog); size_t const blockSize = MIN(ZSTD_BLOCKSIZE_MAX, (size_t)1 << params->cParams.windowLog);
size_t const inBuffSize = ((size_t)1 << params->cParams.windowLog) + blockSize; size_t const inBuffSize = ((size_t)1 << params->cParams.windowLog) + blockSize;
@ -768,7 +792,7 @@ size_t ZSTD_estimateCStreamSize(int compressionLevel) {
ZSTD_frameProgression ZSTD_getFrameProgression(const ZSTD_CCtx* cctx) ZSTD_frameProgression ZSTD_getFrameProgression(const ZSTD_CCtx* cctx)
{ {
#ifdef ZSTD_MULTITHREAD #ifdef ZSTD_MULTITHREAD
if ((cctx->appliedParams.nbThreads > 1) || (cctx->appliedParams.nonBlockingMode)) { if (cctx->appliedParams.nbWorkers > 0) {
return ZSTDMT_getFrameProgression(cctx->mtctx); return ZSTDMT_getFrameProgression(cctx->mtctx);
} }
#endif #endif
@ -2513,7 +2537,8 @@ size_t ZSTD_compress_advanced_internal(
const void* dict,size_t dictSize, const void* dict,size_t dictSize,
ZSTD_CCtx_params params) ZSTD_CCtx_params params)
{ {
DEBUGLOG(4, "ZSTD_compress_advanced_internal"); DEBUGLOG(4, "ZSTD_compress_advanced_internal (srcSize:%u)",
(U32)srcSize);
CHECK_F( ZSTD_compressBegin_internal(cctx, dict, dictSize, ZSTD_dm_auto, NULL, CHECK_F( ZSTD_compressBegin_internal(cctx, dict, dictSize, ZSTD_dm_auto, NULL,
params, srcSize, ZSTDb_not_buffered) ); params, srcSize, ZSTDb_not_buffered) );
return ZSTD_compressEnd(cctx, dst, dstCapacity, src, srcSize); return ZSTD_compressEnd(cctx, dst, dstCapacity, src, srcSize);
@ -2993,7 +3018,7 @@ MEM_STATIC size_t ZSTD_limitCopy(void* dst, size_t dstCapacity,
/** ZSTD_compressStream_generic(): /** ZSTD_compressStream_generic():
* internal function for all *compressStream*() variants and *compress_generic() * internal function for all *compressStream*() variants and *compress_generic()
* non-static, because can be called from zstdmt.c * non-static, because can be called from zstdmt_compress.c
* @return : hint size for next input */ * @return : hint size for next input */
size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs, size_t ZSTD_compressStream_generic(ZSTD_CStream* zcs,
ZSTD_outBuffer* output, ZSTD_outBuffer* output,
@ -3172,28 +3197,28 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx,
#ifdef ZSTD_MULTITHREAD #ifdef ZSTD_MULTITHREAD
if ((cctx->pledgedSrcSizePlusOne-1) <= ZSTDMT_JOBSIZE_MIN) { if ((cctx->pledgedSrcSizePlusOne-1) <= ZSTDMT_JOBSIZE_MIN) {
params.nbThreads = 1; /* do not invoke multi-threading when src size is too small */ params.nbWorkers = 0; /* do not invoke multi-threading when src size is too small */
params.nonBlockingMode = 0;
} }
if ((params.nbThreads > 1) | (params.nonBlockingMode == 1)) { if (params.nbWorkers > 0) {
if (cctx->mtctx == NULL || (params.nbThreads != ZSTDMT_getNbThreads(cctx->mtctx))) { /* mt context creation */
DEBUGLOG(4, "ZSTD_compress_generic: creating new mtctx for nbThreads=%u", if (cctx->mtctx == NULL || (params.nbWorkers != ZSTDMT_getNbWorkers(cctx->mtctx))) {
params.nbThreads); DEBUGLOG(4, "ZSTD_compress_generic: creating new mtctx for nbWorkers=%u",
params.nbWorkers);
if (cctx->mtctx != NULL) if (cctx->mtctx != NULL)
DEBUGLOG(4, "ZSTD_compress_generic: previous nbThreads was %u", DEBUGLOG(4, "ZSTD_compress_generic: previous nbWorkers was %u",
ZSTDMT_getNbThreads(cctx->mtctx)); ZSTDMT_getNbWorkers(cctx->mtctx));
ZSTDMT_freeCCtx(cctx->mtctx); ZSTDMT_freeCCtx(cctx->mtctx);
cctx->mtctx = ZSTDMT_createCCtx_advanced(params.nbThreads, cctx->customMem); cctx->mtctx = ZSTDMT_createCCtx_advanced(params.nbWorkers, cctx->customMem);
if (cctx->mtctx == NULL) return ERROR(memory_allocation); if (cctx->mtctx == NULL) return ERROR(memory_allocation);
} }
DEBUGLOG(4, "call ZSTDMT_initCStream_internal as nbThreads=%u", params.nbThreads); /* mt compression */
DEBUGLOG(4, "call ZSTDMT_initCStream_internal as nbWorkers=%u", params.nbWorkers);
CHECK_F( ZSTDMT_initCStream_internal( CHECK_F( ZSTDMT_initCStream_internal(
cctx->mtctx, cctx->mtctx,
prefixDict.dict, prefixDict.dictSize, ZSTD_dm_rawContent, prefixDict.dict, prefixDict.dictSize, ZSTD_dm_rawContent,
cctx->cdict, params, cctx->pledgedSrcSizePlusOne-1) ); cctx->cdict, params, cctx->pledgedSrcSizePlusOne-1) );
cctx->streamStage = zcss_load; cctx->streamStage = zcss_load;
cctx->appliedParams.nbThreads = params.nbThreads; cctx->appliedParams.nbWorkers = params.nbWorkers;
cctx->appliedParams.nonBlockingMode = params.nonBlockingMode;
} else } else
#endif #endif
{ CHECK_F( ZSTD_resetCStream_internal( { CHECK_F( ZSTD_resetCStream_internal(
@ -3201,19 +3226,23 @@ size_t ZSTD_compress_generic (ZSTD_CCtx* cctx,
prefixDict.dictMode, cctx->cdict, params, prefixDict.dictMode, cctx->cdict, params,
cctx->pledgedSrcSizePlusOne-1) ); cctx->pledgedSrcSizePlusOne-1) );
assert(cctx->streamStage == zcss_load); assert(cctx->streamStage == zcss_load);
assert(cctx->appliedParams.nbThreads <= 1); assert(cctx->appliedParams.nbWorkers == 0);
} } } }
/* compression stage */ /* compression stage */
#ifdef ZSTD_MULTITHREAD #ifdef ZSTD_MULTITHREAD
if ((cctx->appliedParams.nbThreads > 1) | (cctx->appliedParams.nonBlockingMode==1)) { if (cctx->appliedParams.nbWorkers > 0) {
size_t const flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp); if (cctx->cParamsChanged) {
if ( ZSTD_isError(flushMin) ZSTDMT_updateCParams_whileCompressing(cctx->mtctx, cctx->requestedParams.compressionLevel, cctx->requestedParams.cParams);
|| (endOp == ZSTD_e_end && flushMin == 0) ) { /* compression completed */ cctx->cParamsChanged = 0;
ZSTD_startNewCompression(cctx);
} }
return flushMin; { size_t const flushMin = ZSTDMT_compressStream_generic(cctx->mtctx, output, input, endOp);
} if ( ZSTD_isError(flushMin)
|| (endOp == ZSTD_e_end && flushMin == 0) ) { /* compression completed */
ZSTD_startNewCompression(cctx);
}
return flushMin;
} }
#endif #endif
CHECK_F( ZSTD_compressStream_generic(cctx, output, input, endOp) ); CHECK_F( ZSTD_compressStream_generic(cctx, output, input, endOp) );
DEBUGLOG(5, "completed ZSTD_compress_generic"); DEBUGLOG(5, "completed ZSTD_compress_generic");
@ -3239,7 +3268,7 @@ size_t ZSTD_compress_generic_simpleArgs (
/*====== Finalize ======*/ /*====== Finalize ======*/
/*! ZSTD_flushStream() : /*! ZSTD_flushStream() :
* @return : amount of data remaining to flush */ * @return : amount of data remaining to flush */
size_t ZSTD_flushStream(ZSTD_CStream* zcs, ZSTD_outBuffer* output) size_t ZSTD_flushStream(ZSTD_CStream* zcs, ZSTD_outBuffer* output)
{ {
ZSTD_inBuffer input = { NULL, 0, 0 }; ZSTD_inBuffer input = { NULL, 0, 0 };
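The changes to this file combine two ideas: a whitelist of parameters that may be changed after compression has started (`ZSTD_isUpdateAuthorized()`), and a `cParamsChanged` flag that is set on such a change and cleared once the new values have been handed over. The following is a simplified, self-contained model of that flow, not the zstd implementation. All `sk_` names are hypothetical, only four parameters are modelled, and the sketch ignores that the diff only forwards updates in the multi-threaded branch.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, reduced parameter set; the real enum is ZSTD_cParameter. */
typedef enum { SK_LEVEL, SK_HASH_LOG, SK_WINDOW_LOG, SK_NB_WORKERS, SK_PARAM_COUNT } sk_param;
typedef enum { SK_STAGE_INIT, SK_STAGE_LOAD } sk_stage;

typedef struct {
    sk_stage stage;                      /* INIT until the first compression call */
    int      paramsChanged;              /* dirty flag, set by mid-stream updates */
    unsigned requested[SK_PARAM_COUNT];  /* what the caller asked for */
    unsigned applied[SK_PARAM_COUNT];    /* what compression currently uses */
} sk_ctx;

/* Whitelist: only these may change once compression has started. */
static int sk_is_update_authorized(sk_param p)
{
    return (p == SK_LEVEL) || (p == SK_HASH_LOG);
}

/* Returns 0 on success, -1 when the parameter cannot be set at this stage. */
static int sk_set_parameter(sk_ctx* ctx, sk_param p, unsigned value)
{
    assert(ctx != NULL);
    if (ctx->stage != SK_STAGE_INIT) {
        if (!sk_is_update_authorized(p)) return -1;
        ctx->paramsChanged = 1;
    }
    ctx->requested[p] = value;
    return 0;
}

/* Models one compression call; returns 1 when parameters were (re)applied. */
static int sk_compress_step(sk_ctx* ctx)
{
    int p;
    assert(ctx != NULL);
    if (ctx->stage == SK_STAGE_INIT) {   /* first call: apply everything */
        for (p = 0; p < SK_PARAM_COUNT; p++) ctx->applied[p] = ctx->requested[p];
        ctx->stage = SK_STAGE_LOAD;
        ctx->paramsChanged = 0;
        return 1;
    }
    if (!ctx->paramsChanged) return 0;
    for (p = 0; p < SK_PARAM_COUNT; p++) {   /* later calls: whitelisted values only */
        if (sk_is_update_authorized((sk_param)p)) ctx->applied[p] = ctx->requested[p];
    }
    ctx->paramsChanged = 0;
    return 1;
}
```

In this model, an update made between two calls has no effect until the next `sk_compress_step()`, which loosely mirrors the documented behaviour that new parameters are picked up at the next job.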


@ -30,8 +30,9 @@ extern "C" {
/*-************************************* /*-*************************************
* Constants * Constants
***************************************/ ***************************************/
static const U32 g_searchStrength = 8; #define kSearchStrength 8
#define HASH_READ_SIZE 8 #define HASH_READ_SIZE 8
#define ZSTD_CLEVEL_CUSTOM 999
#define ZSTD_DUBT_UNSORTED_MARK 1 /* For btlazy2 strategy, index 1 now means "unsorted". #define ZSTD_DUBT_UNSORTED_MARK 1 /* For btlazy2 strategy, index 1 now means "unsorted".
It could be confused for a real successor at index "1", if sorted as larger than its predecessor. It could be confused for a real successor at index "1", if sorted as larger than its predecessor.
It's not a big deal though : candidate will just be sorted again. It's not a big deal though : candidate will just be sorted again.
@ -151,12 +152,11 @@ struct ZSTD_CCtx_params_s {
ZSTD_frameParameters fParams; ZSTD_frameParameters fParams;
int compressionLevel; int compressionLevel;
U32 forceWindow; /* force back-references to respect limit of int forceWindow; /* force back-references to respect limit of
* 1<<wLog, even for dictionary */ * 1<<wLog, even for dictionary */
/* Multithreading: used to pass parameters to mtctx */ /* Multithreading: used to pass parameters to mtctx */
U32 nbThreads; unsigned nbWorkers;
int nonBlockingMode; /* will trigger ZSTDMT even with nbThreads==1 */
unsigned jobSize; unsigned jobSize;
unsigned overlapSizeLog; unsigned overlapSizeLog;
@ -165,14 +165,14 @@ struct ZSTD_CCtx_params_s {
/* For use with createCCtxParams() and freeCCtxParams() only */ /* For use with createCCtxParams() and freeCCtxParams() only */
ZSTD_customMem customMem; ZSTD_customMem customMem;
}; /* typedef'd to ZSTD_CCtx_params within "zstd.h" */ }; /* typedef'd to ZSTD_CCtx_params within "zstd.h" */
struct ZSTD_CCtx_s { struct ZSTD_CCtx_s {
ZSTD_compressionStage_e stage; ZSTD_compressionStage_e stage;
U32 dictID; int cParamsChanged; /* == 1 if cParams(except wlog) or compression level are changed in requestedParams. Triggers transmission of new params to ZSTDMT (if available) then reset to 0. */
ZSTD_CCtx_params requestedParams; ZSTD_CCtx_params requestedParams;
ZSTD_CCtx_params appliedParams; ZSTD_CCtx_params appliedParams;
U32 dictID;
void* workSpace; void* workSpace;
size_t workSpaceSize; size_t workSpaceSize;
size_t blockSize; size_t blockSize;


@ -113,7 +113,7 @@ size_t ZSTD_compressBlock_doubleFast_generic(
while (((ip>anchor) & (match>lowest)) && (ip[-1] == match[-1])) { ip--; match--; mLength++; } /* catch up */ while (((ip>anchor) & (match>lowest)) && (ip[-1] == match[-1])) { ip--; match--; mLength++; } /* catch up */
} }
} else { } else {
ip += ((ip-anchor) >> g_searchStrength) + 1; ip += ((ip-anchor) >> kSearchStrength) + 1;
continue; continue;
} }
@ -264,7 +264,7 @@ static size_t ZSTD_compressBlock_doubleFast_extDict_generic(
ZSTD_storeSeq(seqStore, ip-anchor, anchor, offset + ZSTD_REP_MOVE, mLength-MINMATCH); ZSTD_storeSeq(seqStore, ip-anchor, anchor, offset + ZSTD_REP_MOVE, mLength-MINMATCH);
} else { } else {
ip += ((ip-anchor) >> g_searchStrength) + 1; ip += ((ip-anchor) >> kSearchStrength) + 1;
continue; continue;
} } } }


@ -79,7 +79,7 @@ size_t ZSTD_compressBlock_fast_generic(
} else { } else {
U32 offset; U32 offset;
if ( (matchIndex <= lowestIndex) || (MEM_read32(match) != MEM_read32(ip)) ) { if ( (matchIndex <= lowestIndex) || (MEM_read32(match) != MEM_read32(ip)) ) {
ip += ((ip-anchor) >> g_searchStrength) + 1; ip += ((ip-anchor) >> kSearchStrength) + 1;
continue; continue;
} }
mLength = ZSTD_count(ip+4, match+4, iend) + 4; mLength = ZSTD_count(ip+4, match+4, iend) + 4;
@ -185,7 +185,7 @@ static size_t ZSTD_compressBlock_fast_extDict_generic(
} else { } else {
if ( (matchIndex < lowestIndex) || if ( (matchIndex < lowestIndex) ||
(MEM_read32(match) != MEM_read32(ip)) ) { (MEM_read32(match) != MEM_read32(ip)) ) {
ip += ((ip-anchor) >> g_searchStrength) + 1; ip += ((ip-anchor) >> kSearchStrength) + 1;
continue; continue;
} }
{ const BYTE* matchEnd = matchIndex < dictLimit ? dictEnd : iend; { const BYTE* matchEnd = matchIndex < dictLimit ? dictEnd : iend;


@ -508,7 +508,7 @@ size_t ZSTD_compressBlock_lazy_generic(
} }
if (matchLength < 4) { if (matchLength < 4) {
ip += ((ip-anchor) >> g_searchStrength) + 1; /* jump faster over incompressible sections */ ip += ((ip-anchor) >> kSearchStrength) + 1; /* jump faster over incompressible sections */
continue; continue;
} }
@ -681,7 +681,7 @@ size_t ZSTD_compressBlock_lazy_extDict_generic(
} }
if (matchLength < 4) { if (matchLength < 4) {
ip += ((ip-anchor) >> g_searchStrength) + 1; /* jump faster over incompressible sections */ ip += ((ip-anchor) >> kSearchStrength) + 1; /* jump faster over incompressible sections */
continue; continue;
} }
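The expression `((ip-anchor) >> kSearchStrength) + 1`, which this commit touches in the fast, doubleFast and lazy matchers, advances the search position faster the longer no match has been found. A small stand-alone sketch of that arithmetic follows. The `sketch_` names are hypothetical, and the loop simplifies the matchers by assuming that no match is ever found and that the anchor stays at position 0.

```c
#include <assert.h>
#include <stddef.h>

#define SKETCH_SEARCH_STRENGTH 8   /* same value as kSearchStrength in the diff */

/* Step taken after a failed match attempt, given the distance from the anchor. */
static size_t sketch_skip_step(size_t bytesSinceAnchor)
{
    return (bytesSinceAnchor >> SKETCH_SEARCH_STRENGTH) + 1;
}

/* Number of match attempts needed to walk `length` bytes when nothing matches. */
static size_t sketch_probes_without_match(size_t length)
{
    size_t pos = 0;
    size_t probes = 0;
    while (pos < length) {
        size_t const step = sketch_skip_step(pos);
        assert(step >= 1);   /* guarantees forward progress */
        pos += step;
        probes++;
    }
    return probes;
}
```

With a strength of 8, the step is 1 for the first 256 bytes after the anchor and grows by 1 for each further 256 bytes, so long incompressible regions are probed progressively more sparsely.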


@ -10,7 +10,7 @@
/* ====== Tuning parameters ====== */ /* ====== Tuning parameters ====== */
#define ZSTDMT_NBTHREADS_MAX 200 #define ZSTDMT_NBWORKERS_MAX 200
#define ZSTDMT_JOBSIZE_MAX (MEM_32bits() ? (512 MB) : (2 GB)) /* note : limited by `jobSize` type, which is `unsigned` */ #define ZSTDMT_JOBSIZE_MAX (MEM_32bits() ? (512 MB) : (2 GB)) /* note : limited by `jobSize` type, which is `unsigned` */
#define ZSTDMT_OVERLAPLOG_DEFAULT 6 #define ZSTDMT_OVERLAPLOG_DEFAULT 6
@ -97,9 +97,9 @@ typedef struct ZSTDMT_bufferPool_s {
buffer_t bTable[1]; /* variable size */ buffer_t bTable[1]; /* variable size */
} ZSTDMT_bufferPool; } ZSTDMT_bufferPool;
static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned nbThreads, ZSTD_customMem cMem) static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned nbWorkers, ZSTD_customMem cMem)
{ {
unsigned const maxNbBuffers = 2*nbThreads + 3; unsigned const maxNbBuffers = 2*nbWorkers + 3;
ZSTDMT_bufferPool* const bufPool = (ZSTDMT_bufferPool*)ZSTD_calloc( ZSTDMT_bufferPool* const bufPool = (ZSTDMT_bufferPool*)ZSTD_calloc(
sizeof(ZSTDMT_bufferPool) + (maxNbBuffers-1) * sizeof(buffer_t), cMem); sizeof(ZSTDMT_bufferPool) + (maxNbBuffers-1) * sizeof(buffer_t), cMem);
if (bufPool==NULL) return NULL; if (bufPool==NULL) return NULL;
@ -236,23 +236,24 @@ static void ZSTDMT_freeCCtxPool(ZSTDMT_CCtxPool* pool)
} }
/* ZSTDMT_createCCtxPool() : /* ZSTDMT_createCCtxPool() :
* implies nbThreads >= 1 , checked by caller ZSTDMT_createCCtx() */ * implies nbWorkers >= 1 , checked by caller ZSTDMT_createCCtx() */
static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbThreads, static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbWorkers,
ZSTD_customMem cMem) ZSTD_customMem cMem)
{ {
ZSTDMT_CCtxPool* const cctxPool = (ZSTDMT_CCtxPool*) ZSTD_calloc( ZSTDMT_CCtxPool* const cctxPool = (ZSTDMT_CCtxPool*) ZSTD_calloc(
sizeof(ZSTDMT_CCtxPool) + (nbThreads-1)*sizeof(ZSTD_CCtx*), cMem); sizeof(ZSTDMT_CCtxPool) + (nbWorkers-1)*sizeof(ZSTD_CCtx*), cMem);
assert(nbWorkers > 0);
if (!cctxPool) return NULL; if (!cctxPool) return NULL;
if (ZSTD_pthread_mutex_init(&cctxPool->poolMutex, NULL)) { if (ZSTD_pthread_mutex_init(&cctxPool->poolMutex, NULL)) {
ZSTD_free(cctxPool, cMem); ZSTD_free(cctxPool, cMem);
return NULL; return NULL;
} }
cctxPool->cMem = cMem; cctxPool->cMem = cMem;
cctxPool->totalCCtx = nbThreads; cctxPool->totalCCtx = nbWorkers;
cctxPool->availCCtx = 1; /* at least one cctx for single-thread mode */ cctxPool->availCCtx = 1; /* at least one cctx for single-thread mode */
cctxPool->cctx[0] = ZSTD_createCCtx_advanced(cMem); cctxPool->cctx[0] = ZSTD_createCCtx_advanced(cMem);
if (!cctxPool->cctx[0]) { ZSTDMT_freeCCtxPool(cctxPool); return NULL; } if (!cctxPool->cctx[0]) { ZSTDMT_freeCCtxPool(cctxPool); return NULL; }
DEBUGLOG(3, "cctxPool created, with %u threads", nbThreads); DEBUGLOG(3, "cctxPool created, with %u workers", nbWorkers);
return cctxPool; return cctxPool;
} }
@ -260,15 +261,16 @@ static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbThreads,
static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool) static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool)
{ {
ZSTD_pthread_mutex_lock(&cctxPool->poolMutex); ZSTD_pthread_mutex_lock(&cctxPool->poolMutex);
{ unsigned const nbThreads = cctxPool->totalCCtx; { unsigned const nbWorkers = cctxPool->totalCCtx;
size_t const poolSize = sizeof(*cctxPool) size_t const poolSize = sizeof(*cctxPool)
+ (nbThreads-1)*sizeof(ZSTD_CCtx*); + (nbWorkers-1) * sizeof(ZSTD_CCtx*);
unsigned u; unsigned u;
size_t totalCCtxSize = 0; size_t totalCCtxSize = 0;
for (u=0; u<nbThreads; u++) { for (u=0; u<nbWorkers; u++) {
totalCCtxSize += ZSTD_sizeof_CCtx(cctxPool->cctx[u]); totalCCtxSize += ZSTD_sizeof_CCtx(cctxPool->cctx[u]);
} }
ZSTD_pthread_mutex_unlock(&cctxPool->poolMutex); ZSTD_pthread_mutex_unlock(&cctxPool->poolMutex);
assert(nbWorkers > 0);
return poolSize + totalCCtxSize; return poolSize + totalCCtxSize;
} }
} }
@ -295,8 +297,8 @@ static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
if (pool->availCCtx < pool->totalCCtx) if (pool->availCCtx < pool->totalCCtx)
pool->cctx[pool->availCCtx++] = cctx; pool->cctx[pool->availCCtx++] = cctx;
else { else {
/* pool overflow : should not happen, since totalCCtx==nbThreads */ /* pool overflow : should not happen, since totalCCtx==nbWorkers */
DEBUGLOG(5, "CCtx pool overflow : free cctx"); DEBUGLOG(4, "CCtx pool overflow : free cctx");
ZSTD_freeCCtx(cctx); ZSTD_freeCCtx(cctx);
} }
ZSTD_pthread_mutex_unlock(&pool->poolMutex); ZSTD_pthread_mutex_unlock(&pool->poolMutex);
@ -502,52 +504,52 @@ static ZSTDMT_jobDescription* ZSTDMT_createJobsTable(U32* nbJobsPtr, ZSTD_custom
return jobTable; return jobTable;
} }
/* ZSTDMT_CCtxParam_setNbThreads(): /* ZSTDMT_CCtxParam_setNbWorkers():
* Internal use only */ * Internal use only */
size_t ZSTDMT_CCtxParam_setNbThreads(ZSTD_CCtx_params* params, unsigned nbThreads) size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers)
{ {
if (nbThreads > ZSTDMT_NBTHREADS_MAX) nbThreads = ZSTDMT_NBTHREADS_MAX; if (nbWorkers > ZSTDMT_NBWORKERS_MAX) nbWorkers = ZSTDMT_NBWORKERS_MAX;
if (nbThreads < 1) nbThreads = 1; if (nbWorkers < 1) nbWorkers = 1;
params->nbThreads = nbThreads; params->nbWorkers = nbWorkers;
params->overlapSizeLog = ZSTDMT_OVERLAPLOG_DEFAULT; params->overlapSizeLog = ZSTDMT_OVERLAPLOG_DEFAULT;
params->jobSize = 0; params->jobSize = 0;
return nbThreads; return nbWorkers;
} }
ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem) ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers, ZSTD_customMem cMem)
{ {
ZSTDMT_CCtx* mtctx; ZSTDMT_CCtx* mtctx;
U32 nbJobs = nbThreads + 2; U32 nbJobs = nbWorkers + 2;
DEBUGLOG(3, "ZSTDMT_createCCtx_advanced (nbThreads = %u)", nbThreads); DEBUGLOG(3, "ZSTDMT_createCCtx_advanced (nbWorkers = %u)", nbWorkers);
if (nbThreads < 1) return NULL; if (nbWorkers < 1) return NULL;
nbThreads = MIN(nbThreads , ZSTDMT_NBTHREADS_MAX); nbWorkers = MIN(nbWorkers , ZSTDMT_NBWORKERS_MAX);
if ((cMem.customAlloc!=NULL) ^ (cMem.customFree!=NULL)) if ((cMem.customAlloc!=NULL) ^ (cMem.customFree!=NULL))
/* invalid custom allocator */ /* invalid custom allocator */
return NULL; return NULL;
mtctx = (ZSTDMT_CCtx*) ZSTD_calloc(sizeof(ZSTDMT_CCtx), cMem); mtctx = (ZSTDMT_CCtx*) ZSTD_calloc(sizeof(ZSTDMT_CCtx), cMem);
if (!mtctx) return NULL; if (!mtctx) return NULL;
ZSTDMT_CCtxParam_setNbThreads(&mtctx->params, nbThreads); ZSTDMT_CCtxParam_setNbWorkers(&mtctx->params, nbWorkers);
mtctx->cMem = cMem; mtctx->cMem = cMem;
mtctx->allJobsCompleted = 1; mtctx->allJobsCompleted = 1;
mtctx->factory = POOL_create_advanced(nbThreads, 0, cMem); mtctx->factory = POOL_create_advanced(nbWorkers, 0, cMem);
mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, cMem); mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, cMem);
assert(nbJobs > 0); assert((nbJobs & (nbJobs - 1)) == 0); /* ensure nbJobs is a power of 2 */ assert(nbJobs > 0); assert((nbJobs & (nbJobs - 1)) == 0); /* ensure nbJobs is a power of 2 */
mtctx->jobIDMask = nbJobs - 1; mtctx->jobIDMask = nbJobs - 1;
mtctx->bufPool = ZSTDMT_createBufferPool(nbThreads, cMem); mtctx->bufPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
mtctx->cctxPool = ZSTDMT_createCCtxPool(nbThreads, cMem); mtctx->cctxPool = ZSTDMT_createCCtxPool(nbWorkers, cMem);
if (!mtctx->factory | !mtctx->jobs | !mtctx->bufPool | !mtctx->cctxPool) { if (!mtctx->factory | !mtctx->jobs | !mtctx->bufPool | !mtctx->cctxPool) {
ZSTDMT_freeCCtx(mtctx); ZSTDMT_freeCCtx(mtctx);
return NULL; return NULL;
} }
DEBUGLOG(3, "mt_cctx created, for %u threads", nbThreads); DEBUGLOG(3, "mt_cctx created, for %u threads", nbWorkers);
return mtctx; return mtctx;
} }
ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbThreads) ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbWorkers)
{ {
return ZSTDMT_createCCtx_advanced(nbThreads, ZSTD_defaultCMem); return ZSTDMT_createCCtx_advanced(nbWorkers, ZSTD_defaultCMem);
} }
@ -649,8 +651,8 @@ size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter,
} }
} }
/* Sets parameters relevant to the compression job, initializing others to /* Sets parameters relevant to the compression job,
* default values. Notably, nbThreads should probably be zero. */ * initializing others to default values. */
static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params) static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params)
{ {
ZSTD_CCtx_params jobParams; ZSTD_CCtx_params jobParams;
@ -664,13 +666,29 @@ static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(ZSTD_CCtx_params const params)
return jobParams; return jobParams;
} }
/* ZSTDMT_getNbThreads(): /*! ZSTDMT_updateCParams_whileCompressing() :
* Update compression level and parameters (except wlog)
* while compression is ongoing.
* New parameters will be applied to next compression job. */
void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, int compressionLevel, ZSTD_compressionParameters cParams)
{
U32 const saved_wlog = mtctx->params.cParams.windowLog; /* Do not modify windowLog while compressing */
DEBUGLOG(5, "ZSTDMT_updateCParams_whileCompressing (level:%i)",
compressionLevel);
mtctx->params.compressionLevel = compressionLevel;
if (compressionLevel != ZSTD_CLEVEL_CUSTOM)
cParams = ZSTD_getCParams(compressionLevel, mtctx->frameContentSize, 0 /* dictSize */ );
cParams.windowLog = saved_wlog;
mtctx->params.cParams = cParams;
}
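Note on the windowLog handling: the new function overwrites the stored compression parameters but restores the saved `windowLog` afterwards. A reduced sketch of that save/overwrite/restore pattern follows. `sketch_cParams` and `sketch_updateCParams` are hypothetical stand-ins for illustration only; the real `ZSTD_compressionParameters` has more fields, and the level lookup through `ZSTD_getCParams()` is left out.

```c
/* Hypothetical, reduced stand-in for ZSTD_compressionParameters. */
typedef struct {
    unsigned windowLog;
    unsigned chainLog;
    unsigned hashLog;
} sketch_cParams;

/* Adopt every field of `requested`, except windowLog,
 * which keeps the value already stored in `current`. */
static sketch_cParams sketch_updateCParams(sketch_cParams current,
                                           sketch_cParams requested)
{
    unsigned const savedWlog = current.windowLog;  /* must not change mid-frame */
    current = requested;
    current.windowLog = savedWlog;
    return current;
}
```

Only the window size is pinned in this pattern; the other fields take effect for the next compression job, as the function comment above states.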
/* ZSTDMT_getNbWorkers():
* @return nb threads currently active in mtctx. * @return nb threads currently active in mtctx.
* mtctx must be valid */ * mtctx must be valid */
unsigned ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx) unsigned ZSTDMT_getNbWorkers(const ZSTDMT_CCtx* mtctx)
{ {
assert(mtctx != NULL); assert(mtctx != NULL);
return mtctx->params.nbThreads; return mtctx->params.nbWorkers;
} }
/* ZSTDMT_getFrameProgression(): /* ZSTDMT_getFrameProgression():
@ -709,15 +727,15 @@ ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
/* ===== Multi-threaded compression ===== */ /* ===== Multi-threaded compression ===== */
/* ------------------------------------------ */ /* ------------------------------------------ */
static unsigned ZSTDMT_computeNbJobs(size_t srcSize, unsigned windowLog, unsigned nbThreads) { static unsigned ZSTDMT_computeNbJobs(size_t srcSize, unsigned windowLog, unsigned nbWorkers) {
assert(nbThreads>0); assert(nbWorkers>0);
{ size_t const jobSizeTarget = (size_t)1 << (windowLog + 2); { size_t const jobSizeTarget = (size_t)1 << (windowLog + 2);
size_t const jobMaxSize = jobSizeTarget << 2; size_t const jobMaxSize = jobSizeTarget << 2;
size_t const passSizeMax = jobMaxSize * nbThreads; size_t const passSizeMax = jobMaxSize * nbWorkers;
unsigned const multiplier = (unsigned)(srcSize / passSizeMax) + 1; unsigned const multiplier = (unsigned)(srcSize / passSizeMax) + 1;
unsigned const nbJobsLarge = multiplier * nbThreads; unsigned const nbJobsLarge = multiplier * nbWorkers;
unsigned const nbJobsMax = (unsigned)(srcSize / jobSizeTarget) + 1; unsigned const nbJobsMax = (unsigned)(srcSize / jobSizeTarget) + 1;
unsigned const nbJobsSmall = MIN(nbJobsMax, nbThreads); unsigned const nbJobsSmall = MIN(nbJobsMax, nbWorkers);
return (multiplier>1) ? nbJobsLarge : nbJobsSmall; return (multiplier>1) ? nbJobsLarge : nbJobsSmall;
} } } }
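Note on the job count: the arithmetic in `ZSTDMT_computeNbJobs()` is unchanged by the rename and can be checked with a standalone copy. The sketch below reproduces the new-side body under hypothetical names (`sketch_computeNbJobs`, `SKETCH_MIN`), with the `assert` replaced by a comment. It is for experimenting with values, not a replacement for the library function.

```c
#include <stddef.h>

#define SKETCH_MIN(a, b) ((a) < (b) ? (a) : (b))

/* Standalone copy of the job-count arithmetic shown above.
 * Precondition (asserted in the original): nbWorkers > 0. */
static unsigned sketch_computeNbJobs(size_t srcSize, unsigned windowLog,
                                     unsigned nbWorkers)
{
    size_t const jobSizeTarget = (size_t)1 << (windowLog + 2);  /* 4 x window */
    size_t const jobMaxSize    = jobSizeTarget << 2;            /* 16 x window */
    size_t const passSizeMax   = jobMaxSize * nbWorkers;
    unsigned const multiplier  = (unsigned)(srcSize / passSizeMax) + 1;
    unsigned const nbJobsLarge = multiplier * nbWorkers;
    unsigned const nbJobsMax   = (unsigned)(srcSize / jobSizeTarget) + 1;
    unsigned const nbJobsSmall = SKETCH_MIN(nbJobsMax, nbWorkers);
    return (multiplier > 1) ? nbJobsLarge : nbJobsSmall;
}
```

For example, with `windowLog = 20` and 4 workers the target job size is 4 MB and one pass covers up to 64 MB. A 1 MB input then yields 1 job, a 10 MB input yields 3 jobs, and a 100 MB input yields 8 jobs, a multiple of the worker count.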
@ -734,7 +752,7 @@ static size_t ZSTDMT_compress_advanced_internal(
ZSTD_CCtx_params const jobParams = ZSTDMT_initJobCCtxParams(params); ZSTD_CCtx_params const jobParams = ZSTDMT_initJobCCtxParams(params);
unsigned const overlapRLog = (params.overlapSizeLog>9) ? 0 : 9-params.overlapSizeLog; unsigned const overlapRLog = (params.overlapSizeLog>9) ? 0 : 9-params.overlapSizeLog;
size_t const overlapSize = (overlapRLog>=9) ? 0 : (size_t)1 << (params.cParams.windowLog - overlapRLog); size_t const overlapSize = (overlapRLog>=9) ? 0 : (size_t)1 << (params.cParams.windowLog - overlapRLog);
unsigned const nbJobs = ZSTDMT_computeNbJobs(srcSize, params.cParams.windowLog, params.nbThreads); unsigned const nbJobs = ZSTDMT_computeNbJobs(srcSize, params.cParams.windowLog, params.nbWorkers);
size_t const proposedJobSize = (srcSize + (nbJobs-1)) / nbJobs; size_t const proposedJobSize = (srcSize + (nbJobs-1)) / nbJobs;
size_t const avgJobSize = (((proposedJobSize-1) & 0x1FFFF) < 0x7FFF) ? proposedJobSize + 0xFFFF : proposedJobSize; /* avoid too small last block */ size_t const avgJobSize = (((proposedJobSize-1) & 0x1FFFF) < 0x7FFF) ? proposedJobSize + 0xFFFF : proposedJobSize; /* avoid too small last block */
const char* const srcStart = (const char*)src; const char* const srcStart = (const char*)src;
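Note on the overlap size: the two `overlapRLog` / `overlapSize` lines in this hunk can be read as a small pure function. The sketch below uses a hypothetical name, `sketch_overlapSize`, and assumes `windowLog >= 9` so the shift amount never goes negative; it is for illustration only.

```c
#include <stddef.h>

/* Mirrors the overlapRLog / overlapSize computation shown above.
 * Assumption: windowLog >= 9, so (windowLog - overlapRLog) is non-negative. */
static size_t sketch_overlapSize(unsigned overlapSizeLog, unsigned windowLog)
{
    unsigned const overlapRLog = (overlapSizeLog > 9) ? 0 : 9 - overlapSizeLog;
    return (overlapRLog >= 9) ? 0 : (size_t)1 << (windowLog - overlapRLog);
}
```

With `ZSTDMT_OVERLAPLOG_DEFAULT` (6) and `windowLog = 20`, the overlap is `1 << 17` bytes, one eighth of the window. A value of 9 or more gives the full window, and 0 disables overlap.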
@ -742,13 +760,13 @@ static size_t ZSTDMT_compress_advanced_internal(
unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbJobs : (unsigned)(dstCapacity / ZSTD_compressBound(avgJobSize)); /* presumes avgJobSize >= 256 KB, which should be the case */ unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbJobs : (unsigned)(dstCapacity / ZSTD_compressBound(avgJobSize)); /* presumes avgJobSize >= 256 KB, which should be the case */
size_t frameStartPos = 0, dstBufferPos = 0; size_t frameStartPos = 0, dstBufferPos = 0;
XXH64_state_t xxh64; XXH64_state_t xxh64;
assert(jobParams.nbThreads == 0); assert(jobParams.nbWorkers == 0);
assert(mtctx->cctxPool->totalCCtx == params.nbThreads); assert(mtctx->cctxPool->totalCCtx == params.nbWorkers);
DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: nbJobs=%2u (rawSize=%u bytes; fixedSize=%u) ", DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: nbJobs=%2u (rawSize=%u bytes; fixedSize=%u) ",
nbJobs, (U32)proposedJobSize, (U32)avgJobSize); nbJobs, (U32)proposedJobSize, (U32)avgJobSize);
if ((nbJobs==1) | (params.nbThreads<=1)) { /* fallback to single-thread mode : this is a blocking invocation anyway */ if ((nbJobs==1) | (params.nbWorkers<=1)) { /* fallback to single-thread mode : this is a blocking invocation anyway */
ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0]; ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0];
if (cdict) return ZSTD_compress_usingCDict_advanced(cctx, dst, dstCapacity, src, srcSize, cdict, jobParams.fParams); if (cdict) return ZSTD_compress_usingCDict_advanced(cctx, dst, dstCapacity, src, srcSize, cdict, jobParams.fParams);
return ZSTD_compress_advanced_internal(cctx, dst, dstCapacity, src, srcSize, NULL, 0, jobParams); return ZSTD_compress_advanced_internal(cctx, dst, dstCapacity, src, srcSize, NULL, 0, jobParams);
@ -856,7 +874,7 @@ size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
void* dst, size_t dstCapacity, void* dst, size_t dstCapacity,
const void* src, size_t srcSize, const void* src, size_t srcSize,
const ZSTD_CDict* cdict, const ZSTD_CDict* cdict,
ZSTD_parameters const params, ZSTD_parameters params,
unsigned overlapLog) unsigned overlapLog)
{ {
ZSTD_CCtx_params cctxParams = mtctx->params; ZSTD_CCtx_params cctxParams = mtctx->params;
@ -892,12 +910,14 @@ size_t ZSTDMT_initCStream_internal(
const ZSTD_CDict* cdict, ZSTD_CCtx_params params, const ZSTD_CDict* cdict, ZSTD_CCtx_params params,
unsigned long long pledgedSrcSize) unsigned long long pledgedSrcSize)
{ {
DEBUGLOG(4, "ZSTDMT_initCStream_internal (pledgedSrcSize=%u)", (U32)pledgedSrcSize); DEBUGLOG(4, "ZSTDMT_initCStream_internal (pledgedSrcSize=%u, nbWorkers=%u, cctxPool=%u)",
(U32)pledgedSrcSize, params.nbWorkers, mtctx->cctxPool->totalCCtx);
/* params are supposed to be fully validated at this point */ /* params are supposed to be fully validated at this point */
assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams))); assert(!ZSTD_isError(ZSTD_checkCParams(params.cParams)));
assert(!((dict) && (cdict))); /* either dict or cdict, not both */ assert(!((dict) && (cdict))); /* either dict or cdict, not both */
assert(mtctx->cctxPool->totalCCtx == params.nbThreads); assert(mtctx->cctxPool->totalCCtx == params.nbWorkers);
mtctx->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN); /* do not trigger multi-threading when srcSize is too small */
/* init */
if (params.jobSize == 0) { if (params.jobSize == 0) {
if (params.cParams.windowLog >= 29) if (params.cParams.windowLog >= 29)
params.jobSize = ZSTDMT_JOBSIZE_MAX; params.jobSize = ZSTDMT_JOBSIZE_MAX;
@ -906,15 +926,17 @@ size_t ZSTDMT_initCStream_internal(
} }
if (params.jobSize > ZSTDMT_JOBSIZE_MAX) params.jobSize = ZSTDMT_JOBSIZE_MAX; if (params.jobSize > ZSTDMT_JOBSIZE_MAX) params.jobSize = ZSTDMT_JOBSIZE_MAX;
mtctx->singleBlockingThread = (pledgedSrcSize <= ZSTDMT_JOBSIZE_MIN); /* do not trigger multi-threading when srcSize is too small */
if (mtctx->singleBlockingThread) { if (mtctx->singleBlockingThread) {
ZSTD_CCtx_params const singleThreadParams = ZSTDMT_initJobCCtxParams(params); ZSTD_CCtx_params const singleThreadParams = ZSTDMT_initJobCCtxParams(params);
DEBUGLOG(4, "ZSTDMT_initCStream_internal: switch to single blocking thread mode"); DEBUGLOG(4, "ZSTDMT_initCStream_internal: switch to single blocking thread mode");
assert(singleThreadParams.nbThreads == 0); assert(singleThreadParams.nbWorkers == 0);
return ZSTD_initCStream_internal(mtctx->cctxPool->cctx[0], return ZSTD_initCStream_internal(mtctx->cctxPool->cctx[0],
dict, dictSize, cdict, dict, dictSize, cdict,
singleThreadParams, pledgedSrcSize); singleThreadParams, pledgedSrcSize);
} }
DEBUGLOG(4, "ZSTDMT_initCStream_internal: %u threads", params.nbThreads);
DEBUGLOG(4, "ZSTDMT_initCStream_internal: %u workers", params.nbWorkers);
if (mtctx->allJobsCompleted == 0) { /* previous compression not correctly finished */ if (mtctx->allJobsCompleted == 0) { /* previous compression not correctly finished */
ZSTDMT_waitForAllJobsCompleted(mtctx); ZSTDMT_waitForAllJobsCompleted(mtctx);
@ -993,8 +1015,6 @@ size_t ZSTDMT_initCStream_usingCDict(ZSTDMT_CCtx* mtctx,
size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* mtctx, unsigned long long pledgedSrcSize) size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* mtctx, unsigned long long pledgedSrcSize)
{ {
if (!pledgedSrcSize) pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN; if (!pledgedSrcSize) pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN;
if (mtctx->params.nbThreads==1)
return ZSTD_resetCStream(mtctx->cctxPool->cctx[0], pledgedSrcSize);
return ZSTDMT_initCStream_internal(mtctx, NULL, 0, ZSTD_dm_auto, 0, mtctx->params, return ZSTDMT_initCStream_internal(mtctx, NULL, 0, ZSTD_dm_auto, 0, mtctx->params,
pledgedSrcSize); pledgedSrcSize);
} }


@ -30,15 +30,15 @@
/* === Memory management === */ /* === Memory management === */
typedef struct ZSTDMT_CCtx_s ZSTDMT_CCtx; typedef struct ZSTDMT_CCtx_s ZSTDMT_CCtx;
ZSTDLIB_API ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbThreads); ZSTDLIB_API ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbWorkers);
ZSTDLIB_API ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTDLIB_API ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers,
ZSTD_customMem cMem); ZSTD_customMem cMem);
ZSTDLIB_API size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx); ZSTDLIB_API size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx);
ZSTDLIB_API size_t ZSTDMT_sizeof_CCtx(ZSTDMT_CCtx* mtctx); ZSTDLIB_API size_t ZSTDMT_sizeof_CCtx(ZSTDMT_CCtx* mtctx);
/* === Simple buffer-to-butter one-pass function === */ /* === Simple one-pass compression function === */
ZSTDLIB_API size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx, ZSTDLIB_API size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
void* dst, size_t dstCapacity, void* dst, size_t dstCapacity,
@ -50,7 +50,7 @@ ZSTDLIB_API size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
/* === Streaming functions === */ /* === Streaming functions === */
ZSTDLIB_API size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel); ZSTDLIB_API size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel);
ZSTDLIB_API size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* mtctx, unsigned long long pledgedSrcSize); /**< if srcSize is not known at reset time, use ZSTD_CONTENTSIZE_UNKNOWN. Note: for compatibility with older programs, 0 means the same as ZSTD_CONTENTSIZE_UNKNOWN, but it may change in the future, to mean "empty" */ ZSTDLIB_API size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* mtctx, unsigned long long pledgedSrcSize); /**< if srcSize is not known at reset time, use ZSTD_CONTENTSIZE_UNKNOWN. Note: for compatibility with older programs, 0 means the same as ZSTD_CONTENTSIZE_UNKNOWN, but it will change in the future to mean "empty" */
ZSTDLIB_API size_t ZSTDMT_compressStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input); ZSTDLIB_API size_t ZSTDMT_compressStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input);
@ -68,7 +68,7 @@ ZSTDLIB_API size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
void* dst, size_t dstCapacity, void* dst, size_t dstCapacity,
const void* src, size_t srcSize, const void* src, size_t srcSize,
const ZSTD_CDict* cdict, const ZSTD_CDict* cdict,
ZSTD_parameters const params, ZSTD_parameters params,
unsigned overlapLog); unsigned overlapLog);
ZSTDLIB_API size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx, ZSTDLIB_API size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx,
@ -109,19 +109,28 @@ ZSTDLIB_API size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
ZSTD_EndDirective endOp); ZSTD_EndDirective endOp);
/* === Private definitions; never ever use directly === */ /* ========================================================
* === Private interface, for use by ZSTD_compress.c ===
* === Not exposed in libzstd. Never invoke directly ===
* ======================================================== */
size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params, ZSTDMT_parameter parameter, unsigned value); size_t ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params, ZSTDMT_parameter parameter, unsigned value);
/* ZSTDMT_CCtxParam_setNbThreads() /* ZSTDMT_CCtxParam_setNbWorkers()
* Set nbThreads, and clamp it correctly, * Set nbWorkers, and clamp it.
* also reset jobSize and overlapLog */ * Also reset jobSize and overlapLog */
size_t ZSTDMT_CCtxParam_setNbThreads(ZSTD_CCtx_params* params, unsigned nbThreads); size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers);
/* ZSTDMT_getNbThreads(): /*! ZSTDMT_updateCParams_whileCompressing() :
* Update compression level and parameters (except wlog)
* while compression is ongoing.
* New parameters will be applied to next compression job. */
void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, int compressionLevel, ZSTD_compressionParameters cParams);
/* ZSTDMT_getNbWorkers():
* @return nb threads currently active in mtctx. * @return nb threads currently active in mtctx.
* mtctx must be valid */ * mtctx must be valid */
unsigned ZSTDMT_getNbThreads(const ZSTDMT_CCtx* mtctx); unsigned ZSTDMT_getNbWorkers(const ZSTDMT_CCtx* mtctx);
/* ZSTDMT_getFrameProgression(): /* ZSTDMT_getFrameProgression():
* tells how much data has been consumed (input) and produced (output) for current frame. * tells how much data has been consumed (input) and produced (output) for current frame.


@ -506,7 +506,7 @@ ZSTDLIB_API size_t ZSTD_sizeof_DDict(const ZSTD_DDict* ddict);
* It will also consider src size to be arbitrarily "large", which is worst case. * It will also consider src size to be arbitrarily "large", which is worst case.
* If srcSize is known to always be small, ZSTD_estimateCCtxSize_usingCParams() can provide a tighter estimation. * If srcSize is known to always be small, ZSTD_estimateCCtxSize_usingCParams() can provide a tighter estimation.
* ZSTD_estimateCCtxSize_usingCParams() can be used in tandem with ZSTD_getCParams() to create cParams from compressionLevel. * ZSTD_estimateCCtxSize_usingCParams() can be used in tandem with ZSTD_getCParams() to create cParams from compressionLevel.
* ZSTD_estimateCCtxSize_usingCCtxParams() can be used in tandem with ZSTD_CCtxParam_setParameter(). Only single-threaded compression is supported. This function will return an error code if ZSTD_p_nbThreads is > 1. * ZSTD_estimateCCtxSize_usingCCtxParams() can be used in tandem with ZSTD_CCtxParam_setParameter(). Only single-threaded compression is supported. This function will return an error code if ZSTD_p_nbWorkers is >= 1.
* Note : CCtx size estimation is only correct for single-threaded compression. */ * Note : CCtx size estimation is only correct for single-threaded compression. */
ZSTDLIB_API size_t ZSTD_estimateCCtxSize(int compressionLevel); ZSTDLIB_API size_t ZSTD_estimateCCtxSize(int compressionLevel);
ZSTDLIB_API size_t ZSTD_estimateCCtxSize_usingCParams(ZSTD_compressionParameters cParams); ZSTDLIB_API size_t ZSTD_estimateCCtxSize_usingCParams(ZSTD_compressionParameters cParams);
@ -518,7 +518,7 @@ ZSTDLIB_API size_t ZSTD_estimateDCtxSize(void);
* It will also consider src size to be arbitrarily "large", which is worst case. * It will also consider src size to be arbitrarily "large", which is worst case.
* If srcSize is known to always be small, ZSTD_estimateCStreamSize_usingCParams() can provide a tighter estimation. * If srcSize is known to always be small, ZSTD_estimateCStreamSize_usingCParams() can provide a tighter estimation.
* ZSTD_estimateCStreamSize_usingCParams() can be used in tandem with ZSTD_getCParams() to create cParams from compressionLevel. * ZSTD_estimateCStreamSize_usingCParams() can be used in tandem with ZSTD_getCParams() to create cParams from compressionLevel.
* ZSTD_estimateCStreamSize_usingCCtxParams() can be used in tandem with ZSTD_CCtxParam_setParameter(). Only single-threaded compression is supported. This function will return an error code if ZSTD_p_nbThreads is set to a value > 1. * ZSTD_estimateCStreamSize_usingCCtxParams() can be used in tandem with ZSTD_CCtxParam_setParameter(). Only single-threaded compression is supported. This function will return an error code if ZSTD_p_nbWorkers is >= 1.
* Note : CStream size estimation is only correct for single-threaded compression. * Note : CStream size estimation is only correct for single-threaded compression.
* ZSTD_DStream memory budget depends on window Size. * ZSTD_DStream memory budget depends on window Size.
* This information can be passed manually, using ZSTD_estimateDStreamSize, * This information can be passed manually, using ZSTD_estimateDStreamSize,
@ -992,18 +992,13 @@ typedef enum {
/* multi-threading parameters */ /* multi-threading parameters */
/* These parameters are only useful if multi-threading is enabled (ZSTD_MULTITHREAD). /* These parameters are only useful if multi-threading is enabled (ZSTD_MULTITHREAD).
* They return an error otherwise. */ * They return an error otherwise. */
ZSTD_p_nbThreads=400, /* Select how many threads a compression job can spawn (default:1) ZSTD_p_nbWorkers=400, /* Select how many threads will be spawned to compress in parallel.
* More threads improve speed, but also increase memory usage. * When nbWorkers >= 1, triggers asynchronous mode :
* Can only receive a value > 1 if ZSTD_MULTITHREAD is enabled. * ZSTD_compress_generic() consumes some input, flush some output if possible, and immediately gives back control to caller,
* Special: value 0 means "do not change nbThreads" */ * while compression work is performed in parallel, within worker threads.
ZSTD_p_nonBlockingMode, /* Single thread mode is by default "blocking" : * (note : a strong exception to this rule is when first invocation sets ZSTD_e_end : it becomes a blocking call).
* it finishes its job as much as possible, and only then gives back control to caller. * More workers improve speed, but also increase memory usage.
* In contrast, multi-thread is by default "non-blocking" : * Default value is `0`, aka "single-threaded mode" : no worker is spawned, compression is performed inside Caller's thread, all invocations are blocking */
* it takes some input, flush some output if available, and immediately gives back control to caller.
* Compression work is performed in parallel, within worker threads.
* (note : a strong exception to this rule is when first job is called with ZSTD_e_end : it becomes blocking)
* Setting this parameter to 1 will enforce non-blocking mode even when only 1 thread is selected.
* It allows the caller to do other tasks while the worker thread compresses in parallel. */
ZSTD_p_jobSize, /* Size of a compression job. This value is only enforced in streaming (non-blocking) mode. ZSTD_p_jobSize, /* Size of a compression job. This value is only enforced in streaming (non-blocking) mode.
* Each compression job is completed in parallel, so indirectly controls the nb of active threads. * Each compression job is completed in parallel, so indirectly controls the nb of active threads.
* 0 means default, which is dynamically determined based on compression parameters. * 0 means default, which is dynamically determined based on compression parameters.
@ -1015,7 +1010,7 @@ typedef enum {
/* advanced parameters - may not remain available after API update */ /* advanced parameters - may not remain available after API update */
ZSTD_p_forceMaxWindow=1100, /* Force back-reference distances to remain < windowSize, ZSTD_p_forceMaxWindow=1100, /* Force back-reference distances to remain < windowSize,
* even when referencing into Dictionary content (default:0) */ * even when referencing into Dictionary content (default:0) */
ZSTD_p_enableLongDistanceMatching=1200, /* Enable long distance matching. ZSTD_p_enableLongDistanceMatching=1200, /* Enable long distance matching.
* This parameter is designed to improve the compression * This parameter is designed to improve the compression
* ratio for large inputs with long distance matches. * ratio for large inputs with long distance matches.
* This increases the memory usage as well as window size. * This increases the memory usage as well as window size.
@ -1025,33 +1020,39 @@ typedef enum {
* other LDM parameters. Setting the compression level * other LDM parameters. Setting the compression level
* after this parameter overrides the window log, though LDM * after this parameter overrides the window log, though LDM
* will remain enabled until explicitly disabled. */ * will remain enabled until explicitly disabled. */
ZSTD_p_ldmHashLog, /* Size of the table for long distance matching, as a power of 2. ZSTD_p_ldmHashLog, /* Size of the table for long distance matching, as a power of 2.
* Larger values increase memory usage and compression ratio, but decrease * Larger values increase memory usage and compression ratio, but decrease
* compression speed. * compression speed.
* Must be clamped between ZSTD_HASHLOG_MIN and ZSTD_HASHLOG_MAX * Must be clamped between ZSTD_HASHLOG_MIN and ZSTD_HASHLOG_MAX
* (default: windowlog - 7). */ * (default: windowlog - 7).
ZSTD_p_ldmMinMatch, /* Minimum size of searched matches for long distance matcher. * Special: value 0 means "do not change ldmHashLog". */
* Larger/too small values usually decrease compression ratio. ZSTD_p_ldmMinMatch, /* Minimum size of searched matches for long distance matcher.
* Must be clamped between ZSTD_LDM_MINMATCH_MIN * Larger/too small values usually decrease compression ratio.
* and ZSTD_LDM_MINMATCH_MAX (default: 64). */ * Must be clamped between ZSTD_LDM_MINMATCH_MIN
ZSTD_p_ldmBucketSizeLog, /* Log size of each bucket in the LDM hash table for collision resolution. * and ZSTD_LDM_MINMATCH_MAX (default: 64).
* Larger values usually improve collision resolution but may decrease * Special: value 0 means "do not change ldmMinMatch". */
* compression speed. ZSTD_p_ldmBucketSizeLog, /* Log size of each bucket in the LDM hash table for collision resolution.
* The maximum value is ZSTD_LDM_BUCKETSIZELOG_MAX (default: 3). */ * Larger values usually improve collision resolution but may decrease
* compression speed.
* The maximum value is ZSTD_LDM_BUCKETSIZELOG_MAX (default: 3).
* note : 0 is a valid value */
ZSTD_p_ldmHashEveryLog, /* Frequency of inserting/looking up entries in the LDM hash table. ZSTD_p_ldmHashEveryLog, /* Frequency of inserting/looking up entries in the LDM hash table.
* The default is MAX(0, (windowLog - ldmHashLog)) to * The default is MAX(0, (windowLog - ldmHashLog)) to
* optimize hash table usage. * optimize hash table usage.
* Larger values improve compression speed. Deviating far from the * Larger values improve compression speed. Deviating far from the
* default value will likely result in a decrease in compression ratio. * default value will likely result in a decrease in compression ratio.
* Must be clamped between 0 and ZSTD_WINDOWLOG_MAX - ZSTD_HASHLOG_MIN. */ * Must be clamped between 0 and ZSTD_WINDOWLOG_MAX - ZSTD_HASHLOG_MIN.
* note : 0 is a valid value */
} ZSTD_cParameter; } ZSTD_cParameter;
/*! ZSTD_CCtx_setParameter() : /*! ZSTD_CCtx_setParameter() :
* Set one compression parameter, selected by enum ZSTD_cParameter. * Set one compression parameter, selected by enum ZSTD_cParameter.
* Setting a parameter is generally only possible during frame initialization (before starting compression).
* A few parameters can also be updated during compression: compressionLevel, hashLog, chainLog, searchLog, minMatch, targetLength and strategy.
* Note : when `value` is an enum, cast it to unsigned for proper type checking. * Note : when `value` is an enum, cast it to unsigned for proper type checking.
* @result : informational value (typically, the one being set, possibly corrected), * @result : informational value (typically, value being set clamped correctly),
* or an error code (which can be tested with ZSTD_isError()). */ * or an error code (which can be tested with ZSTD_isError()). */
ZSTDLIB_API size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned value); ZSTDLIB_API size_t ZSTD_CCtx_setParameter(ZSTD_CCtx* cctx, ZSTD_cParameter param, unsigned value);
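The long distance matching comments in this hunk give two default formulas: `ldmHashLog` defaults to `windowLog - 7`, and `ldmHashEveryLog` defaults to `MAX(0, windowLog - ldmHashLog)`. The sketch below only illustrates that arithmetic. The helper names are hypothetical and are not part of the zstd API, and the clamping bounds are passed in by the caller because the actual `ZSTD_HASHLOG_MIN` / `ZSTD_HASHLOG_MAX` values are not shown in this diff.

```c
#include <assert.h>

/* Hypothetical helper (not a zstd function): default LDM hash log,
 * following the documented rule "windowLog - 7", then clamped to
 * caller-supplied bounds (stand-ins for ZSTD_HASHLOG_MIN / ZSTD_HASHLOG_MAX). */
unsigned ldm_default_hash_log(unsigned windowLog, unsigned minLog, unsigned maxLog)
{
    unsigned hashLog;
    assert(minLog <= maxLog);
    /* avoid unsigned wrap-around when windowLog < 7 */
    hashLog = (windowLog > 7) ? windowLog - 7 : 0;
    if (hashLog < minLog) hashLog = minLog;
    if (hashLog > maxLog) hashLog = maxLog;
    return hashLog;
}

/* Hypothetical helper (not a zstd function): default hashEveryLog,
 * i.e. MAX(0, windowLog - ldmHashLog), written so that the unsigned
 * subtraction can never wrap. */
unsigned ldm_default_hash_every_log(unsigned windowLog, unsigned ldmHashLog)
{
    return (windowLog > ldmHashLog) ? windowLog - ldmHashLog : 0;
}
```

With a window log of 27 and illustrative bounds of 6 and 30, the first helper yields 20 and the second yields 7. These are derived values only; the library itself decides what is applied.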
@ -1198,7 +1199,7 @@ ZSTDLIB_API size_t ZSTD_compress_generic_simpleArgs (
ZSTDLIB_API ZSTD_CCtx_params* ZSTD_createCCtxParams(void); ZSTDLIB_API ZSTD_CCtx_params* ZSTD_createCCtxParams(void);
/*! ZSTD_resetCCtxParams() : /*! ZSTD_resetCCtxParams() :
* Reset params to default, with the default compression level. * Reset params to default values.
*/ */
ZSTDLIB_API size_t ZSTD_resetCCtxParams(ZSTD_CCtx_params* params); ZSTDLIB_API size_t ZSTD_resetCCtxParams(ZSTD_CCtx_params* params);
@ -1227,9 +1228,10 @@ ZSTDLIB_API size_t ZSTD_CCtxParam_setParameter(ZSTD_CCtx_params* params, ZSTD_cP
/*! ZSTD_CCtx_setParametersUsingCCtxParams() : /*! ZSTD_CCtx_setParametersUsingCCtxParams() :
* Apply a set of ZSTD_CCtx_params to the compression context. * Apply a set of ZSTD_CCtx_params to the compression context.
* This must be done before the dictionary is loaded. * This can be done even after compression is started,
* The pledgedSrcSize is treated as unknown. * if nbWorkers==0, this will have no impact until a new compression is started.
* Multithreading parameters are applied only if nbThreads > 1. * if nbWorkers>=1, new parameters will be picked up at next job,
* with a few restrictions (windowLog, pledgedSrcSize, nbWorkers, jobSize, and overlapLog are not updated).
*/ */
ZSTDLIB_API size_t ZSTD_CCtx_setParametersUsingCCtxParams( ZSTDLIB_API size_t ZSTD_CCtx_setParametersUsingCCtxParams(
ZSTD_CCtx* cctx, const ZSTD_CCtx_params* params); ZSTD_CCtx* cctx, const ZSTD_CCtx_params* params);


@ -122,12 +122,12 @@ void BMK_setBlockSize(size_t blockSize)
void BMK_setDecodeOnlyMode(unsigned decodeFlag) { g_decodeOnly = (decodeFlag>0); } void BMK_setDecodeOnlyMode(unsigned decodeFlag) { g_decodeOnly = (decodeFlag>0); }
static U32 g_nbThreads = 1; static U32 g_nbWorkers = 0;
void BMK_setNbThreads(unsigned nbThreads) { void BMK_setNbWorkers(unsigned nbWorkers) {
#ifndef ZSTD_MULTITHREAD #ifndef ZSTD_MULTITHREAD
if (nbThreads > 1) DISPLAYLEVEL(2, "Note : multi-threading is disabled \n"); if (nbWorkers > 0) DISPLAYLEVEL(2, "Note : multi-threading is disabled \n");
#endif #endif
g_nbThreads = nbThreads; g_nbWorkers = nbWorkers;
} }
static U32 g_realTime = 0; static U32 g_realTime = 0;
@ -295,7 +295,7 @@ static int BMK_benchMem(const void* srcBuffer, size_t srcSize,
if (!cCompleted) { /* still some time to do compression tests */ if (!cCompleted) { /* still some time to do compression tests */
U64 const clockLoop = g_nbSeconds ? TIMELOOP_MICROSEC : 1; U64 const clockLoop = g_nbSeconds ? TIMELOOP_MICROSEC : 1;
U32 nbLoops = 0; U32 nbLoops = 0;
ZSTD_CCtx_setParameter(ctx, ZSTD_p_nbThreads, g_nbThreads); ZSTD_CCtx_setParameter(ctx, ZSTD_p_nbWorkers, g_nbWorkers);
ZSTD_CCtx_setParameter(ctx, ZSTD_p_compressionLevel, cLevel); ZSTD_CCtx_setParameter(ctx, ZSTD_p_compressionLevel, cLevel);
ZSTD_CCtx_setParameter(ctx, ZSTD_p_enableLongDistanceMatching, g_ldmFlag); ZSTD_CCtx_setParameter(ctx, ZSTD_p_enableLongDistanceMatching, g_ldmFlag);
ZSTD_CCtx_setParameter(ctx, ZSTD_p_ldmMinMatch, g_ldmMinMatch); ZSTD_CCtx_setParameter(ctx, ZSTD_p_ldmMinMatch, g_ldmMinMatch);


@ -22,7 +22,7 @@ int BMK_benchFiles(const char** fileNamesTable, unsigned nbFiles, const char* di
/* Set Parameters */ /* Set Parameters */
void BMK_setNbSeconds(unsigned nbLoops); void BMK_setNbSeconds(unsigned nbLoops);
void BMK_setBlockSize(size_t blockSize); void BMK_setBlockSize(size_t blockSize);
void BMK_setNbThreads(unsigned nbThreads); void BMK_setNbWorkers(unsigned nbWorkers);
void BMK_setRealTime(unsigned priority); void BMK_setRealTime(unsigned priority);
void BMK_setNotificationLevel(unsigned level); void BMK_setNotificationLevel(unsigned level);
void BMK_setSeparateFiles(unsigned separate); void BMK_setSeparateFiles(unsigned separate);


@ -36,18 +36,21 @@
# include <io.h> # include <io.h>
#endif #endif
#include "bitstream.h"
#include "mem.h" #include "mem.h"
#include "fileio.h" #include "fileio.h"
#include "util.h" #include "util.h"
#define ZSTD_STATIC_LINKING_ONLY /* ZSTD_magicNumber, ZSTD_frameHeaderSize_max */ #define ZSTD_STATIC_LINKING_ONLY /* ZSTD_magicNumber, ZSTD_frameHeaderSize_max */
#include "zstd.h" #include "zstd.h"
#include "zstd_errors.h" /* ZSTD_error_frameParameter_windowTooLarge */
#if defined(ZSTD_GZCOMPRESS) || defined(ZSTD_GZDECOMPRESS) #if defined(ZSTD_GZCOMPRESS) || defined(ZSTD_GZDECOMPRESS)
# include <zlib.h> # include <zlib.h>
# if !defined(z_const) # if !defined(z_const)
# define z_const # define z_const
# endif # endif
#endif #endif
#if defined(ZSTD_LZMACOMPRESS) || defined(ZSTD_LZMADECOMPRESS) #if defined(ZSTD_LZMACOMPRESS) || defined(ZSTD_LZMADECOMPRESS)
# include <lzma.h> # include <lzma.h>
#endif #endif
@ -215,23 +218,23 @@ static U32 g_removeSrcFile = 0;
void FIO_setRemoveSrcFile(unsigned flag) { g_removeSrcFile = (flag>0); } void FIO_setRemoveSrcFile(unsigned flag) { g_removeSrcFile = (flag>0); }
static U32 g_memLimit = 0; static U32 g_memLimit = 0;
void FIO_setMemLimit(unsigned memLimit) { g_memLimit = memLimit; } void FIO_setMemLimit(unsigned memLimit) { g_memLimit = memLimit; }
static U32 g_nbThreads = 1; static U32 g_nbWorkers = 1;
void FIO_setNbThreads(unsigned nbThreads) { void FIO_setNbWorkers(unsigned nbWorkers) {
#ifndef ZSTD_MULTITHREAD #ifndef ZSTD_MULTITHREAD
if (nbThreads > 1) DISPLAYLEVEL(2, "Note : multi-threading is disabled \n"); if (nbWorkers > 0) DISPLAYLEVEL(2, "Note : multi-threading is disabled \n");
#endif #endif
g_nbThreads = nbThreads; g_nbWorkers = nbWorkers;
} }
static U32 g_blockSize = 0; static U32 g_blockSize = 0;
void FIO_setBlockSize(unsigned blockSize) { void FIO_setBlockSize(unsigned blockSize) {
if (blockSize && g_nbThreads==1) if (blockSize && g_nbWorkers==0)
DISPLAYLEVEL(2, "Setting block size is useless in single-thread mode \n"); DISPLAYLEVEL(2, "Setting block size is useless in single-thread mode \n");
g_blockSize = blockSize; g_blockSize = blockSize;
} }
#define FIO_OVERLAP_LOG_NOTSET 9999 #define FIO_OVERLAP_LOG_NOTSET 9999
static U32 g_overlapLog = FIO_OVERLAP_LOG_NOTSET; static U32 g_overlapLog = FIO_OVERLAP_LOG_NOTSET;
void FIO_setOverlapLog(unsigned overlapLog){ void FIO_setOverlapLog(unsigned overlapLog){
if (overlapLog && g_nbThreads==1) if (overlapLog && g_nbWorkers==0)
DISPLAYLEVEL(2, "Setting overlapLog is useless in single-thread mode \n"); DISPLAYLEVEL(2, "Setting overlapLog is useless in single-thread mode \n");
g_overlapLog = overlapLog; g_overlapLog = overlapLog;
} }
@ -427,7 +430,7 @@ static cRess_t FIO_createCResources(const char* dictFileName, int cLevel,
if (!ress.srcBuffer || !ress.dstBuffer) if (!ress.srcBuffer || !ress.dstBuffer)
EXM_THROW(31, "allocation error : not enough memory"); EXM_THROW(31, "allocation error : not enough memory");
/* Advances parameters, including dictionary */ /* Advanced parameters, including dictionary */
{ void* dictBuffer; { void* dictBuffer;
size_t const dictBuffSize = FIO_createDictBuffer(&dictBuffer, dictFileName); /* works with dictFileName==NULL */ size_t const dictBuffSize = FIO_createDictBuffer(&dictBuffer, dictFileName); /* works with dictFileName==NULL */
if (dictFileName && (dictBuffer==NULL)) if (dictFileName && (dictBuffer==NULL))
@ -439,8 +442,7 @@ static cRess_t FIO_createCResources(const char* dictFileName, int cLevel,
/* compression level */ /* compression level */
CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_compressionLevel, cLevel) ); CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_compressionLevel, cLevel) );
/* long distance matching */ /* long distance matching */
CHECK( ZSTD_CCtx_setParameter( CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_enableLongDistanceMatching, g_ldmFlag) );
ress.cctx, ZSTD_p_enableLongDistanceMatching, g_ldmFlag) );
CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_ldmHashLog, g_ldmHashLog) ); CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_ldmHashLog, g_ldmHashLog) );
CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_ldmMinMatch, g_ldmMinMatch) ); CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_ldmMinMatch, g_ldmMinMatch) );
if (g_ldmBucketSizeLog != FIO_LDM_PARAM_NOTSET) { if (g_ldmBucketSizeLog != FIO_LDM_PARAM_NOTSET) {
@ -458,13 +460,12 @@ static cRess_t FIO_createCResources(const char* dictFileName, int cLevel,
CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_targetLength, comprParams->targetLength) ); CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_targetLength, comprParams->targetLength) );
CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_compressionStrategy, (U32)comprParams->strategy) ); CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_compressionStrategy, (U32)comprParams->strategy) );
/* multi-threading */ /* multi-threading */
DISPLAYLEVEL(5,"set nb threads = %u \n", g_nbThreads);
CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_nbThreads, g_nbThreads) );
#ifdef ZSTD_MULTITHREAD #ifdef ZSTD_MULTITHREAD
CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_nonBlockingMode, 1) ); DISPLAYLEVEL(5,"set nb workers = %u \n", g_nbWorkers);
CHECK( ZSTD_CCtx_setParameter(ress.cctx, ZSTD_p_nbWorkers, g_nbWorkers) );
#endif #endif
/* dictionary */ /* dictionary */
CHECK( ZSTD_CCtx_setPledgedSrcSize(ress.cctx, srcSize) ); /* just for dictionary loading, for compression parameters adaptation */ CHECK( ZSTD_CCtx_setPledgedSrcSize(ress.cctx, srcSize) ); /* set the value temporarily for dictionary loading, to adapt compression parameters */
CHECK( ZSTD_CCtx_loadDictionary(ress.cctx, dictBuffer, dictBuffSize) ); CHECK( ZSTD_CCtx_loadDictionary(ress.cctx, dictBuffer, dictBuffSize) );
CHECK( ZSTD_CCtx_setPledgedSrcSize(ress.cctx, ZSTD_CONTENTSIZE_UNKNOWN) ); /* reset */ CHECK( ZSTD_CCtx_setPledgedSrcSize(ress.cctx, ZSTD_CONTENTSIZE_UNKNOWN) ); /* reset */
@ -735,56 +736,22 @@ static unsigned long long FIO_compressLz4Frame(cRess_t* ress,
* @return : 0 : compression completed correctly, * @return : 0 : compression completed correctly,
* 1 : missing or pb opening srcFileName * 1 : missing or pb opening srcFileName
*/ */
static int FIO_compressFilename_internal(cRess_t ress, static unsigned long long
const char* dstFileName, const char* srcFileName, int compressionLevel) FIO_compressZstdFrame(const cRess_t* ressPtr,
const char* srcFileName, U64 fileSize,
int compressionLevel, U64* readsize)
{ {
cRess_t const ress = *ressPtr;
FILE* const srcFile = ress.srcFile; FILE* const srcFile = ress.srcFile;
FILE* const dstFile = ress.dstFile; FILE* const dstFile = ress.dstFile;
U64 readsize = 0;
U64 compressedfilesize = 0; U64 compressedfilesize = 0;
U64 const fileSize = UTIL_getFileSize(srcFileName);
ZSTD_EndDirective directive = ZSTD_e_continue; ZSTD_EndDirective directive = ZSTD_e_continue;
DISPLAYLEVEL(5, "%s: %u bytes \n", srcFileName, (U32)fileSize); DISPLAYLEVEL(6, "compression using zstd format \n");
switch (g_compressionType) {
case FIO_zstdCompression:
break;
case FIO_gzipCompression:
#ifdef ZSTD_GZCOMPRESS
compressedfilesize = FIO_compressGzFrame(&ress, srcFileName, fileSize, compressionLevel, &readsize);
#else
(void)compressionLevel;
EXM_THROW(20, "zstd: %s: file cannot be compressed as gzip (zstd compiled without ZSTD_GZCOMPRESS) -- ignored \n",
srcFileName);
#endif
goto finish;
case FIO_xzCompression:
case FIO_lzmaCompression:
#ifdef ZSTD_LZMACOMPRESS
compressedfilesize = FIO_compressLzmaFrame(&ress, srcFileName, fileSize, compressionLevel, &readsize, g_compressionType==FIO_lzmaCompression);
#else
(void)compressionLevel;
EXM_THROW(20, "zstd: %s: file cannot be compressed as xz/lzma (zstd compiled without ZSTD_LZMACOMPRESS) -- ignored \n",
srcFileName);
#endif
goto finish;
case FIO_lz4Compression:
#ifdef ZSTD_LZ4COMPRESS
compressedfilesize = FIO_compressLz4Frame(&ress, srcFileName, fileSize, compressionLevel, &readsize);
#else
(void)compressionLevel;
EXM_THROW(20, "zstd: %s: file cannot be compressed as lz4 (zstd compiled without ZSTD_LZ4COMPRESS) -- ignored \n",
srcFileName);
#endif
goto finish;
}
/* init */ /* init */
if (fileSize != UTIL_FILESIZE_UNKNOWN) if (fileSize != UTIL_FILESIZE_UNKNOWN)
ZSTD_CCtx_setPledgedSrcSize(ress.cctx, fileSize); ZSTD_CCtx_setPledgedSrcSize(ress.cctx, fileSize);
(void)compressionLevel; (void)srcFileName;
/* Main compression loop */ /* Main compression loop */
do { do {
@ -793,9 +760,9 @@ static int FIO_compressFilename_internal(cRess_t ress,
size_t const inSize = fread(ress.srcBuffer, (size_t)1, ress.srcBufferSize, srcFile); size_t const inSize = fread(ress.srcBuffer, (size_t)1, ress.srcBufferSize, srcFile);
ZSTD_inBuffer inBuff = { ress.srcBuffer, inSize, 0 }; ZSTD_inBuffer inBuff = { ress.srcBuffer, inSize, 0 };
DISPLAYLEVEL(6, "fread %u bytes from source \n", (U32)inSize); DISPLAYLEVEL(6, "fread %u bytes from source \n", (U32)inSize);
readsize += inSize; *readsize += inSize;
if (inSize == 0 || (fileSize != UTIL_FILESIZE_UNKNOWN && readsize == fileSize)) if ((inSize == 0) || (*readsize == fileSize))
directive = ZSTD_e_end; directive = ZSTD_e_end;
result = 1; result = 1;
@ -809,12 +776,13 @@ static int FIO_compressFilename_internal(cRess_t ress,
if (outBuff.pos) { if (outBuff.pos) {
size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile); size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile);
if (sizeCheck!=outBuff.pos) if (sizeCheck!=outBuff.pos)
EXM_THROW(25, "Write error : cannot write compressed block into %s", dstFileName); EXM_THROW(25, "Write error : cannot write compressed block");
compressedfilesize += outBuff.pos; compressedfilesize += outBuff.pos;
} }
if (READY_FOR_UPDATE()) { if (READY_FOR_UPDATE()) {
ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx); ZSTD_frameProgression const zfp = ZSTD_getFrameProgression(ress.cctx);
DISPLAYUPDATE(2, "\rRead :%6u MB - Consumed :%6u MB - Compressed :%6u MB => %.2f%%", DISPLAYUPDATE(2, "\r(%i) Read :%6u MB - Consumed :%6u MB - Compressed :%6u MB => %.2f%%",
compressionLevel,
(U32)(zfp.ingested >> 20), (U32)(zfp.ingested >> 20),
(U32)(zfp.consumed >> 20), (U32)(zfp.consumed >> 20),
(U32)(zfp.produced >> 20), (U32)(zfp.produced >> 20),
@ -823,10 +791,67 @@ static int FIO_compressFilename_internal(cRess_t ress,
} }
} while (directive != ZSTD_e_end); } while (directive != ZSTD_e_end);
finish: return compressedfilesize;
}
/*! FIO_compressFilename_internal() :
* same as FIO_compressFilename_extRess(), with `ress.dstFile` already opened.
* @return : 0 : compression completed correctly,
* 1 : missing or pb opening srcFileName
*/
static int
FIO_compressFilename_internal(cRess_t ress,
const char* dstFileName, const char* srcFileName,
int compressionLevel)
{
U64 readsize = 0;
U64 compressedfilesize = 0;
U64 const fileSize = UTIL_getFileSize(srcFileName);
DISPLAYLEVEL(5, "%s: %u bytes \n", srcFileName, (U32)fileSize);
/* compression format selection */
switch (g_compressionType) {
default:
case FIO_zstdCompression:
compressedfilesize = FIO_compressZstdFrame(&ress, srcFileName, fileSize, compressionLevel, &readsize);
break;
case FIO_gzipCompression:
#ifdef ZSTD_GZCOMPRESS
compressedfilesize = FIO_compressGzFrame(&ress, srcFileName, fileSize, compressionLevel, &readsize);
#else
(void)compressionLevel;
EXM_THROW(20, "zstd: %s: file cannot be compressed as gzip (zstd compiled without ZSTD_GZCOMPRESS) -- ignored \n",
srcFileName);
#endif
break;
case FIO_xzCompression:
case FIO_lzmaCompression:
#ifdef ZSTD_LZMACOMPRESS
compressedfilesize = FIO_compressLzmaFrame(&ress, srcFileName, fileSize, compressionLevel, &readsize, g_compressionType==FIO_lzmaCompression);
#else
(void)compressionLevel;
EXM_THROW(20, "zstd: %s: file cannot be compressed as xz/lzma (zstd compiled without ZSTD_LZMACOMPRESS) -- ignored \n",
srcFileName);
#endif
break;
case FIO_lz4Compression:
#ifdef ZSTD_LZ4COMPRESS
compressedfilesize = FIO_compressLz4Frame(&ress, srcFileName, fileSize, compressionLevel, &readsize);
#else
(void)compressionLevel;
EXM_THROW(20, "zstd: %s: file cannot be compressed as lz4 (zstd compiled without ZSTD_LZ4COMPRESS) -- ignored \n",
srcFileName);
#endif
break;
}
/* Status */ /* Status */
DISPLAYLEVEL(2, "\r%79s\r", ""); DISPLAYLEVEL(2, "\r%79s\r", "");
DISPLAYLEVEL(2,"%-20s :%6.2f%% (%6llu => %6llu bytes, %s) \n", srcFileName, DISPLAYLEVEL(2,"%-20s :%6.2f%% (%6llu => %6llu bytes, %s) \n",
srcFileName,
(double)compressedfilesize / (readsize+(!readsize)/*avoid div by zero*/) * 100, (double)compressedfilesize / (readsize+(!readsize)/*avoid div by zero*/) * 100,
(unsigned long long)readsize, (unsigned long long) compressedfilesize, (unsigned long long)readsize, (unsigned long long) compressedfilesize,
dstFileName); dstFileName);
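The status line above divides by `readsize+(!readsize)` so that an empty input does not cause a division by zero: `!readsize` evaluates to 1 only when `readsize` is 0. A minimal sketch of that idiom in isolation, with a hypothetical function name that does not appear in fileio.c:

```c
/* Hypothetical helper: compressed size as a percentage of the bytes read.
 * (!readsize) is 1 when readsize == 0 and 0 otherwise, so the divisor
 * is never zero. */
double compressed_percent(unsigned long long readsize,
                          unsigned long long compressedsize)
{
    return (double)compressedsize / (double)(readsize + (!readsize)) * 100;
}
```

For a non-empty input the extra term is 0 and the result is the plain ratio; for an empty input the divisor becomes 1, so the function returns the compressed size multiplied by 100 instead of faulting.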
@ -1142,33 +1167,46 @@ static unsigned FIO_passThrough(FILE* foutput, FILE* finput, void* buffer, size_
return 0; return 0;
} }
static void FIO_zstdErrorHelp(dRess_t* ress, size_t ret, char const* srcFileName) /* FIO_highbit64() :
* gives position of highest bit.
* note : only works for v > 0 !
*/
static unsigned FIO_highbit64(unsigned long long v)
{
unsigned count = 0;
assert(v != 0);
v >>= 1;
while (v) { v >>= 1; count++; }
return count;
}
/* FIO_zstdErrorHelp() :
* detailed error message when requested window size is too large */
static void FIO_zstdErrorHelp(dRess_t* ress, size_t err, char const* srcFileName)
{ {
ZSTD_frameHeader header; ZSTD_frameHeader header;
/* No special help for these errors */
if (ZSTD_getErrorCode(ret) != ZSTD_error_frameParameter_windowTooLarge) /* Help message only for one specific error */
if (ZSTD_getErrorCode(err) != ZSTD_error_frameParameter_windowTooLarge)
return; return;
/* Try to decode the frame header */ /* Try to decode the frame header */
ret = ZSTD_getFrameHeader(&header, ress->srcBuffer, ress->srcBufferLoaded); err = ZSTD_getFrameHeader(&header, ress->srcBuffer, ress->srcBufferLoaded);
if (ret == 0) { if (err == 0) {
U32 const windowSize = (U32)header.windowSize; unsigned long long const windowSize = header.windowSize;
U32 const windowLog = BIT_highbit32(windowSize) + ((windowSize & (windowSize - 1)) != 0); U32 const windowLog = FIO_highbit64(windowSize) + ((windowSize & (windowSize - 1)) != 0);
U32 const windowMB = (windowSize >> 20) + ((windowSize & ((1 MB) - 1)) != 0); U32 const windowMB = (U32)((windowSize >> 20) + ((windowSize & ((1 MB) - 1)) != 0));
assert(header.windowSize <= (U64)((U32)-1)); assert(windowSize < (U64)(1ULL << 52));
assert(g_memLimit > 0); assert(g_memLimit > 0);
DISPLAYLEVEL(1, "%s : Window size larger than maximum : %llu > %u\n", DISPLAYLEVEL(1, "%s : Window size larger than maximum : %llu > %u\n",
srcFileName, header.windowSize, g_memLimit); srcFileName, windowSize, g_memLimit);
if (windowLog <= ZSTD_WINDOWLOG_MAX) { if (windowLog <= ZSTD_WINDOWLOG_MAX) {
DISPLAYLEVEL(1, "%s : Use --long=%u or --memory=%uMB\n", DISPLAYLEVEL(1, "%s : Use --long=%u or --memory=%uMB\n",
srcFileName, windowLog, windowMB); srcFileName, windowLog, windowMB);
return; return;
} }
} else if (ZSTD_getErrorCode(ret) != ZSTD_error_frameParameter_windowTooLarge) {
DISPLAYLEVEL(1, "%s : Error decoding frame header to read window size : %s\n",
srcFileName, ZSTD_getErrorName(ret));
return;
} }
DISPLAYLEVEL(1, "%s : Window log larger than ZSTD_WINDOWLOG_MAX=%u not supported\n", DISPLAYLEVEL(1, "%s : Window log larger than ZSTD_WINDOWLOG_MAX=%u; not supported\n",
srcFileName, ZSTD_WINDOWLOG_MAX); srcFileName, ZSTD_WINDOWLOG_MAX);
} }
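The help message above derives two hints from the frame's window size: the smallest `windowLog` whose power of two covers the window, and the window size in MB rounded up. The sketch below reproduces that arithmetic outside of fileio.c. The function names are illustrative, and `1ULL << 20` stands in for the `1 MB` macro expression used in the source.

```c
#include <assert.h>

/* Position of the highest set bit; only valid for v > 0
 * (same loop as FIO_highbit64 in the hunk above). */
unsigned highbit64(unsigned long long v)
{
    unsigned count = 0;
    assert(v != 0);
    v >>= 1;
    while (v) { v >>= 1; count++; }
    return count;
}

/* Smallest log such that (1ULL << log) >= windowSize:
 * add 1 when windowSize is not an exact power of two. */
unsigned window_log_for(unsigned long long windowSize)
{
    return highbit64(windowSize) + ((windowSize & (windowSize - 1)) != 0);
}

/* Window size in MB, rounded up to the next whole MB. */
unsigned window_mb_for(unsigned long long windowSize)
{
    unsigned long long const oneMB = 1ULL << 20;
    return (unsigned)((windowSize >> 20) + ((windowSize & (oneMB - 1)) != 0));
}
```

A window of exactly 128 MB (`1ULL << 27`) gives a log of 27 and 128 MB, while one byte more gives 28 and 129 MB, which is why the suggested `--long=` and `--memory=` values always cover the whole window.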


@ -54,7 +54,7 @@ void FIO_setDictIDFlag(unsigned dictIDFlag);
void FIO_setChecksumFlag(unsigned checksumFlag); void FIO_setChecksumFlag(unsigned checksumFlag);
void FIO_setRemoveSrcFile(unsigned flag); void FIO_setRemoveSrcFile(unsigned flag);
void FIO_setMemLimit(unsigned memLimit); void FIO_setMemLimit(unsigned memLimit);
void FIO_setNbThreads(unsigned nbThreads); void FIO_setNbWorkers(unsigned nbWorkers);
void FIO_setBlockSize(unsigned blockSize); void FIO_setBlockSize(unsigned blockSize);
void FIO_setOverlapLog(unsigned overlapLog); void FIO_setOverlapLog(unsigned overlapLog);
void FIO_setLdmFlag(unsigned ldmFlag); void FIO_setLdmFlag(unsigned ldmFlag);


@ -116,10 +116,16 @@ the last one takes effect.
Note: If `windowLog` is set to larger than 27, `--long=windowLog` or Note: If `windowLog` is set to larger than 27, `--long=windowLog` or
`--memory=windowSize` needs to be passed to the decompressor. `--memory=windowSize` needs to be passed to the decompressor.
* `-T#`, `--threads=#`: * `-T#`, `--threads=#`:
Compress using `#` threads (default: 1). Compress using `#` working threads (default: 1).
If `#` is 0, attempt to detect and use the number of physical CPU cores. If `#` is 0, attempt to detect and use the number of physical CPU cores.
In all cases, the nb of threads is capped to ZSTDMT_NBTHREADS_MAX==256. In all cases, the nb of threads is capped to ZSTDMT_NBTHREADS_MAX==200.
This modifier does nothing if `zstd` is compiled without multithread support. This modifier does nothing if `zstd` is compiled without multithread support.
* `--single-thread`:
Do not spawn a thread for compression; use the caller's thread instead.
This is the only available mode when multithread support is disabled.
In this mode, compression is serialized with I/O.
(This differs from `-T1`, which spawns 1 compression thread running in parallel with I/O.)
Single-thread mode also features lower memory usage.
* `-D file`: * `-D file`:
use `file` as Dictionary to compress or decompress FILE(s) use `file` as Dictionary to compress or decompress FILE(s)
* `--nodictID`: * `--nodictID`:


@ -135,7 +135,7 @@ static int usage_advanced(const char* programName)
DISPLAY( "--ultra : enable levels beyond %i, up to %i (requires more memory)\n", ZSTDCLI_CLEVEL_MAX, ZSTD_maxCLevel()); DISPLAY( "--ultra : enable levels beyond %i, up to %i (requires more memory)\n", ZSTDCLI_CLEVEL_MAX, ZSTD_maxCLevel());
DISPLAY( "--long[=#] : enable long distance matching with given window log (default: %u)\n", g_defaultMaxWindowLog); DISPLAY( "--long[=#] : enable long distance matching with given window log (default: %u)\n", g_defaultMaxWindowLog);
#ifdef ZSTD_MULTITHREAD #ifdef ZSTD_MULTITHREAD
DISPLAY( " -T# : use # threads for compression (default: 1) \n"); DISPLAY( " -T# : spawns # compression threads (default: 1) \n");
DISPLAY( " -B# : select size of each job (default: 0==automatic) \n"); DISPLAY( " -B# : select size of each job (default: 0==automatic) \n");
#endif #endif
DISPLAY( "--no-dictID : don't write dictID into header (dictionary compression)\n"); DISPLAY( "--no-dictID : don't write dictID into header (dictionary compression)\n");
@ -366,21 +366,21 @@ typedef enum { zom_compress, zom_decompress, zom_test, zom_bench, zom_train, zom
int main(int argCount, const char* argv[]) int main(int argCount, const char* argv[])
{ {
int argNb, int argNb,
forceStdout=0,
followLinks=0, followLinks=0,
forceStdout=0,
lastCommand = 0,
ldmFlag = 0,
main_pause=0, main_pause=0,
nextEntryIsDictionary=0, nbWorkers = 1,
operationResult=0,
nextArgumentIsOutFileName=0, nextArgumentIsOutFileName=0,
nextArgumentIsMaxDict=0, nextArgumentIsMaxDict=0,
nextArgumentIsDictID=0, nextArgumentIsDictID=0,
nextArgumentsAreFiles=0, nextArgumentsAreFiles=0,
ultra=0, nextEntryIsDictionary=0,
lastCommand = 0, operationResult=0,
nbThreads = 1,
setRealTimePrio = 0,
separateFiles = 0, separateFiles = 0,
ldmFlag = 0; setRealTimePrio = 0,
ultra=0;
unsigned bench_nbSeconds = 3; /* would be better if this value was synchronized from bench */ unsigned bench_nbSeconds = 3; /* would be better if this value was synchronized from bench */
size_t blockSize = 0; size_t blockSize = 0;
zstd_operation_mode operation = zom_compress; zstd_operation_mode operation = zom_compress;
@ -422,7 +422,7 @@ int main(int argCount, const char* argv[])
programName = lastNameFromPath(programName); programName = lastNameFromPath(programName);
/* preset behaviors */ /* preset behaviors */
if (exeNameMatch(programName, ZSTD_ZSTDMT)) nbThreads=0; if (exeNameMatch(programName, ZSTD_ZSTDMT)) nbWorkers=0;
if (exeNameMatch(programName, ZSTD_UNZSTD)) operation=zom_decompress; if (exeNameMatch(programName, ZSTD_UNZSTD)) operation=zom_decompress;
if (exeNameMatch(programName, ZSTD_CAT)) { operation=zom_decompress; forceStdout=1; FIO_overwriteMode(); outFileName=stdoutmark; g_displayLevel=1; } /* supports multiple formats */ if (exeNameMatch(programName, ZSTD_CAT)) { operation=zom_decompress; forceStdout=1; FIO_overwriteMode(); outFileName=stdoutmark; g_displayLevel=1; } /* supports multiple formats */
if (exeNameMatch(programName, ZSTD_ZCAT)) { operation=zom_decompress; forceStdout=1; FIO_overwriteMode(); outFileName=stdoutmark; g_displayLevel=1; } /* behave like zcat, also supports multiple formats */ if (exeNameMatch(programName, ZSTD_ZCAT)) { operation=zom_decompress; forceStdout=1; FIO_overwriteMode(); outFileName=stdoutmark; g_displayLevel=1; } /* behave like zcat, also supports multiple formats */
@ -481,6 +481,7 @@ int main(int argCount, const char* argv[])
if (!strcmp(argument, "--keep")) { FIO_setRemoveSrcFile(0); continue; } if (!strcmp(argument, "--keep")) { FIO_setRemoveSrcFile(0); continue; }
if (!strcmp(argument, "--rm")) { FIO_setRemoveSrcFile(1); continue; } if (!strcmp(argument, "--rm")) { FIO_setRemoveSrcFile(1); continue; }
if (!strcmp(argument, "--priority=rt")) { setRealTimePrio = 1; continue; } if (!strcmp(argument, "--priority=rt")) { setRealTimePrio = 1; continue; }
if (!strcmp(argument, "--single-thread")) { nbWorkers = 0; continue; }
#ifdef ZSTD_GZCOMPRESS #ifdef ZSTD_GZCOMPRESS
if (!strcmp(argument, "--format=gzip")) { suffix = GZ_EXTENSION; FIO_setCompressionType(FIO_gzipCompression); continue; } if (!strcmp(argument, "--format=gzip")) { suffix = GZ_EXTENSION; FIO_setCompressionType(FIO_gzipCompression); continue; }
#endif #endif
@ -515,7 +516,7 @@ int main(int argCount, const char* argv[])
continue; continue;
} }
#endif #endif
if (longCommandWArg(&argument, "--threads=")) { nbThreads = readU32FromChar(&argument); continue; } if (longCommandWArg(&argument, "--threads=")) { nbWorkers = readU32FromChar(&argument); continue; }
if (longCommandWArg(&argument, "--memlimit=")) { memLimit = readU32FromChar(&argument); continue; } if (longCommandWArg(&argument, "--memlimit=")) { memLimit = readU32FromChar(&argument); continue; }
if (longCommandWArg(&argument, "--memory=")) { memLimit = readU32FromChar(&argument); continue; } if (longCommandWArg(&argument, "--memory=")) { memLimit = readU32FromChar(&argument); continue; }
if (longCommandWArg(&argument, "--memlimit-decompress=")) { memLimit = readU32FromChar(&argument); continue; } if (longCommandWArg(&argument, "--memlimit-decompress=")) { memLimit = readU32FromChar(&argument); continue; }
@ -648,7 +649,7 @@ int main(int argCount, const char* argv[])
/* nb of threads (hidden option) */ /* nb of threads (hidden option) */
case 'T': case 'T':
argument++; argument++;
nbThreads = readU32FromChar(&argument); nbWorkers = readU32FromChar(&argument);
break; break;
/* Dictionary Selection level */ /* Dictionary Selection level */
@ -716,10 +717,10 @@ int main(int argCount, const char* argv[])
/* Welcome message (if verbose) */ /* Welcome message (if verbose) */
DISPLAYLEVEL(3, WELCOME_MESSAGE); DISPLAYLEVEL(3, WELCOME_MESSAGE);
if (nbThreads == 0) { if (nbWorkers == 0) {
/* try to guess */ /* automatically set # workers based on # of reported cpus */
nbThreads = UTIL_countPhysicalCores(); nbWorkers = UTIL_countPhysicalCores();
DISPLAYLEVEL(3, "Note: %d physical core(s) detected \n", nbThreads); DISPLAYLEVEL(3, "Note: %d physical core(s) detected \n", nbWorkers);
} }
g_utilDisplayLevel = g_displayLevel; g_utilDisplayLevel = g_displayLevel;
@ -763,7 +764,7 @@ int main(int argCount, const char* argv[])
BMK_setNotificationLevel(g_displayLevel); BMK_setNotificationLevel(g_displayLevel);
BMK_setSeparateFiles(separateFiles); BMK_setSeparateFiles(separateFiles);
BMK_setBlockSize(blockSize); BMK_setBlockSize(blockSize);
BMK_setNbThreads(nbThreads); BMK_setNbWorkers(nbWorkers);
BMK_setRealTime(setRealTimePrio); BMK_setRealTime(setRealTimePrio);
BMK_setNbSeconds(bench_nbSeconds); BMK_setNbSeconds(bench_nbSeconds);
BMK_setLdmFlag(ldmFlag); BMK_setLdmFlag(ldmFlag);
@ -791,7 +792,7 @@ int main(int argCount, const char* argv[])
zParams.dictID = dictID; zParams.dictID = dictID;
if (cover) { if (cover) {
int const optimize = !coverParams.k || !coverParams.d; int const optimize = !coverParams.k || !coverParams.d;
coverParams.nbThreads = nbThreads; coverParams.nbThreads = nbWorkers;
coverParams.zParams = zParams; coverParams.zParams = zParams;
operationResult = DiB_trainFromFiles(outFileName, maxDictSize, filenameTable, filenameIdx, blockSize, NULL, &coverParams, optimize); operationResult = DiB_trainFromFiles(outFileName, maxDictSize, filenameTable, filenameIdx, blockSize, NULL, &coverParams, optimize);
} else { } else {
@ -835,7 +836,7 @@ int main(int argCount, const char* argv[])
FIO_setNotificationLevel(g_displayLevel); FIO_setNotificationLevel(g_displayLevel);
if (operation==zom_compress) { if (operation==zom_compress) {
#ifndef ZSTD_NOCOMPRESS #ifndef ZSTD_NOCOMPRESS
FIO_setNbThreads(nbThreads); FIO_setNbWorkers(nbWorkers);
FIO_setBlockSize((U32)blockSize); FIO_setBlockSize((U32)blockSize);
FIO_setLdmFlag(ldmFlag); FIO_setLdmFlag(ldmFlag);
FIO_setLdmHashLog(g_ldmHashLog); FIO_setLdmHashLog(g_ldmHashLog);


@ -181,7 +181,7 @@ static size_t local_ZSTD_compress_generic_T2_end(void* dst, size_t dstCapacity,
ZSTD_inBuffer buffIn; ZSTD_inBuffer buffIn;
(void)buff2; (void)buff2;
ZSTD_CCtx_setParameter(g_cstream, ZSTD_p_compressionLevel, 1); ZSTD_CCtx_setParameter(g_cstream, ZSTD_p_compressionLevel, 1);
ZSTD_CCtx_setParameter(g_cstream, ZSTD_p_nbThreads, 2); ZSTD_CCtx_setParameter(g_cstream, ZSTD_p_nbWorkers, 2);
buffOut.dst = dst; buffOut.dst = dst;
buffOut.size = dstCapacity; buffOut.size = dstCapacity;
buffOut.pos = 0; buffOut.pos = 0;
@ -198,7 +198,7 @@ static size_t local_ZSTD_compress_generic_T2_continue(void* dst, size_t dstCapac
ZSTD_inBuffer buffIn; ZSTD_inBuffer buffIn;
(void)buff2; (void)buff2;
ZSTD_CCtx_setParameter(g_cstream, ZSTD_p_compressionLevel, 1); ZSTD_CCtx_setParameter(g_cstream, ZSTD_p_compressionLevel, 1);
ZSTD_CCtx_setParameter(g_cstream, ZSTD_p_nbThreads, 2); ZSTD_CCtx_setParameter(g_cstream, ZSTD_p_nbWorkers, 2);
buffOut.dst = dst; buffOut.dst = dst;
buffOut.size = dstCapacity; buffOut.size = dstCapacity;
buffOut.pos = 0; buffOut.pos = 0;


@@ -53,7 +53,7 @@ static const U32 nbTestsDefault = 30000;
  /*-************************************
  * Display Macros
  **************************************/
- #define DISPLAY(...) fprintf(stdout, __VA_ARGS__)
+ #define DISPLAY(...) fprintf(stderr, __VA_ARGS__)
  #define DISPLAYLEVEL(l, ...) if (g_displayLevel>=l) { DISPLAY(__VA_ARGS__); }
  static U32 g_displayLevel = 2;
@@ -63,7 +63,7 @@ static UTIL_time_t g_displayClock = UTIL_TIME_INITIALIZER;
  #define DISPLAYUPDATE(l, ...) if (g_displayLevel>=l) { \
  if ((UTIL_clockSpanMicro(g_displayClock) > g_refreshRate) || (g_displayLevel>=4)) \
  { g_displayClock = UTIL_getTime(); DISPLAY(__VA_ARGS__); \
- if (g_displayLevel>=4) fflush(stdout); } }
+ if (g_displayLevel>=4) fflush(stderr); } }
  #undef MIN
@@ -226,7 +226,7 @@ static int FUZ_mallocTests(unsigned seed, double compressibility, unsigned part)
  ZSTD_outBuffer out = { outBuffer, outSize, 0 };
  ZSTD_inBuffer in = { inBuffer, inSize, 0 };
  CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_compressionLevel, (U32)compressionLevel) );
- CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_nbThreads, nbThreads) );
+ CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_nbWorkers, nbThreads) );
  while ( ZSTD_compress_generic(cctx, &out, &in, ZSTD_e_end) ) {}
  ZSTD_freeCCtx(cctx);
  DISPLAYLEVEL(3, "compress_generic,-T%u,end level %i : ",
@@ -246,7 +246,7 @@ static int FUZ_mallocTests(unsigned seed, double compressibility, unsigned part)
  ZSTD_outBuffer out = { outBuffer, outSize, 0 };
  ZSTD_inBuffer in = { inBuffer, inSize, 0 };
  CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_compressionLevel, (U32)compressionLevel) );
- CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_nbThreads, nbThreads) );
+ CHECK_Z( ZSTD_CCtx_setParameter(cctx, ZSTD_p_nbWorkers, nbThreads) );
  CHECK_Z( ZSTD_compress_generic(cctx, &out, &in, ZSTD_e_continue) );
  while ( ZSTD_compress_generic(cctx, &out, &in, ZSTD_e_end) ) {}
  ZSTD_freeCCtx(cctx);

@@ -634,6 +634,7 @@ roundTripTest -g518K "19 --long"
  fileRoundTripTest -g5M "3 --long"
+ roundTripTest -g96K "5 --single-thread"
  if [ -n "$hasMT" ]
  then
  $ECHO "\n===> zstdmt round-trip tests "

@@ -94,7 +94,7 @@ static size_t cctxParamRoundTripTest(void* resultBuff, size_t resultBuffCapacity
  /* Set parameters */
  CHECK_Z( ZSTD_CCtxParam_setParameter(cctxParams, ZSTD_p_compressionLevel, cLevel) );
- CHECK_Z( ZSTD_CCtxParam_setParameter(cctxParams, ZSTD_p_nbThreads, 2) );
+ CHECK_Z( ZSTD_CCtxParam_setParameter(cctxParams, ZSTD_p_nbWorkers, 2) );
  CHECK_Z( ZSTD_CCtxParam_setParameter(cctxParams, ZSTD_p_overlapSizeLog, 5) );

@@ -753,9 +753,9 @@ static int basicUnitTests(U32 seed, double compressibility)
  DISPLAYLEVEL(3, "OK \n");
  /* Complex multithreading + dictionary test */
- { U32 const nbThreads = 2;
+ { U32 const nbWorkers = 2;
  size_t const jobSize = 4 * 1 MB;
- size_t const srcSize = jobSize * nbThreads; /* we want each job to have predictable size */
+ size_t const srcSize = jobSize * nbWorkers; /* we want each job to have predictable size */
  size_t const segLength = 2 KB;
  size_t const offset = 600 KB; /* must be larger than window defined in cdict */
  size_t const start = jobSize + (offset-1);
@@ -763,7 +763,7 @@ static int basicUnitTests(U32 seed, double compressibility)
  BYTE* const dst = (BYTE*)CNBuffer + start - offset;
  DISPLAYLEVEL(3, "test%3i : compress %u bytes with multiple threads + dictionary : ", testNb++, (U32)srcSize);
  CHECK_Z( ZSTD_CCtx_setParameter(zc, ZSTD_p_compressionLevel, 3) );
- CHECK_Z( ZSTD_CCtx_setParameter(zc, ZSTD_p_nbThreads, 2) );
+ CHECK_Z( ZSTD_CCtx_setParameter(zc, ZSTD_p_nbWorkers, nbWorkers) );
  CHECK_Z( ZSTD_CCtx_setParameter(zc, ZSTD_p_jobSize, jobSize) );
  assert(start > offset);
  assert(start + segLength < COMPRESSIBLE_NOISE_LENGTH);
@@ -1672,7 +1672,7 @@ static int fuzzerTests_newAPI(U32 seed, U32 nbTests, unsigned startTest, double
  U32 const nbThreadsAdjusted = (windowLogMalus < nbThreadsCandidate) ? nbThreadsCandidate - windowLogMalus : 1;
  U32 const nbThreads = MIN(nbThreadsAdjusted, nbThreadsMax);
  DISPLAYLEVEL(5, "t%u: nbThreads : %u \n", testNb, nbThreads);
- CHECK_Z( setCCtxParameter(zc, cctxParams, ZSTD_p_nbThreads, nbThreads, useOpaqueAPI) );
+ CHECK_Z( setCCtxParameter(zc, cctxParams, ZSTD_p_nbWorkers, nbThreads, useOpaqueAPI) );
  if (nbThreads > 1) {
  U32 const jobLog = FUZ_rand(&lseed) % (testLog+1);
  CHECK_Z( setCCtxParameter(zc, cctxParams, ZSTD_p_overlapSizeLog, FUZ_rand(&lseed) % 10, useOpaqueAPI) );