Signal after finishing job when queueSize=0

This commit is contained in:
Stella Lau 2017-08-01 20:12:06 -07:00
parent 1d76da1d87
commit 73ba58955f

View File

@ -66,6 +66,7 @@ static void* POOL_thread(void* opaque) {
for (;;) { for (;;) {
/* Lock the mutex and wait for a non-empty queue or until shutdown */ /* Lock the mutex and wait for a non-empty queue or until shutdown */
pthread_mutex_lock(&ctx->queueMutex); pthread_mutex_lock(&ctx->queueMutex);
while (ctx->queueEmpty && !ctx->shutdown) { while (ctx->queueEmpty && !ctx->shutdown) {
pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex); pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
} }
@ -82,12 +83,20 @@ static void* POOL_thread(void* opaque) {
ctx->queueEmpty = ctx->queueHead == ctx->queueTail; ctx->queueEmpty = ctx->queueHead == ctx->queueTail;
/* Unlock the mutex, signal a pusher, and run the job */ /* Unlock the mutex, signal a pusher, and run the job */
pthread_mutex_unlock(&ctx->queueMutex); pthread_mutex_unlock(&ctx->queueMutex);
pthread_cond_signal(&ctx->queuePushCond);
if (ctx->queueSize > 1) {
pthread_cond_signal(&ctx->queuePushCond);
}
job.function(job.opaque); job.function(job.opaque);
pthread_mutex_lock(&ctx->queueMutex); /* If the intended queue size was 0, signal after finishing job */
ctx->numThreadsBusy--; if (ctx->queueSize == 1) {
pthread_mutex_unlock(&ctx->queueMutex); pthread_mutex_lock(&ctx->queueMutex);
ctx->numThreadsBusy--;
pthread_mutex_unlock(&ctx->queueMutex);
pthread_cond_signal(&ctx->queuePushCond);
}
} }
} }
/* Unreachable */ /* Unreachable */
@ -168,6 +177,21 @@ size_t POOL_sizeof(POOL_ctx *ctx) {
+ ctx->numThreads * sizeof(pthread_t); + ctx->numThreads * sizeof(pthread_t);
} }
/**
* Returns 1 if the queue is full and 0 otherwise.
*
* If the queueSize is 1 (the pool was created with an intended queueSize of 0),
* then a queue is empty if there is a thread free and no job is waiting.
*/
static int isQueueFull(POOL_ctx const* ctx) {
if (ctx->queueSize > 1) {
return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize);
} else {
return ctx->numThreadsBusy == ctx->numThreads ||
!ctx->queueEmpty;
}
}
void POOL_add(void* ctxVoid, POOL_function function, void *opaque) { void POOL_add(void* ctxVoid, POOL_function function, void *opaque) {
POOL_ctx* const ctx = (POOL_ctx*)ctxVoid; POOL_ctx* const ctx = (POOL_ctx*)ctxVoid;
if (!ctx) { return; } if (!ctx) { return; }
@ -175,22 +199,15 @@ void POOL_add(void* ctxVoid, POOL_function function, void *opaque) {
pthread_mutex_lock(&ctx->queueMutex); pthread_mutex_lock(&ctx->queueMutex);
{ POOL_job const job = {function, opaque}; { POOL_job const job = {function, opaque};
// Wait until there is space in the queue for the new job. /* Wait until there is space in the queue for the new job */
// If the ctx->queueSize is 1 (the pool was created with an while (isQueueFull(ctx) && !ctx->shutdown) {
// intended queueSize of 0) and there is no job already waiting,
// wait until there is a thread free for the new job.
size_t newTail = (ctx->queueTail + 1) % ctx->queueSize;
while (ctx->queueHead == newTail && !ctx->shutdown &&
(ctx->queueSize > 1 || ctx->numThreadsBusy == ctx->numThreads ||
!ctx->queueEmpty)) {
pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex); pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
newTail = (ctx->queueTail + 1) % ctx->queueSize;
} }
/* The queue is still going => there is space */ /* The queue is still going => there is space */
if (!ctx->shutdown) { if (!ctx->shutdown) {
ctx->queueEmpty = 0; ctx->queueEmpty = 0;
ctx->queue[ctx->queueTail] = job; ctx->queue[ctx->queueTail] = job;
ctx->queueTail = newTail; ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize;
} }
} }
pthread_mutex_unlock(&ctx->queueMutex); pthread_mutex_unlock(&ctx->queueMutex);