Explicitly create a job cctxParam for multithreading

This commit is contained in:
Stella Lau 2017-08-21 15:39:37 -07:00
parent 5b956f4753
commit 60e1bc617c

View File

@ -186,19 +186,17 @@ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)
ZSTD_free(buf.start, bufPool->cMem); ZSTD_free(buf.start, bufPool->cMem);
} }
/** /* TODO: Set relevant job parameters, initialize others to default.
* TODO * Notably, nbThreads should be zero. */
* static ZSTD_CCtx_params ZSTDMT_makeJobCCtxParams(ZSTD_CCtx_params const params)
* Sets parameters to zero for jobs. Notably, nbThreads should be zero?
*/
static void ZSTDMT_zeroCCtxParams(ZSTD_CCtx_params* params)
{ {
params->forceWindow = 0; ZSTD_CCtx_params jobParams;
params->dictMode = (ZSTD_dictMode_e)(0); memset(&jobParams, 0, sizeof(jobParams));
params->dictContentByRef = 0;
params->nbThreads = 0; jobParams.cParams = params.cParams;
params->jobSize = 0; jobParams.fParams = params.fParams;
params->overlapSizeLog = 0; jobParams.compressionLevel = params.compressionLevel;
return jobParams;
} }
/* ===== CCtx Pool ===== */ /* ===== CCtx Pool ===== */
@ -560,8 +558,7 @@ static size_t ZSTDMT_compress_advanced_opaque(
unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbChunks : (unsigned)(dstCapacity / ZSTD_compressBound(avgChunkSize)); /* presumes avgChunkSize >= 256 KB, which should be the case */ unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbChunks : (unsigned)(dstCapacity / ZSTD_compressBound(avgChunkSize)); /* presumes avgChunkSize >= 256 KB, which should be the case */
size_t frameStartPos = 0, dstBufferPos = 0; size_t frameStartPos = 0, dstBufferPos = 0;
XXH64_state_t xxh64; XXH64_state_t xxh64;
ZSTD_CCtx_params requestedParams = cctxParams; ZSTD_CCtx_params const requestedParams = ZSTDMT_makeJobCCtxParams(cctxParams);
ZSTDMT_zeroCCtxParams(&requestedParams);
DEBUGLOG(4, "nbChunks : %2u (chunkSize : %u bytes) ", nbChunks, (U32)avgChunkSize); DEBUGLOG(4, "nbChunks : %2u (chunkSize : %u bytes) ", nbChunks, (U32)avgChunkSize);
if (nbChunks==1) { /* fallback to single-thread mode */ if (nbChunks==1) { /* fallback to single-thread mode */
@ -720,19 +717,17 @@ size_t ZSTDMT_initCStream_internal_opaque(
const ZSTD_CDict* cdict, ZSTD_CCtx_params cctxParams, const ZSTD_CDict* cdict, ZSTD_CCtx_params cctxParams,
unsigned long long pledgedSrcSize) unsigned long long pledgedSrcSize)
{ {
ZSTD_CCtx_params const requestedParams = ZSTDMT_makeJobCCtxParams(cctxParams);
DEBUGLOG(4, "ZSTDMT_initCStream_internal"); DEBUGLOG(4, "ZSTDMT_initCStream_internal");
/* params are supposed to be fully validated at this point */ /* params are supposed to be fully validated at this point */
assert(!ZSTD_isError(ZSTD_checkCParams(cctxParams.cParams))); assert(!ZSTD_isError(ZSTD_checkCParams(cctxParams.cParams)));
assert(!((dict) && (cdict))); /* either dict or cdict, not both */ assert(!((dict) && (cdict))); /* either dict or cdict, not both */
/* TODO: Set stuff to 0 to preserve old semantics. */
ZSTDMT_zeroCCtxParams(&cctxParams);
if (zcs->nbThreads==1) { if (zcs->nbThreads==1) {
DEBUGLOG(4, "single thread mode"); DEBUGLOG(4, "single thread mode");
return ZSTD_initCStream_internal_opaque(zcs->cctxPool->cctx[0], return ZSTD_initCStream_internal_opaque(zcs->cctxPool->cctx[0],
dict, dictSize, cdict, dict, dictSize, cdict,
cctxParams, pledgedSrcSize); requestedParams, pledgedSrcSize);
} }
if (zcs->allJobsCompleted == 0) { /* previous compression not correctly finished */ if (zcs->allJobsCompleted == 0) { /* previous compression not correctly finished */
@ -749,7 +744,7 @@ size_t ZSTDMT_initCStream_internal_opaque(
/* TODO: cctxParam version? Is this correct? */ /* TODO: cctxParam version? Is this correct? */
zcs->cdictLocal = ZSTD_createCDict_advanced(dict, dictSize, zcs->cdictLocal = ZSTD_createCDict_advanced(dict, dictSize,
0 /* byRef */, ZSTD_dm_auto, /* note : a loadPrefix becomes an internal CDict */ 0 /* byRef */, ZSTD_dm_auto, /* note : a loadPrefix becomes an internal CDict */
cctxParams.cParams, zcs->cMem); requestedParams.cParams, zcs->cMem);
zcs->cdict = zcs->cdictLocal; zcs->cdict = zcs->cdictLocal;
if (zcs->cdictLocal == NULL) return ERROR(memory_allocation); if (zcs->cdictLocal == NULL) return ERROR(memory_allocation);
} else { } else {
@ -804,13 +799,12 @@ size_t ZSTDMT_initCStream_usingCDict(ZSTDMT_CCtx* mtctx,
ZSTD_frameParameters fParams, ZSTD_frameParameters fParams,
unsigned long long pledgedSrcSize) unsigned long long pledgedSrcSize)
{ {
ZSTD_CCtx_params params = ZSTD_getCCtxParamsFromCDict(cdict); ZSTD_CCtx_params requestedParams =
ZSTDMT_makeJobCCtxParams(ZSTD_getCCtxParamsFromCDict(cdict));
if (cdict==NULL) return ERROR(dictionary_wrong); /* method incompatible with NULL cdict */ if (cdict==NULL) return ERROR(dictionary_wrong); /* method incompatible with NULL cdict */
ZSTDMT_zeroCCtxParams(&params); requestedParams.fParams = fParams;
params.fParams = fParams;
return ZSTDMT_initCStream_internal_opaque(mtctx, NULL, 0 /*dictSize*/, cdict, return ZSTDMT_initCStream_internal_opaque(mtctx, NULL, 0 /*dictSize*/, cdict,
params, pledgedSrcSize); requestedParams, pledgedSrcSize);
} }