zstdmt : fixed memory leak

writeLastEmptyBlock() must release srcBuffer
as mtctx assumes it's done by job worker.

minor : changed 2 job member names (src->srcBuffer, srcStart->prefixStart) for clarity
This commit is contained in:
Yann Collet 2018-01-26 10:44:09 -08:00
parent 8e128eaf05
commit fca13c6855

View File

@ -304,19 +304,19 @@ static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
/* ------------------------------------------ */
typedef struct {
size_t consumed; /* SHARED - init0 by mtctx, then modified by worker AND read by mtctx */
size_t cSize; /* SHARED - init0 by mtctx, then modified by worker AND read by mtctx */
size_t consumed; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */
size_t cSize; /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */
ZSTD_pthread_mutex_t* mtctx_mutex; /* Thread-safe - used by mtctx and (all) worker */
ZSTD_pthread_cond_t* mtctx_cond; /* Thread-safe - used by mtctx and (all) worker */
ZSTDMT_CCtxPool* cctxPool; /* Thread-safe - used by mtctx and (all) worker */
ZSTDMT_bufferPool* bufPool; /* Thread-safe - used by mtctx and (all) worker */
buffer_t dstBuff; /* set by worker (or mtctx), then read by worker, then modified by mtctx => no barrier */
buffer_t src; /* set by mtctx, then modified by worker => no barrier */
const void* srcStart; /* set by mtctx, then read by worker => no barrier */
size_t prefixSize; /* set by mtctx, then read by worker => no barrier */
size_t srcSize; /* set by mtctx, then read by worker => no barrier */
unsigned firstChunk; /* set by mtctx, then read by worker => no barrier */
unsigned lastChunk; /* set by mtctx, then read by worker => no barrier */
buffer_t dstBuff; /* set by worker (or mtctx), then read by worker, then modified by mtctx => no barrier */
buffer_t srcBuff; /* set by mtctx, then released by worker => no barrier */
const void* prefixStart; /* set by mtctx, then read by worker => no barrier */
size_t prefixSize; /* set by mtctx, then read by worker => no barrier */
size_t srcSize; /* set by mtctx, then read by worker => no barrier */
unsigned firstChunk; /* set by mtctx, then read by worker => no barrier */
unsigned lastChunk; /* set by mtctx, then read by worker => no barrier */
ZSTD_CCtx_params params; /* set by mtctx, then read by worker => no barrier */
const ZSTD_CDict* cdict; /* set by mtctx, then read by worker => no barrier */
unsigned long long fullFrameSize; /* set by mtctx, then read by worker => no barrier */
@ -329,7 +329,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
{
ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool);
const void* const src = (const char*)job->srcStart + job->prefixSize;
const void* const src = (const char*)job->prefixStart + job->prefixSize;
buffer_t dstBuff = job->dstBuff;
/* ressources */
@ -362,7 +362,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
goto _endJob;
} }
{ size_t const initError = ZSTD_compressBegin_advanced_internal(cctx,
job->srcStart, job->prefixSize, ZSTD_dm_rawContent, /* load dictionary in "content-only" mode (no header analysis) */
job->prefixStart, job->prefixSize, ZSTD_dm_rawContent, /* load dictionary in "content-only" mode (no header analysis) */
NULL, /*cdict*/
jobParams, pledgedSrcSize);
if (ZSTD_isError(initError)) {
@ -419,8 +419,8 @@ void ZSTDMT_compressChunk(void* jobDescription)
_endJob:
/* release resources */
ZSTDMT_releaseCCtx(job->cctxPool, cctx);
ZSTDMT_releaseBuffer(job->bufPool, job->src);
job->src = g_nullBuffer; job->srcStart = NULL;
ZSTDMT_releaseBuffer(job->bufPool, job->srcBuff);
job->srcBuff = g_nullBuffer; job->prefixStart = NULL;
/* report */
ZSTD_PTHREAD_MUTEX_LOCK(job->mtctx_mutex);
job->consumed = job->srcSize;
@ -557,9 +557,9 @@ static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx)
DEBUGLOG(4, "job%02u: release dst address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].dstBuff.start);
ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].dstBuff);
mtctx->jobs[jobID].dstBuff = g_nullBuffer;
DEBUGLOG(4, "job%02u: release src address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].src.start);
ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].src);
mtctx->jobs[jobID].src = g_nullBuffer;
DEBUGLOG(4, "job%02u: release src address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].srcBuff.start);
ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].srcBuff);
mtctx->jobs[jobID].srcBuff = g_nullBuffer;
}
memset(mtctx->jobs, 0, (mtctx->jobIDMask+1)*sizeof(ZSTDMT_jobDescription));
DEBUGLOG(4, "input: release address %08X", (U32)(size_t)mtctx->inBuff.buffer.start);
@ -757,8 +757,8 @@ static size_t ZSTDMT_compress_advanced_internal(
buffer_t const dstBuffer = u < compressWithinDst ? dstAsBuffer : g_nullBuffer;
size_t dictSize = u ? overlapSize : 0;
mtctx->jobs[u].src = g_nullBuffer;
mtctx->jobs[u].srcStart = srcStart + frameStartPos - dictSize;
mtctx->jobs[u].srcBuff = g_nullBuffer;
mtctx->jobs[u].prefixStart = srcStart + frameStartPos - dictSize;
mtctx->jobs[u].prefixSize = dictSize;
mtctx->jobs[u].srcSize = chunkSize; assert(chunkSize > 0); /* avoid job.srcSize == 0 */
mtctx->jobs[u].consumed = 0;
@ -781,7 +781,7 @@ static size_t ZSTDMT_compress_advanced_internal(
}
DEBUGLOG(5, "ZSTDMT_compress_advanced_internal: posting job %u (%u bytes)", u, (U32)chunkSize);
DEBUG_PRINTHEX(6, mtctx->jobs[u].srcStart, 12);
DEBUG_PRINTHEX(6, mtctx->jobs[u].prefixStart, 12);
POOL_add(mtctx->factory, ZSTDMT_compressChunk, &mtctx->jobs[u]);
frameStartPos += chunkSize;
@ -802,7 +802,7 @@ static size_t ZSTDMT_compress_advanced_internal(
ZSTD_pthread_mutex_unlock(&mtctx->mtctx_mutex);
DEBUGLOG(5, "ready to write chunk %u ", chunkID);
mtctx->jobs[chunkID].srcStart = NULL;
mtctx->jobs[chunkID].prefixStart = NULL;
{ size_t const cSize = mtctx->jobs[chunkID].cSize;
if (ZSTD_isError(cSize)) error = cSize;
if ((!error) && (dstPos + cSize > dstCapacity)) error = ERROR(dstSize_tooSmall);
@ -999,18 +999,21 @@ size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel) {
*/
static size_t ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job)
{
assert(job->srcSize == 0);
assert(job->lastChunk == 1);
assert(job->firstChunk == 0); /* first chunk needs to create frame header too */
assert(job->dstBuff.start == NULL); /* invoked from streaming variant only */
{ buffer_t const dstBuff = ZSTDMT_getBuffer(job->bufPool);
if (dstBuff.start==NULL) return ERROR(memory_allocation);
job->dstBuff = dstBuff; /* will be released by ZSTDMT_flushProduced() */
assert(dstBuff.size >= ZSTD_blockHeaderSize);
job->cSize = ZSTD_writeLastEmptyBlock(dstBuff.start, dstBuff.size);
assert(!ZSTD_isError(job->cSize));
assert(job->consumed == 0);
}
assert(job->srcSize == 0); /* last chunk is empty -> will be simplified into a last empty block */
assert(job->firstChunk == 0); /* cannot be first chunk, as it also needs to create frame header */
/* A job created by streaming variant starts with a src buffer, but no dst buffer.
* It summons a dstBuffer itself, compresses into it, then releases srcBuffer, and gives result to mtctx.
* When done, srcBuffer is empty, while dstBuffer is filled, and will be released by mtctx.
* This shortcut will simply switch srcBuffer for dstBuffer, providing same outcome as a normal job */
assert(job->dstBuff.start == NULL); /* invoked from streaming variant only (otherwise, dstBuff might be user's output) */
assert(job->srcBuff.start != NULL); /* invoked from streaming variant only (otherwise, srcBuff might be user's input) */
assert(job->srcBuff.size >= ZSTD_blockHeaderSize);
job->dstBuff = job->srcBuff;
job->srcBuff = g_nullBuffer;
job->cSize = ZSTD_writeLastEmptyBlock(job->dstBuff.start, job->dstBuff.size);
assert(!ZSTD_isError(job->cSize));
assert(job->consumed == 0);
return 0;
}
@ -1028,12 +1031,12 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
if (!mtctx->jobReady) {
DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ",
mtctx->nextJobID, (U32)srcSize, (U32)mtctx->prefixSize);
mtctx->jobs[jobID].src = mtctx->inBuff.buffer;
mtctx->jobs[jobID].srcStart = mtctx->inBuff.buffer.start;
mtctx->jobs[jobID].srcBuff = mtctx->inBuff.buffer;
mtctx->jobs[jobID].prefixStart = mtctx->inBuff.buffer.start;
mtctx->jobs[jobID].prefixSize = mtctx->prefixSize;
mtctx->jobs[jobID].srcSize = srcSize;
mtctx->jobs[jobID].consumed = 0;
mtctx->jobs[jobID].cSize = 0;
mtctx->jobs[jobID].prefixSize = mtctx->prefixSize;
assert(mtctx->inBuff.filled >= srcSize + mtctx->prefixSize);
mtctx->jobs[jobID].params = mtctx->params;
/* do not calculate checksum within sections, but write it in header for first section */
@ -1066,7 +1069,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZS
}
mtctx->inBuff.filled -= srcSize + mtctx->prefixSize - newPrefixSize;
memmove(mtctx->inBuff.buffer.start, /* copy end of current job into next job, as "prefix" */
(const char*)mtctx->jobs[jobID].srcStart + mtctx->prefixSize + srcSize - newPrefixSize,
(const char*)mtctx->jobs[jobID].prefixStart + mtctx->prefixSize + srcSize - newPrefixSize,
mtctx->inBuff.filled);
mtctx->prefixSize = newPrefixSize;
} else { /* endFrame==1 => no need for another input buffer */