mirror of https://github.com/facebook/zstd.git

protect buffer pool with a mutex

commit ce9e1452fd (parent 3d93f2fce7)
@@ -1,5 +1,5 @@
 #include <stdlib.h>   /* malloc */
-#include <pthread.h>
+#include <pthread.h>   /* posix only, to be replaced by a more portable version */
 #include "zstd_internal.h"   /* MIN, ERROR */
 #include "zstdmt_compress.h"
 
@@ -39,7 +39,7 @@ static ZSTDMT_dstBufferManager ZSTDMT_createDstBufferManager(void* dst, size_t d
     dbm.frameIDToWrite = 0;
     pthread_mutex_init(&dbm.frameTable_mutex, NULL);
     pthread_mutex_init(&dbm.allFramesWritten_mutex, NULL);
-    pthread_mutex_lock(&dbm.allFramesWritten_mutex);
+    pthread_mutex_lock(&dbm.allFramesWritten_mutex);   /* maybe could be merged into init ? */
     dbm.nbStackedFrames = 0;
     return dbm;
 }
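Locking allFramesWritten_mutex right after initializing it is apparently deliberate: the mutex starts out held so that a later pthread_mutex_lock() on it blocks until whichever thread writes the last frame unlocks it. A minimal sketch of this "pre-locked mutex as completion signal" pattern, with hypothetical names (done_mutex, producer) not taken from the commit:

#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t done_mutex;       /* hypothetical */

static void* producer(void* arg)
{
    (void)arg;
    /* ... compress and write all frames ... */
    pthread_mutex_unlock(&done_mutex);   /* signal : everything written */
    return NULL;
}

int main(void)
{
    pthread_t t;
    pthread_mutex_init(&done_mutex, NULL);
    pthread_mutex_lock(&done_mutex);     /* starts out "not done" */
    pthread_create(&t, NULL, producer, NULL);
    pthread_mutex_lock(&done_mutex);     /* blocks until producer unlocks */
    printf("all frames written\n");
    pthread_join(t, NULL);
    return 0;
}

One caveat: POSIX leaves unlocking a default mutex from a thread other than the one that locked it undefined, which is why this pattern is usually replaced by a condition variable in portable code.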
@@ -92,7 +92,7 @@ static size_t ZSTDMT_tryWriteFrame(ZSTDMT_dstBufferManager* dstBufferManager,
     pthread_mutex_lock(&dstBufferManager->frameTable_mutex);
     if (frameID != dstBufferManager->frameIDToWrite) {
         DEBUGLOG(4, "writing frameID %u : not possible, waiting for %u ", frameID, dstBufferManager->frameIDToWrite);
-        frameToWrite_t frame = { src, srcSize, frameID, isLastFrame };
+        frameToWrite_t const frame = { src, srcSize, frameID, isLastFrame };
         ZSTDMT_stackFrameToWrite(dstBufferManager, frame);
         pthread_mutex_unlock(&dstBufferManager->frameTable_mutex);
         return 0;
@@ -121,9 +121,11 @@ static size_t ZSTDMT_tryWriteFrame(ZSTDMT_dstBufferManager* dstBufferManager,
     for (u=0; u<dstBufferManager->nbStackedFrames; u++) {
         if (dstBufferManager->stackedFrame[u].frameID == frameID) {
             pthread_mutex_unlock(&dstBufferManager->frameTable_mutex);
+            DEBUGLOG(4, "catch up frame %u ", frameID);
             { size_t const writeError = ZSTDMT_writeFrame(dstBufferManager, u);
               if (ZSTD_isError(writeError)) return writeError; }
             lastFrameWritten = dstBufferManager->stackedFrame[u].isLastFrame;
+            dstBufferManager->frameIDToWrite = frameID+1;
             /* remove frame from stack */
             pthread_mutex_lock(&dstBufferManager->frameTable_mutex);
             dstBufferManager->stackedFrame[u] = dstBufferManager->stackedFrame[dstBufferManager->nbStackedFrames-1];
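For context, the surrounding function emits frames in frameID order: a frame that completes out of order is parked on a stack under frameTable_mutex, and once the expected frame is written the stack is scanned to "catch up", removing entries by swapping with the last element. A condensed, single-threaded sketch of that reordering logic (hypothetical names, locking and bounds checks elided):

#include <stddef.h>

typedef struct { const void* src; size_t size; unsigned id; } frame_t;   /* hypothetical */

#define STACK_MAX 16
static frame_t stacked[STACK_MAX];
static unsigned nbStacked = 0;
static unsigned nextID = 0;

static void writeOrStack(frame_t f)      /* lock held by caller in the real code */
{
    unsigned u;
    if (f.id != nextID) {                /* not its turn : park it */
        stacked[nbStacked++] = f;
        return;
    }
    for (;;) {
        /* output(f.src, f.size); */     /* actual write elided */
        nextID = f.id + 1;
        for (u = 0; u < nbStacked; u++)
            if (stacked[u].id == nextID) break;
        if (u == nbStacked) return;      /* nothing to catch up */
        f = stacked[u];
        stacked[u] = stacked[--nbStacked];   /* swap-with-last removal, as in the diff */
    }
}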
@@ -183,20 +185,24 @@ static ZSTDMT_jobDescription ZSTDMT_getjob(ZSTDMT_jobAgency* jobAgency)
 
 #define ZSTDMT_NBBUFFERSPOOLED_MAX ZSTDMT_NBTHREADS_MAX
 typedef struct ZSTDMT_bufferPool_s {
+    pthread_mutex_t bufferPool_mutex;
     buffer_t bTable[ZSTDMT_NBBUFFERSPOOLED_MAX];
     unsigned nbBuffers;
 } ZSTDMT_bufferPool;
 
 static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize)
 {
+    pthread_mutex_lock(&pool->bufferPool_mutex);
     if (pool->nbBuffers) {   /* try to use an existing buffer */
         pool->nbBuffers--;
         buffer_t const buf = pool->bTable[pool->nbBuffers];
+        pthread_mutex_unlock(&pool->bufferPool_mutex);
         size_t const availBufferSize = buf.bufferSize;
         if ((availBufferSize >= bSize) & (availBufferSize <= 10*bSize))   /* large enough, but not too much */
             return buf;
         free(buf.start);   /* size conditions not respected : create a new buffer */
     }
+    pthread_mutex_unlock(&pool->bufferPool_mutex);
     /* create new buffer */
     buffer_t buf;
     buf.bufferSize = bSize;
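The new mutex guards only the pool metadata (nbBuffers and bTable): the buffer is copied out and the lock released before any malloc or free of buffer payload, so allocation never serializes other workers. One caveat worth noting: on the fall-through path where a pooled buffer has the wrong size, the function unlocks inside the if block and then reaches the unlock added after it, i.e. a double unlock, which POSIX leaves undefined for a default mutex. A self-contained sketch of the intended discipline, restructured to unlock exactly once per path (pool_t and pool_get are hypothetical names, not zstd API):

#include <pthread.h>
#include <stdlib.h>

typedef struct { void* start; size_t bufferSize; } buffer_t;

#define POOL_MAX 4
typedef struct {
    pthread_mutex_t mutex;
    buffer_t bTable[POOL_MAX];
    unsigned nbBuffers;
} pool_t;

static buffer_t pool_get(pool_t* pool, size_t bSize)
{
    buffer_t buf;
    pthread_mutex_lock(&pool->mutex);
    if (pool->nbBuffers) {                    /* try to recycle */
        buf = pool->bTable[--pool->nbBuffers];
        pthread_mutex_unlock(&pool->mutex);   /* unlock before touching payload */
        if ((buf.bufferSize >= bSize) & (buf.bufferSize <= 10*bSize))
            return buf;                       /* large enough, but not oversized */
        free(buf.start);                      /* wrong size : allocate fresh below */
    } else {
        pthread_mutex_unlock(&pool->mutex);   /* exactly one unlock per path */
    }
    buf.bufferSize = bSize;
    buf.start = malloc(bSize);                /* allocation happens outside the lock */
    return buf;
}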
@@ -207,11 +213,14 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* pool, size_t bSize)
 /* effectively store buffer for later re-use, up to pool capacity */
 static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* pool, buffer_t buf)
 {
+    pthread_mutex_lock(&pool->bufferPool_mutex);
     if (pool->nbBuffers >= ZSTDMT_NBBUFFERSPOOLED_MAX) {
+        pthread_mutex_unlock(&pool->bufferPool_mutex);
         free(buf.start);
         return;
     }
     pool->bTable[pool->nbBuffers++] = buf;   /* store for later re-use */
+    pthread_mutex_unlock(&pool->bufferPool_mutex);
 }
 
 
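The release path follows the same discipline. Continuing the hypothetical pool_t sketch above: when the pool is already full, the lock is dropped before free(), keeping the critical section down to a few loads and stores.

static void pool_release(pool_t* pool, buffer_t buf)
{
    pthread_mutex_lock(&pool->mutex);
    if (pool->nbBuffers >= POOL_MAX) {        /* pool full : discard buffer */
        pthread_mutex_unlock(&pool->mutex);   /* never free() under the lock */
        free(buf.start);
        return;
    }
    pool->bTable[pool->nbBuffers++] = buf;    /* store for later re-use */
    pthread_mutex_unlock(&pool->mutex);
}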
@@ -253,9 +262,12 @@ ZSTDMT_CCtx *ZSTDMT_createCCtx(unsigned nbThreads)
     if ((nbThreads < 1) | (nbThreads > ZSTDMT_NBTHREADS_MAX)) return NULL;
     ZSTDMT_CCtx* const cctx = (ZSTDMT_CCtx*) calloc(1, sizeof(ZSTDMT_CCtx));
     if (!cctx) return NULL;
+    /* init jobAgency */
     pthread_mutex_init(&cctx->jobAgency.jobAnnounce_mutex, NULL);   /* check return value ? */
     pthread_mutex_init(&cctx->jobAgency.jobApply_mutex, NULL);
     pthread_mutex_lock(&cctx->jobAgency.jobAnnounce_mutex);   /* no job at beginning */
+    /* init bufferPool */
+    pthread_mutex_init(&cctx->bufferPool.bufferPool_mutex, NULL);
     /* start all workers */
     cctx->nbThreads = nbThreads;
     DEBUGLOG(2, "nbThreads : %u \n", nbThreads);
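The "/* check return value ? */" question left in the source has a concrete answer: pthread_mutex_init returns 0 on success and an errno-style code on failure, so initialization can bail out cleanly. A hedged sketch of what checked initialization could look like (ctx_t and ctx_create are hypothetical stand-ins, not part of the commit):

#include <pthread.h>
#include <stdlib.h>

typedef struct { pthread_mutex_t jobAnnounce, jobApply, bufferPool; } ctx_t;   /* stand-in */

static ctx_t* ctx_create(void)
{
    ctx_t* const ctx = (ctx_t*)calloc(1, sizeof(ctx_t));
    if (!ctx) return NULL;
    if (pthread_mutex_init(&ctx->jobAnnounce, NULL)
     || pthread_mutex_init(&ctx->jobApply, NULL)
     || pthread_mutex_init(&ctx->bufferPool, NULL)) {
        /* a complete version would destroy whichever mutexes did initialize */
        free(ctx);
        return NULL;
    }
    pthread_mutex_lock(&ctx->jobAnnounce);   /* "no job at beginning", as in the commit */
    return ctx;
}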