finalized POOL_resize()

POOL_ctx* POOL_resize(POOL_ctx* ctx, size_t numThreads)

The function may fail, and returns a NULL pointer in this case.
This commit is contained in:
Yann Collet 2018-06-19 16:03:12 -07:00
parent 1c714fda3f
commit 4567c57199
2 changed files with 37 additions and 23 deletions

View File

@ -185,18 +185,21 @@ size_t POOL_sizeof(POOL_ctx *ctx) {
}
/* note : only works if no job is running !
* return : 1 on success, 0 on failure */
static int POOL_resize_internal(POOL_ctx* ctx, size_t numThreads)
/* note : only works if no job is running ! */
static POOL_ctx* POOL_resize_internal(POOL_ctx* ctx, size_t numThreads)
{
if (ctx->numThreadsBusy > 0) return 0;
if (ctx->numThreadsBusy > 0) return NULL;
if (numThreads <= ctx->threadCapacity) {
ctx->threadLimit = numThreads;
return 1;
return ctx;
}
/* numThreads > threadCapacity */
{ ZSTD_pthread_t* const threadPool = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), ctx->customMem);
if (!threadPool) return 0;
if (!threadPool) return NULL;
/* replace existing thread pool */
memcpy(threadPool, ctx->threads, ctx->threadCapacity);
ZSTD_free(ctx->threads, ctx->customMem);
ctx->threads = threadPool;
/* Initialize additional threads */
{ size_t threadId;
for (threadId = ctx->threadCapacity; threadId < numThreads; ++threadId) {
@ -204,30 +207,26 @@ static int POOL_resize_internal(POOL_ctx* ctx, size_t numThreads)
break;
} }
if (threadId != numThreads) { /* not all threads successfully init */
/* how to destroy existing threads ? */
/* POOL_join destroy all existing threads, not just newly created ones */
return 0;
}
}
/* replace existing thread pool */
memcpy(threadPool, ctx->threads, ctx->threadCapacity);
ZSTD_free(ctx->threads, ctx->customMem);
ctx->threads = threadPool;
}
ctx->threadCapacity = threadId;
return NULL; /* will release the pool */
} } }
/* successfully expanded */
ctx->threadCapacity = numThreads;
ctx->threadLimit = numThreads;
return 1;
return ctx;
}
/* return : 1 on success, 0 on failure */
int POOL_resize(POOL_ctx* ctx, size_t numThreads)
/* @return : a working pool on success, NULL on failure
* note : starting context is considered consumed. */
POOL_ctx* POOL_resize(POOL_ctx* ctx, size_t numThreads)
{
int result;
if (!ctx) return 0;
POOL_ctx* newCtx;
if (ctx==NULL) return NULL;
ZSTD_pthread_mutex_lock(&ctx->queueMutex);
result = POOL_resize_internal(ctx, numThreads);
newCtx = POOL_resize_internal(ctx, numThreads);
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
return result;
if (newCtx!=ctx) POOL_free(ctx);
return newCtx;
}
/**
@ -314,6 +313,11 @@ void POOL_free(POOL_ctx* ctx) {
(void)ctx;
}
POOL_ctx* POOL_resize(POOL_ctx* ctx, size_t numThreads) {
(void)numThreads;
return ctx;
}
void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque) {
(void)ctx;
function(opaque);

View File

@ -38,6 +38,16 @@ POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize,
*/
void POOL_free(POOL_ctx* ctx);
/*! POOL_resize() :
* Expands or shrinks pool's number of threads.
* This is more efficient than releasing and creating a new context.
* @return : a new pool context on success, NULL on failure
* note : new pool context might have same address as original one, but it's not guaranteed.
* consider starting context as consumed, only rely on returned one.
* note 2 : only numThreads can be resized, queueSize is unchanged.
*/
POOL_ctx* POOL_resize(POOL_ctx* ctx, size_t numThreads);
/*! POOL_sizeof() :
* @return threadpool memory usage
* note : compatible with NULL (returns 0 in this case)