frameProgression reports nbActiveWorkers and output flushed

This commit is contained in:
Yann Collet 2018-08-14 11:49:25 -07:00
parent 0853f86044
commit 3e4617ef54
4 changed files with 26 additions and 21 deletions

View File

@ -900,7 +900,9 @@ ZSTD_frameProgression ZSTD_getFrameProgression(const ZSTD_CCtx* cctx)
fp.ingested = cctx->consumedSrcSize + buffered;
fp.consumed = cctx->consumedSrcSize;
fp.produced = cctx->producedCSize;
fp.flushed = cctx->producedCSize; /* simplified; some data might still be left within streaming output buffer */
fp.currentJobID = 0;
fp.nbActiveWorkers = 0;
return fp;
} }

View File

@ -1058,7 +1058,7 @@ static size_t ZSTDMT_resize(ZSTDMT_CCtx* mtctx, unsigned nbWorkers)
/*! ZSTDMT_updateCParams_whileCompressing() :
* Updates only a selected set of compression parameters, to remain compatible with current frame.
* Updates a selected set of compression parameters, remaining compatible with currently active frame.
* New parameters will be applied to next compression job. */
void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* cctxParams)
{
@ -1076,27 +1076,31 @@ void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_p
/* ZSTDMT_getFrameProgression():
* tells how much data has been consumed (input) and produced (output) for current frame.
* able to count progression inside worker threads.
* Note : mutex will be acquired during statistics collection. */
* Note : mutex will be acquired during statistics collection inside workers. */
ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
{
ZSTD_frameProgression fps;
DEBUGLOG(5, "ZSTDMT_getFrameProgression");
fps.ingested = mtctx->consumed + mtctx->inBuff.filled;
fps.consumed = mtctx->consumed;
fps.produced = mtctx->produced;
fps.produced = fps.flushed = mtctx->produced;
fps.currentJobID = mtctx->nextJobID;
fps.nbActiveWorkers = 0;
{ unsigned jobNb;
unsigned lastJobNb = mtctx->nextJobID + mtctx->jobReady; assert(mtctx->jobReady <= 1);
DEBUGLOG(6, "ZSTDMT_getFrameProgression: jobs: from %u to <%u (jobReady:%u)",
mtctx->doneJobID, lastJobNb, mtctx->jobReady)
for (jobNb = mtctx->doneJobID ; jobNb < lastJobNb ; jobNb++) {
unsigned const wJobID = jobNb & mtctx->jobIDMask;
ZSTD_pthread_mutex_lock(&mtctx->jobs[wJobID].job_mutex);
{ size_t const cResult = mtctx->jobs[wJobID].cSize;
ZSTDMT_jobDescription* jobPtr = &mtctx->jobs[wJobID];
ZSTD_pthread_mutex_lock(&jobPtr->job_mutex);
{ size_t const cResult = jobPtr->cSize;
size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
fps.ingested += mtctx->jobs[wJobID].src.size;
fps.consumed += mtctx->jobs[wJobID].consumed;
fps.ingested += jobPtr->src.size;
fps.consumed += jobPtr->consumed;
fps.produced += produced;
fps.flushed += jobPtr->dstFlushed;
fps.nbActiveWorkers += (jobPtr->consumed < jobPtr->src.size);
}
ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
}

View File

@ -732,10 +732,12 @@ ZSTDLIB_API size_t ZSTD_resetCStream(ZSTD_CStream* zcs, unsigned long long pledg
typedef struct {
unsigned long long ingested;
unsigned long long consumed;
unsigned long long produced;
unsigned currentJobID;
unsigned long long ingested; /* nb input bytes read and buffered */
unsigned long long consumed; /* nb input bytes actually compressed */
unsigned long long produced; /* nb of compressed bytes generated and buffered */
unsigned long long flushed; /* nb of compressed bytes flushed ; single-thread mode reports `produced` here (simplified : some data might still be left within streaming output buffer) */
unsigned currentJobID; /* MT only : latest started job nb */
unsigned nbActiveWorkers; /* MT only : nb of workers actively compressing at probe time */
} ZSTD_frameProgression;
/* ZSTD_getFrameProgression():

View File

@ -800,16 +800,17 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
/* check output speed */
if (zfp.currentJobID > 1) {
static ZSTD_frameProgression cpszfp = { 0, 0, 0, 0 };
static unsigned long long lastFlushedSize = 0;
static ZSTD_frameProgression cpszfp = { 0, 0, 0, 0, 0, 0 };
unsigned long long newlyProduced = zfp.produced - cpszfp.produced;
unsigned long long newlyFlushed = compressedfilesize - lastFlushedSize;
unsigned long long newlyFlushed = zfp.flushed - cpszfp.flushed;
assert(zfp.produced >= cpszfp.produced);
cpszfp = zfp;
if ( (zfp.ingested == cpszfp.ingested)
&& (zfp.consumed == cpszfp.consumed) ) {
DISPLAYLEVEL(6, "no data read nor consumed : buffers are full (?) or compression is slow + input has reached its limit. If buffers full : output is too slow => slow down \n")
DISPLAYLEVEL(2, "no data read nor consumed : buffers are full (?) output is too slow => slow down ; or compression is slow + input has reached its limit => can't tell \n")
speedChange = slower;
}
@ -818,8 +819,6 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
DISPLAYLEVEL(6, "production faster than flushing (%llu > %llu) but there is still %u bytes to flush => slow down \n", newlyProduced, newlyFlushed, (U32)stillToFlush);
speedChange = slower;
}
cpszfp = zfp;
lastFlushedSize = compressedfilesize;
}
/* course correct only if there is at least one new job completed */
@ -832,14 +831,12 @@ FIO_compressZstdFrame(const cRess_t* ressPtr,
DISPLAYLEVEL(6, "input is never blocked => input is too slow \n");
speedChange = slower;
} else if (speedChange == noChange) {
static ZSTD_frameProgression csuzfp = { 0, 0, 0, 0 };
static unsigned long long lastFlushedSize = 0;
static ZSTD_frameProgression csuzfp = { 0, 0, 0, 0, 0, 0 };
unsigned long long newlyIngested = zfp.ingested - csuzfp.ingested;
unsigned long long newlyConsumed = zfp.consumed - csuzfp.consumed;
unsigned long long newlyProduced = zfp.produced - csuzfp.produced;
unsigned long long newlyFlushed = compressedfilesize - lastFlushedSize;
unsigned long long newlyFlushed = zfp.flushed - csuzfp.flushed;
csuzfp = zfp;
lastFlushedSize = compressedfilesize;
assert(inputPresented > 0);
DISPLAYLEVEL(6, "input blocked %u/%u(%.2f) - ingested:%u vs %u:consumed - flushed:%u vs %u:produced \n",
inputBlocked, inputPresented, (double)inputBlocked/inputPresented*100,