mirror of
https://github.com/facebook/zstd.git
synced 2025-12-05 00:03:19 -05:00
Merge pull request #869 from terrelln/dev
[libzstd] pthread function prefixed with ZSTD_
This commit is contained in:
commit
e45a2aea9b
@ -33,7 +33,7 @@ typedef struct POOL_job_s {
|
|||||||
struct POOL_ctx_s {
|
struct POOL_ctx_s {
|
||||||
ZSTD_customMem customMem;
|
ZSTD_customMem customMem;
|
||||||
/* Keep track of the threads */
|
/* Keep track of the threads */
|
||||||
pthread_t *threads;
|
ZSTD_pthread_t *threads;
|
||||||
size_t numThreads;
|
size_t numThreads;
|
||||||
|
|
||||||
/* The queue is a circular buffer */
|
/* The queue is a circular buffer */
|
||||||
@ -48,11 +48,11 @@ struct POOL_ctx_s {
|
|||||||
int queueEmpty;
|
int queueEmpty;
|
||||||
|
|
||||||
/* The mutex protects the queue */
|
/* The mutex protects the queue */
|
||||||
pthread_mutex_t queueMutex;
|
ZSTD_pthread_mutex_t queueMutex;
|
||||||
/* Condition variable for pushers to wait on when the queue is full */
|
/* Condition variable for pushers to wait on when the queue is full */
|
||||||
pthread_cond_t queuePushCond;
|
ZSTD_pthread_cond_t queuePushCond;
|
||||||
/* Condition variables for poppers to wait on when the queue is empty */
|
/* Condition variables for poppers to wait on when the queue is empty */
|
||||||
pthread_cond_t queuePopCond;
|
ZSTD_pthread_cond_t queuePopCond;
|
||||||
/* Indicates if the queue is shutting down */
|
/* Indicates if the queue is shutting down */
|
||||||
int shutdown;
|
int shutdown;
|
||||||
};
|
};
|
||||||
@ -67,14 +67,14 @@ static void* POOL_thread(void* opaque) {
|
|||||||
if (!ctx) { return NULL; }
|
if (!ctx) { return NULL; }
|
||||||
for (;;) {
|
for (;;) {
|
||||||
/* Lock the mutex and wait for a non-empty queue or until shutdown */
|
/* Lock the mutex and wait for a non-empty queue or until shutdown */
|
||||||
pthread_mutex_lock(&ctx->queueMutex);
|
ZSTD_pthread_mutex_lock(&ctx->queueMutex);
|
||||||
|
|
||||||
while (ctx->queueEmpty && !ctx->shutdown) {
|
while (ctx->queueEmpty && !ctx->shutdown) {
|
||||||
pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
|
ZSTD_pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex);
|
||||||
}
|
}
|
||||||
/* empty => shutting down: so stop */
|
/* empty => shutting down: so stop */
|
||||||
if (ctx->queueEmpty) {
|
if (ctx->queueEmpty) {
|
||||||
pthread_mutex_unlock(&ctx->queueMutex);
|
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
|
||||||
return opaque;
|
return opaque;
|
||||||
}
|
}
|
||||||
/* Pop a job off the queue */
|
/* Pop a job off the queue */
|
||||||
@ -83,17 +83,17 @@ static void* POOL_thread(void* opaque) {
|
|||||||
ctx->numThreadsBusy++;
|
ctx->numThreadsBusy++;
|
||||||
ctx->queueEmpty = ctx->queueHead == ctx->queueTail;
|
ctx->queueEmpty = ctx->queueHead == ctx->queueTail;
|
||||||
/* Unlock the mutex, signal a pusher, and run the job */
|
/* Unlock the mutex, signal a pusher, and run the job */
|
||||||
pthread_mutex_unlock(&ctx->queueMutex);
|
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
|
||||||
pthread_cond_signal(&ctx->queuePushCond);
|
ZSTD_pthread_cond_signal(&ctx->queuePushCond);
|
||||||
|
|
||||||
job.function(job.opaque);
|
job.function(job.opaque);
|
||||||
|
|
||||||
/* If the intended queue size was 0, signal after finishing job */
|
/* If the intended queue size was 0, signal after finishing job */
|
||||||
if (ctx->queueSize == 1) {
|
if (ctx->queueSize == 1) {
|
||||||
pthread_mutex_lock(&ctx->queueMutex);
|
ZSTD_pthread_mutex_lock(&ctx->queueMutex);
|
||||||
ctx->numThreadsBusy--;
|
ctx->numThreadsBusy--;
|
||||||
pthread_mutex_unlock(&ctx->queueMutex);
|
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
|
||||||
pthread_cond_signal(&ctx->queuePushCond);
|
ZSTD_pthread_cond_signal(&ctx->queuePushCond);
|
||||||
} }
|
} }
|
||||||
} /* for (;;) */
|
} /* for (;;) */
|
||||||
/* Unreachable */
|
/* Unreachable */
|
||||||
@ -120,12 +120,12 @@ POOL_ctx *POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customM
|
|||||||
ctx->queueTail = 0;
|
ctx->queueTail = 0;
|
||||||
ctx->numThreadsBusy = 0;
|
ctx->numThreadsBusy = 0;
|
||||||
ctx->queueEmpty = 1;
|
ctx->queueEmpty = 1;
|
||||||
(void)pthread_mutex_init(&ctx->queueMutex, NULL);
|
(void)ZSTD_pthread_mutex_init(&ctx->queueMutex, NULL);
|
||||||
(void)pthread_cond_init(&ctx->queuePushCond, NULL);
|
(void)ZSTD_pthread_cond_init(&ctx->queuePushCond, NULL);
|
||||||
(void)pthread_cond_init(&ctx->queuePopCond, NULL);
|
(void)ZSTD_pthread_cond_init(&ctx->queuePopCond, NULL);
|
||||||
ctx->shutdown = 0;
|
ctx->shutdown = 0;
|
||||||
/* Allocate space for the thread handles */
|
/* Allocate space for the thread handles */
|
||||||
ctx->threads = (pthread_t*)ZSTD_malloc(numThreads * sizeof(pthread_t), customMem);
|
ctx->threads = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), customMem);
|
||||||
ctx->numThreads = 0;
|
ctx->numThreads = 0;
|
||||||
ctx->customMem = customMem;
|
ctx->customMem = customMem;
|
||||||
/* Check for errors */
|
/* Check for errors */
|
||||||
@ -133,7 +133,7 @@ POOL_ctx *POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customM
|
|||||||
/* Initialize the threads */
|
/* Initialize the threads */
|
||||||
{ size_t i;
|
{ size_t i;
|
||||||
for (i = 0; i < numThreads; ++i) {
|
for (i = 0; i < numThreads; ++i) {
|
||||||
if (pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) {
|
if (ZSTD_pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) {
|
||||||
ctx->numThreads = i;
|
ctx->numThreads = i;
|
||||||
POOL_free(ctx);
|
POOL_free(ctx);
|
||||||
return NULL;
|
return NULL;
|
||||||
@ -148,25 +148,25 @@ POOL_ctx *POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customM
|
|||||||
*/
|
*/
|
||||||
static void POOL_join(POOL_ctx *ctx) {
|
static void POOL_join(POOL_ctx *ctx) {
|
||||||
/* Shut down the queue */
|
/* Shut down the queue */
|
||||||
pthread_mutex_lock(&ctx->queueMutex);
|
ZSTD_pthread_mutex_lock(&ctx->queueMutex);
|
||||||
ctx->shutdown = 1;
|
ctx->shutdown = 1;
|
||||||
pthread_mutex_unlock(&ctx->queueMutex);
|
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
|
||||||
/* Wake up sleeping threads */
|
/* Wake up sleeping threads */
|
||||||
pthread_cond_broadcast(&ctx->queuePushCond);
|
ZSTD_pthread_cond_broadcast(&ctx->queuePushCond);
|
||||||
pthread_cond_broadcast(&ctx->queuePopCond);
|
ZSTD_pthread_cond_broadcast(&ctx->queuePopCond);
|
||||||
/* Join all of the threads */
|
/* Join all of the threads */
|
||||||
{ size_t i;
|
{ size_t i;
|
||||||
for (i = 0; i < ctx->numThreads; ++i) {
|
for (i = 0; i < ctx->numThreads; ++i) {
|
||||||
pthread_join(ctx->threads[i], NULL);
|
ZSTD_pthread_join(ctx->threads[i], NULL);
|
||||||
} }
|
} }
|
||||||
}
|
}
|
||||||
|
|
||||||
void POOL_free(POOL_ctx *ctx) {
|
void POOL_free(POOL_ctx *ctx) {
|
||||||
if (!ctx) { return; }
|
if (!ctx) { return; }
|
||||||
POOL_join(ctx);
|
POOL_join(ctx);
|
||||||
pthread_mutex_destroy(&ctx->queueMutex);
|
ZSTD_pthread_mutex_destroy(&ctx->queueMutex);
|
||||||
pthread_cond_destroy(&ctx->queuePushCond);
|
ZSTD_pthread_cond_destroy(&ctx->queuePushCond);
|
||||||
pthread_cond_destroy(&ctx->queuePopCond);
|
ZSTD_pthread_cond_destroy(&ctx->queuePopCond);
|
||||||
ZSTD_free(ctx->queue, ctx->customMem);
|
ZSTD_free(ctx->queue, ctx->customMem);
|
||||||
ZSTD_free(ctx->threads, ctx->customMem);
|
ZSTD_free(ctx->threads, ctx->customMem);
|
||||||
ZSTD_free(ctx, ctx->customMem);
|
ZSTD_free(ctx, ctx->customMem);
|
||||||
@ -176,7 +176,7 @@ size_t POOL_sizeof(POOL_ctx *ctx) {
|
|||||||
if (ctx==NULL) return 0; /* supports sizeof NULL */
|
if (ctx==NULL) return 0; /* supports sizeof NULL */
|
||||||
return sizeof(*ctx)
|
return sizeof(*ctx)
|
||||||
+ ctx->queueSize * sizeof(POOL_job)
|
+ ctx->queueSize * sizeof(POOL_job)
|
||||||
+ ctx->numThreads * sizeof(pthread_t);
|
+ ctx->numThreads * sizeof(ZSTD_pthread_t);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -198,12 +198,12 @@ void POOL_add(void* ctxVoid, POOL_function function, void *opaque) {
|
|||||||
POOL_ctx* const ctx = (POOL_ctx*)ctxVoid;
|
POOL_ctx* const ctx = (POOL_ctx*)ctxVoid;
|
||||||
if (!ctx) { return; }
|
if (!ctx) { return; }
|
||||||
|
|
||||||
pthread_mutex_lock(&ctx->queueMutex);
|
ZSTD_pthread_mutex_lock(&ctx->queueMutex);
|
||||||
{ POOL_job const job = {function, opaque};
|
{ POOL_job const job = {function, opaque};
|
||||||
|
|
||||||
/* Wait until there is space in the queue for the new job */
|
/* Wait until there is space in the queue for the new job */
|
||||||
while (isQueueFull(ctx) && !ctx->shutdown) {
|
while (isQueueFull(ctx) && !ctx->shutdown) {
|
||||||
pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
|
ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex);
|
||||||
}
|
}
|
||||||
/* The queue is still going => there is space */
|
/* The queue is still going => there is space */
|
||||||
if (!ctx->shutdown) {
|
if (!ctx->shutdown) {
|
||||||
@ -212,8 +212,8 @@ void POOL_add(void* ctxVoid, POOL_function function, void *opaque) {
|
|||||||
ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize;
|
ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pthread_mutex_unlock(&ctx->queueMutex);
|
ZSTD_pthread_mutex_unlock(&ctx->queueMutex);
|
||||||
pthread_cond_signal(&ctx->queuePopCond);
|
ZSTD_pthread_cond_signal(&ctx->queuePopCond);
|
||||||
}
|
}
|
||||||
|
|
||||||
#else /* ZSTD_MULTITHREAD not defined */
|
#else /* ZSTD_MULTITHREAD not defined */
|
||||||
|
|||||||
@ -35,12 +35,12 @@ int g_ZSTD_threading_useles_symbol;
|
|||||||
|
|
||||||
static unsigned __stdcall worker(void *arg)
|
static unsigned __stdcall worker(void *arg)
|
||||||
{
|
{
|
||||||
pthread_t* const thread = (pthread_t*) arg;
|
ZSTD_pthread_t* const thread = (ZSTD_pthread_t*) arg;
|
||||||
thread->arg = thread->start_routine(thread->arg);
|
thread->arg = thread->start_routine(thread->arg);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int pthread_create(pthread_t* thread, const void* unused,
|
int ZSTD_pthread_create(ZSTD_pthread_t* thread, const void* unused,
|
||||||
void* (*start_routine) (void*), void* arg)
|
void* (*start_routine) (void*), void* arg)
|
||||||
{
|
{
|
||||||
(void)unused;
|
(void)unused;
|
||||||
@ -54,16 +54,16 @@ int pthread_create(pthread_t* thread, const void* unused,
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int _pthread_join(pthread_t * thread, void **value_ptr)
|
int ZSTD_pthread_join(ZSTD_pthread_t thread, void **value_ptr)
|
||||||
{
|
{
|
||||||
DWORD result;
|
DWORD result;
|
||||||
|
|
||||||
if (!thread->handle) return 0;
|
if (!thread.handle) return 0;
|
||||||
|
|
||||||
result = WaitForSingleObject(thread->handle, INFINITE);
|
result = WaitForSingleObject(thread.handle, INFINITE);
|
||||||
switch (result) {
|
switch (result) {
|
||||||
case WAIT_OBJECT_0:
|
case WAIT_OBJECT_0:
|
||||||
if (value_ptr) *value_ptr = thread->arg;
|
if (value_ptr) *value_ptr = thread.arg;
|
||||||
return 0;
|
return 0;
|
||||||
case WAIT_ABANDONED:
|
case WAIT_ABANDONED:
|
||||||
return EINVAL;
|
return EINVAL;
|
||||||
|
|||||||
@ -44,32 +44,31 @@ extern "C" {
|
|||||||
|
|
||||||
|
|
||||||
/* mutex */
|
/* mutex */
|
||||||
#define pthread_mutex_t CRITICAL_SECTION
|
#define ZSTD_pthread_mutex_t CRITICAL_SECTION
|
||||||
#define pthread_mutex_init(a,b) (InitializeCriticalSection((a)), 0)
|
#define ZSTD_pthread_mutex_init(a, b) (InitializeCriticalSection((a)), 0)
|
||||||
#define pthread_mutex_destroy(a) DeleteCriticalSection((a))
|
#define ZSTD_pthread_mutex_destroy(a) DeleteCriticalSection((a))
|
||||||
#define pthread_mutex_lock(a) EnterCriticalSection((a))
|
#define ZSTD_pthread_mutex_lock(a) EnterCriticalSection((a))
|
||||||
#define pthread_mutex_unlock(a) LeaveCriticalSection((a))
|
#define ZSTD_pthread_mutex_unlock(a) LeaveCriticalSection((a))
|
||||||
|
|
||||||
/* condition variable */
|
/* condition variable */
|
||||||
#define pthread_cond_t CONDITION_VARIABLE
|
#define ZSTD_pthread_cond_t CONDITION_VARIABLE
|
||||||
#define pthread_cond_init(a, b) (InitializeConditionVariable((a)), 0)
|
#define ZSTD_pthread_cond_init(a, b) (InitializeConditionVariable((a)), 0)
|
||||||
#define pthread_cond_destroy(a) /* No delete */
|
#define ZSTD_pthread_cond_destroy(a) /* No delete */
|
||||||
#define pthread_cond_wait(a, b) SleepConditionVariableCS((a), (b), INFINITE)
|
#define ZSTD_pthread_cond_wait(a, b) SleepConditionVariableCS((a), (b), INFINITE)
|
||||||
#define pthread_cond_signal(a) WakeConditionVariable((a))
|
#define ZSTD_pthread_cond_signal(a) WakeConditionVariable((a))
|
||||||
#define pthread_cond_broadcast(a) WakeAllConditionVariable((a))
|
#define ZSTD_pthread_cond_broadcast(a) WakeAllConditionVariable((a))
|
||||||
|
|
||||||
/* pthread_create() and pthread_join() */
|
/* ZSTD_pthread_create() and ZSTD_pthread_join() */
|
||||||
typedef struct {
|
typedef struct {
|
||||||
HANDLE handle;
|
HANDLE handle;
|
||||||
void* (*start_routine)(void*);
|
void* (*start_routine)(void*);
|
||||||
void* arg;
|
void* arg;
|
||||||
} pthread_t;
|
} ZSTD_pthread_t;
|
||||||
|
|
||||||
int pthread_create(pthread_t* thread, const void* unused,
|
int ZSTD_pthread_create(ZSTD_pthread_t* thread, const void* unused,
|
||||||
void* (*start_routine) (void*), void* arg);
|
void* (*start_routine) (void*), void* arg);
|
||||||
|
|
||||||
#define pthread_join(a, b) _pthread_join(&(a), (b))
|
int ZSTD_pthread_join(ZSTD_pthread_t thread, void** value_ptr);
|
||||||
int _pthread_join(pthread_t* thread, void** value_ptr);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* add here more wrappers as required
|
* add here more wrappers as required
|
||||||
@ -80,23 +79,40 @@ int _pthread_join(pthread_t* thread, void** value_ptr);
|
|||||||
/* === POSIX Systems === */
|
/* === POSIX Systems === */
|
||||||
# include <pthread.h>
|
# include <pthread.h>
|
||||||
|
|
||||||
|
#define ZSTD_pthread_mutex_t pthread_mutex_t
|
||||||
|
#define ZSTD_pthread_mutex_init(a, b) pthread_mutex_init((a), (b))
|
||||||
|
#define ZSTD_pthread_mutex_destroy(a) pthread_mutex_destroy((a))
|
||||||
|
#define ZSTD_pthread_mutex_lock(a) pthread_mutex_lock((a))
|
||||||
|
#define ZSTD_pthread_mutex_unlock(a) pthread_mutex_unlock((a))
|
||||||
|
|
||||||
|
#define ZSTD_pthread_cond_t pthread_cond_t
|
||||||
|
#define ZSTD_pthread_cond_init(a, b) pthread_cond_init((a), (b))
|
||||||
|
#define ZSTD_pthread_cond_destroy(a) pthread_cond_destroy((a))
|
||||||
|
#define ZSTD_pthread_cond_wait(a, b) pthread_cond_wait((a), (b))
|
||||||
|
#define ZSTD_pthread_cond_signal(a) pthread_cond_signal((a))
|
||||||
|
#define ZSTD_pthread_cond_broadcast(a) pthread_cond_broadcast((a))
|
||||||
|
|
||||||
|
#define ZSTD_pthread_t pthread_t
|
||||||
|
#define ZSTD_pthread_create(a, b, c, d) pthread_create((a), (b), (c), (d))
|
||||||
|
#define ZSTD_pthread_join(a, b) pthread_join((a),(b))
|
||||||
|
|
||||||
#else /* ZSTD_MULTITHREAD not defined */
|
#else /* ZSTD_MULTITHREAD not defined */
|
||||||
/* No multithreading support */
|
/* No multithreading support */
|
||||||
|
|
||||||
#define pthread_mutex_t int /* #define rather than typedef, because sometimes pthread support is implicit, resulting in duplicated symbols */
|
typedef int ZSTD_pthread_mutex_t;
|
||||||
#define pthread_mutex_init(a,b) ((void)a, 0)
|
#define ZSTD_pthread_mutex_init(a, b) ((void)a, 0)
|
||||||
#define pthread_mutex_destroy(a)
|
#define ZSTD_pthread_mutex_destroy(a)
|
||||||
#define pthread_mutex_lock(a)
|
#define ZSTD_pthread_mutex_lock(a)
|
||||||
#define pthread_mutex_unlock(a)
|
#define ZSTD_pthread_mutex_unlock(a)
|
||||||
|
|
||||||
#define pthread_cond_t int
|
typedef int ZSTD_pthread_cond_t;
|
||||||
#define pthread_cond_init(a,b) ((void)a, 0)
|
#define ZSTD_pthread_cond_init(a, b) ((void)a, 0)
|
||||||
#define pthread_cond_destroy(a)
|
#define ZSTD_pthread_cond_destroy(a)
|
||||||
#define pthread_cond_wait(a,b)
|
#define ZSTD_pthread_cond_wait(a, b)
|
||||||
#define pthread_cond_signal(a)
|
#define ZSTD_pthread_cond_signal(a)
|
||||||
#define pthread_cond_broadcast(a)
|
#define ZSTD_pthread_cond_broadcast(a)
|
||||||
|
|
||||||
/* do not use pthread_t */
|
/* do not use ZSTD_pthread_t */
|
||||||
|
|
||||||
#endif /* ZSTD_MULTITHREAD */
|
#endif /* ZSTD_MULTITHREAD */
|
||||||
|
|
||||||
|
|||||||
@ -53,22 +53,22 @@ static unsigned long long GetCurrentClockTimeMicroseconds(void)
|
|||||||
}
|
}
|
||||||
|
|
||||||
#define MUTEX_WAIT_TIME_DLEVEL 6
|
#define MUTEX_WAIT_TIME_DLEVEL 6
|
||||||
#define PTHREAD_MUTEX_LOCK(mutex) { \
|
#define ZSTD_PTHREAD_MUTEX_LOCK(mutex) { \
|
||||||
if (ZSTD_DEBUG>=MUTEX_WAIT_TIME_DLEVEL) { \
|
if (ZSTD_DEBUG>=MUTEX_WAIT_TIME_DLEVEL) { \
|
||||||
unsigned long long const beforeTime = GetCurrentClockTimeMicroseconds(); \
|
unsigned long long const beforeTime = GetCurrentClockTimeMicroseconds(); \
|
||||||
pthread_mutex_lock(mutex); \
|
ZSTD_pthread_mutex_lock(mutex); \
|
||||||
{ unsigned long long const afterTime = GetCurrentClockTimeMicroseconds(); \
|
{ unsigned long long const afterTime = GetCurrentClockTimeMicroseconds(); \
|
||||||
unsigned long long const elapsedTime = (afterTime-beforeTime); \
|
unsigned long long const elapsedTime = (afterTime-beforeTime); \
|
||||||
if (elapsedTime > 1000) { /* or whatever threshold you like; I'm using 1 millisecond here */ \
|
if (elapsedTime > 1000) { /* or whatever threshold you like; I'm using 1 millisecond here */ \
|
||||||
DEBUGLOG(MUTEX_WAIT_TIME_DLEVEL, "Thread took %llu microseconds to acquire mutex %s \n", \
|
DEBUGLOG(MUTEX_WAIT_TIME_DLEVEL, "Thread took %llu microseconds to acquire mutex %s \n", \
|
||||||
elapsedTime, #mutex); \
|
elapsedTime, #mutex); \
|
||||||
} } \
|
} } \
|
||||||
} else pthread_mutex_lock(mutex); \
|
} else ZSTD_pthread_mutex_lock(mutex); \
|
||||||
}
|
}
|
||||||
|
|
||||||
#else
|
#else
|
||||||
|
|
||||||
# define PTHREAD_MUTEX_LOCK(m) pthread_mutex_lock(m)
|
# define ZSTD_PTHREAD_MUTEX_LOCK(m) ZSTD_pthread_mutex_lock(m)
|
||||||
# define DEBUG_PRINTHEX(l,p,n) {}
|
# define DEBUG_PRINTHEX(l,p,n) {}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
@ -85,7 +85,7 @@ typedef struct buffer_s {
|
|||||||
static const buffer_t g_nullBuffer = { NULL, 0 };
|
static const buffer_t g_nullBuffer = { NULL, 0 };
|
||||||
|
|
||||||
typedef struct ZSTDMT_bufferPool_s {
|
typedef struct ZSTDMT_bufferPool_s {
|
||||||
pthread_mutex_t poolMutex;
|
ZSTD_pthread_mutex_t poolMutex;
|
||||||
size_t bufferSize;
|
size_t bufferSize;
|
||||||
unsigned totalBuffers;
|
unsigned totalBuffers;
|
||||||
unsigned nbBuffers;
|
unsigned nbBuffers;
|
||||||
@ -99,7 +99,7 @@ static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned nbThreads, ZSTD_custo
|
|||||||
ZSTDMT_bufferPool* const bufPool = (ZSTDMT_bufferPool*)ZSTD_calloc(
|
ZSTDMT_bufferPool* const bufPool = (ZSTDMT_bufferPool*)ZSTD_calloc(
|
||||||
sizeof(ZSTDMT_bufferPool) + (maxNbBuffers-1) * sizeof(buffer_t), cMem);
|
sizeof(ZSTDMT_bufferPool) + (maxNbBuffers-1) * sizeof(buffer_t), cMem);
|
||||||
if (bufPool==NULL) return NULL;
|
if (bufPool==NULL) return NULL;
|
||||||
if (pthread_mutex_init(&bufPool->poolMutex, NULL)) {
|
if (ZSTD_pthread_mutex_init(&bufPool->poolMutex, NULL)) {
|
||||||
ZSTD_free(bufPool, cMem);
|
ZSTD_free(bufPool, cMem);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
@ -116,7 +116,7 @@ static void ZSTDMT_freeBufferPool(ZSTDMT_bufferPool* bufPool)
|
|||||||
if (!bufPool) return; /* compatibility with free on NULL */
|
if (!bufPool) return; /* compatibility with free on NULL */
|
||||||
for (u=0; u<bufPool->totalBuffers; u++)
|
for (u=0; u<bufPool->totalBuffers; u++)
|
||||||
ZSTD_free(bufPool->bTable[u].start, bufPool->cMem);
|
ZSTD_free(bufPool->bTable[u].start, bufPool->cMem);
|
||||||
pthread_mutex_destroy(&bufPool->poolMutex);
|
ZSTD_pthread_mutex_destroy(&bufPool->poolMutex);
|
||||||
ZSTD_free(bufPool, bufPool->cMem);
|
ZSTD_free(bufPool, bufPool->cMem);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -127,10 +127,10 @@ static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool)
|
|||||||
+ (bufPool->totalBuffers - 1) * sizeof(buffer_t);
|
+ (bufPool->totalBuffers - 1) * sizeof(buffer_t);
|
||||||
unsigned u;
|
unsigned u;
|
||||||
size_t totalBufferSize = 0;
|
size_t totalBufferSize = 0;
|
||||||
pthread_mutex_lock(&bufPool->poolMutex);
|
ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
|
||||||
for (u=0; u<bufPool->totalBuffers; u++)
|
for (u=0; u<bufPool->totalBuffers; u++)
|
||||||
totalBufferSize += bufPool->bTable[u].size;
|
totalBufferSize += bufPool->bTable[u].size;
|
||||||
pthread_mutex_unlock(&bufPool->poolMutex);
|
ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);
|
||||||
|
|
||||||
return poolSize + totalBufferSize;
|
return poolSize + totalBufferSize;
|
||||||
}
|
}
|
||||||
@ -146,20 +146,20 @@ static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool)
|
|||||||
{
|
{
|
||||||
size_t const bSize = bufPool->bufferSize;
|
size_t const bSize = bufPool->bufferSize;
|
||||||
DEBUGLOG(5, "ZSTDMT_getBuffer");
|
DEBUGLOG(5, "ZSTDMT_getBuffer");
|
||||||
pthread_mutex_lock(&bufPool->poolMutex);
|
ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
|
||||||
if (bufPool->nbBuffers) { /* try to use an existing buffer */
|
if (bufPool->nbBuffers) { /* try to use an existing buffer */
|
||||||
buffer_t const buf = bufPool->bTable[--(bufPool->nbBuffers)];
|
buffer_t const buf = bufPool->bTable[--(bufPool->nbBuffers)];
|
||||||
size_t const availBufferSize = buf.size;
|
size_t const availBufferSize = buf.size;
|
||||||
if ((availBufferSize >= bSize) & (availBufferSize <= 10*bSize)) {
|
if ((availBufferSize >= bSize) & (availBufferSize <= 10*bSize)) {
|
||||||
/* large enough, but not too much */
|
/* large enough, but not too much */
|
||||||
pthread_mutex_unlock(&bufPool->poolMutex);
|
ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
/* size conditions not respected : scratch this buffer, create new one */
|
/* size conditions not respected : scratch this buffer, create new one */
|
||||||
DEBUGLOG(5, "existing buffer does not meet size conditions => freeing");
|
DEBUGLOG(5, "existing buffer does not meet size conditions => freeing");
|
||||||
ZSTD_free(buf.start, bufPool->cMem);
|
ZSTD_free(buf.start, bufPool->cMem);
|
||||||
}
|
}
|
||||||
pthread_mutex_unlock(&bufPool->poolMutex);
|
ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);
|
||||||
/* create new buffer */
|
/* create new buffer */
|
||||||
DEBUGLOG(5, "create a new buffer");
|
DEBUGLOG(5, "create a new buffer");
|
||||||
{ buffer_t buffer;
|
{ buffer_t buffer;
|
||||||
@ -175,13 +175,13 @@ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)
|
|||||||
{
|
{
|
||||||
if (buf.start == NULL) return; /* compatible with release on NULL */
|
if (buf.start == NULL) return; /* compatible with release on NULL */
|
||||||
DEBUGLOG(5, "ZSTDMT_releaseBuffer");
|
DEBUGLOG(5, "ZSTDMT_releaseBuffer");
|
||||||
pthread_mutex_lock(&bufPool->poolMutex);
|
ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
|
||||||
if (bufPool->nbBuffers < bufPool->totalBuffers) {
|
if (bufPool->nbBuffers < bufPool->totalBuffers) {
|
||||||
bufPool->bTable[bufPool->nbBuffers++] = buf; /* stored for later use */
|
bufPool->bTable[bufPool->nbBuffers++] = buf; /* stored for later use */
|
||||||
pthread_mutex_unlock(&bufPool->poolMutex);
|
ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
pthread_mutex_unlock(&bufPool->poolMutex);
|
ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);
|
||||||
/* Reached bufferPool capacity (should not happen) */
|
/* Reached bufferPool capacity (should not happen) */
|
||||||
DEBUGLOG(5, "buffer pool capacity reached => freeing ");
|
DEBUGLOG(5, "buffer pool capacity reached => freeing ");
|
||||||
ZSTD_free(buf.start, bufPool->cMem);
|
ZSTD_free(buf.start, bufPool->cMem);
|
||||||
@ -206,7 +206,7 @@ static ZSTD_CCtx_params ZSTDMT_makeJobCCtxParams(ZSTD_CCtx_params const params)
|
|||||||
/* a single CCtx Pool can be invoked from multiple threads in parallel */
|
/* a single CCtx Pool can be invoked from multiple threads in parallel */
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
pthread_mutex_t poolMutex;
|
ZSTD_pthread_mutex_t poolMutex;
|
||||||
unsigned totalCCtx;
|
unsigned totalCCtx;
|
||||||
unsigned availCCtx;
|
unsigned availCCtx;
|
||||||
ZSTD_customMem cMem;
|
ZSTD_customMem cMem;
|
||||||
@ -219,7 +219,7 @@ static void ZSTDMT_freeCCtxPool(ZSTDMT_CCtxPool* pool)
|
|||||||
unsigned u;
|
unsigned u;
|
||||||
for (u=0; u<pool->totalCCtx; u++)
|
for (u=0; u<pool->totalCCtx; u++)
|
||||||
ZSTD_freeCCtx(pool->cctx[u]); /* note : compatible with free on NULL */
|
ZSTD_freeCCtx(pool->cctx[u]); /* note : compatible with free on NULL */
|
||||||
pthread_mutex_destroy(&pool->poolMutex);
|
ZSTD_pthread_mutex_destroy(&pool->poolMutex);
|
||||||
ZSTD_free(pool, pool->cMem);
|
ZSTD_free(pool, pool->cMem);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -231,7 +231,7 @@ static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbThreads,
|
|||||||
ZSTDMT_CCtxPool* const cctxPool = (ZSTDMT_CCtxPool*) ZSTD_calloc(
|
ZSTDMT_CCtxPool* const cctxPool = (ZSTDMT_CCtxPool*) ZSTD_calloc(
|
||||||
sizeof(ZSTDMT_CCtxPool) + (nbThreads-1)*sizeof(ZSTD_CCtx*), cMem);
|
sizeof(ZSTDMT_CCtxPool) + (nbThreads-1)*sizeof(ZSTD_CCtx*), cMem);
|
||||||
if (!cctxPool) return NULL;
|
if (!cctxPool) return NULL;
|
||||||
if (pthread_mutex_init(&cctxPool->poolMutex, NULL)) {
|
if (ZSTD_pthread_mutex_init(&cctxPool->poolMutex, NULL)) {
|
||||||
ZSTD_free(cctxPool, cMem);
|
ZSTD_free(cctxPool, cMem);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
@ -247,7 +247,7 @@ static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(unsigned nbThreads,
|
|||||||
/* only works during initialization phase, not during compression */
|
/* only works during initialization phase, not during compression */
|
||||||
static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool)
|
static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool)
|
||||||
{
|
{
|
||||||
pthread_mutex_lock(&cctxPool->poolMutex);
|
ZSTD_pthread_mutex_lock(&cctxPool->poolMutex);
|
||||||
{ unsigned const nbThreads = cctxPool->totalCCtx;
|
{ unsigned const nbThreads = cctxPool->totalCCtx;
|
||||||
size_t const poolSize = sizeof(*cctxPool)
|
size_t const poolSize = sizeof(*cctxPool)
|
||||||
+ (nbThreads-1)*sizeof(ZSTD_CCtx*);
|
+ (nbThreads-1)*sizeof(ZSTD_CCtx*);
|
||||||
@ -256,7 +256,7 @@ static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool)
|
|||||||
for (u=0; u<nbThreads; u++) {
|
for (u=0; u<nbThreads; u++) {
|
||||||
totalCCtxSize += ZSTD_sizeof_CCtx(cctxPool->cctx[u]);
|
totalCCtxSize += ZSTD_sizeof_CCtx(cctxPool->cctx[u]);
|
||||||
}
|
}
|
||||||
pthread_mutex_unlock(&cctxPool->poolMutex);
|
ZSTD_pthread_mutex_unlock(&cctxPool->poolMutex);
|
||||||
return poolSize + totalCCtxSize;
|
return poolSize + totalCCtxSize;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -264,14 +264,14 @@ static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool)
|
|||||||
static ZSTD_CCtx* ZSTDMT_getCCtx(ZSTDMT_CCtxPool* cctxPool)
|
static ZSTD_CCtx* ZSTDMT_getCCtx(ZSTDMT_CCtxPool* cctxPool)
|
||||||
{
|
{
|
||||||
DEBUGLOG(5, "ZSTDMT_getCCtx");
|
DEBUGLOG(5, "ZSTDMT_getCCtx");
|
||||||
pthread_mutex_lock(&cctxPool->poolMutex);
|
ZSTD_pthread_mutex_lock(&cctxPool->poolMutex);
|
||||||
if (cctxPool->availCCtx) {
|
if (cctxPool->availCCtx) {
|
||||||
cctxPool->availCCtx--;
|
cctxPool->availCCtx--;
|
||||||
{ ZSTD_CCtx* const cctx = cctxPool->cctx[cctxPool->availCCtx];
|
{ ZSTD_CCtx* const cctx = cctxPool->cctx[cctxPool->availCCtx];
|
||||||
pthread_mutex_unlock(&cctxPool->poolMutex);
|
ZSTD_pthread_mutex_unlock(&cctxPool->poolMutex);
|
||||||
return cctx;
|
return cctx;
|
||||||
} }
|
} }
|
||||||
pthread_mutex_unlock(&cctxPool->poolMutex);
|
ZSTD_pthread_mutex_unlock(&cctxPool->poolMutex);
|
||||||
DEBUGLOG(5, "create one more CCtx");
|
DEBUGLOG(5, "create one more CCtx");
|
||||||
return ZSTD_createCCtx_advanced(cctxPool->cMem); /* note : can be NULL, when creation fails ! */
|
return ZSTD_createCCtx_advanced(cctxPool->cMem); /* note : can be NULL, when creation fails ! */
|
||||||
}
|
}
|
||||||
@ -279,7 +279,7 @@ static ZSTD_CCtx* ZSTDMT_getCCtx(ZSTDMT_CCtxPool* cctxPool)
|
|||||||
static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
|
static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
|
||||||
{
|
{
|
||||||
if (cctx==NULL) return; /* compatibility with release on NULL */
|
if (cctx==NULL) return; /* compatibility with release on NULL */
|
||||||
pthread_mutex_lock(&pool->poolMutex);
|
ZSTD_pthread_mutex_lock(&pool->poolMutex);
|
||||||
if (pool->availCCtx < pool->totalCCtx)
|
if (pool->availCCtx < pool->totalCCtx)
|
||||||
pool->cctx[pool->availCCtx++] = cctx;
|
pool->cctx[pool->availCCtx++] = cctx;
|
||||||
else {
|
else {
|
||||||
@ -287,7 +287,7 @@ static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
|
|||||||
DEBUGLOG(5, "CCtx pool overflow : free cctx");
|
DEBUGLOG(5, "CCtx pool overflow : free cctx");
|
||||||
ZSTD_freeCCtx(cctx);
|
ZSTD_freeCCtx(cctx);
|
||||||
}
|
}
|
||||||
pthread_mutex_unlock(&pool->poolMutex);
|
ZSTD_pthread_mutex_unlock(&pool->poolMutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -305,8 +305,8 @@ typedef struct {
|
|||||||
unsigned lastChunk;
|
unsigned lastChunk;
|
||||||
unsigned jobCompleted;
|
unsigned jobCompleted;
|
||||||
unsigned jobScanned;
|
unsigned jobScanned;
|
||||||
pthread_mutex_t* jobCompleted_mutex;
|
ZSTD_pthread_mutex_t* jobCompleted_mutex;
|
||||||
pthread_cond_t* jobCompleted_cond;
|
ZSTD_pthread_cond_t* jobCompleted_cond;
|
||||||
ZSTD_CCtx_params params;
|
ZSTD_CCtx_params params;
|
||||||
const ZSTD_CDict* cdict;
|
const ZSTD_CDict* cdict;
|
||||||
ZSTDMT_CCtxPool* cctxPool;
|
ZSTDMT_CCtxPool* cctxPool;
|
||||||
@ -373,11 +373,11 @@ _endJob:
|
|||||||
ZSTDMT_releaseCCtx(job->cctxPool, cctx);
|
ZSTDMT_releaseCCtx(job->cctxPool, cctx);
|
||||||
ZSTDMT_releaseBuffer(job->bufPool, job->src);
|
ZSTDMT_releaseBuffer(job->bufPool, job->src);
|
||||||
job->src = g_nullBuffer; job->srcStart = NULL;
|
job->src = g_nullBuffer; job->srcStart = NULL;
|
||||||
PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);
|
ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);
|
||||||
job->jobCompleted = 1;
|
job->jobCompleted = 1;
|
||||||
job->jobScanned = 0;
|
job->jobScanned = 0;
|
||||||
pthread_cond_signal(job->jobCompleted_cond);
|
ZSTD_pthread_cond_signal(job->jobCompleted_cond);
|
||||||
pthread_mutex_unlock(job->jobCompleted_mutex);
|
ZSTD_pthread_mutex_unlock(job->jobCompleted_mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -395,8 +395,8 @@ struct ZSTDMT_CCtx_s {
|
|||||||
ZSTDMT_jobDescription* jobs;
|
ZSTDMT_jobDescription* jobs;
|
||||||
ZSTDMT_bufferPool* bufPool;
|
ZSTDMT_bufferPool* bufPool;
|
||||||
ZSTDMT_CCtxPool* cctxPool;
|
ZSTDMT_CCtxPool* cctxPool;
|
||||||
pthread_mutex_t jobCompleted_mutex;
|
ZSTD_pthread_mutex_t jobCompleted_mutex;
|
||||||
pthread_cond_t jobCompleted_cond;
|
ZSTD_pthread_cond_t jobCompleted_cond;
|
||||||
size_t targetSectionSize;
|
size_t targetSectionSize;
|
||||||
size_t inBuffSize;
|
size_t inBuffSize;
|
||||||
size_t dictSize;
|
size_t dictSize;
|
||||||
@ -459,11 +459,11 @@ ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbThreads, ZSTD_customMem cMem)
|
|||||||
ZSTDMT_freeCCtx(mtctx);
|
ZSTDMT_freeCCtx(mtctx);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
if (pthread_mutex_init(&mtctx->jobCompleted_mutex, NULL)) {
|
if (ZSTD_pthread_mutex_init(&mtctx->jobCompleted_mutex, NULL)) {
|
||||||
ZSTDMT_freeCCtx(mtctx);
|
ZSTDMT_freeCCtx(mtctx);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
if (pthread_cond_init(&mtctx->jobCompleted_cond, NULL)) {
|
if (ZSTD_pthread_cond_init(&mtctx->jobCompleted_cond, NULL)) {
|
||||||
ZSTDMT_freeCCtx(mtctx);
|
ZSTDMT_freeCCtx(mtctx);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
@ -503,8 +503,8 @@ size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
|
|||||||
ZSTD_free(mtctx->jobs, mtctx->cMem);
|
ZSTD_free(mtctx->jobs, mtctx->cMem);
|
||||||
ZSTDMT_freeCCtxPool(mtctx->cctxPool);
|
ZSTDMT_freeCCtxPool(mtctx->cctxPool);
|
||||||
ZSTD_freeCDict(mtctx->cdictLocal);
|
ZSTD_freeCDict(mtctx->cdictLocal);
|
||||||
pthread_mutex_destroy(&mtctx->jobCompleted_mutex);
|
ZSTD_pthread_mutex_destroy(&mtctx->jobCompleted_mutex);
|
||||||
pthread_cond_destroy(&mtctx->jobCompleted_cond);
|
ZSTD_pthread_cond_destroy(&mtctx->jobCompleted_cond);
|
||||||
ZSTD_free(mtctx, mtctx->cMem);
|
ZSTD_free(mtctx, mtctx->cMem);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@ -649,12 +649,12 @@ static size_t ZSTDMT_compress_advanced_internal(
|
|||||||
unsigned chunkID;
|
unsigned chunkID;
|
||||||
for (chunkID=0; chunkID<nbChunks; chunkID++) {
|
for (chunkID=0; chunkID<nbChunks; chunkID++) {
|
||||||
DEBUGLOG(5, "waiting for chunk %u ", chunkID);
|
DEBUGLOG(5, "waiting for chunk %u ", chunkID);
|
||||||
PTHREAD_MUTEX_LOCK(&mtctx->jobCompleted_mutex);
|
ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobCompleted_mutex);
|
||||||
while (mtctx->jobs[chunkID].jobCompleted==0) {
|
while (mtctx->jobs[chunkID].jobCompleted==0) {
|
||||||
DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", chunkID);
|
DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", chunkID);
|
||||||
pthread_cond_wait(&mtctx->jobCompleted_cond, &mtctx->jobCompleted_mutex);
|
ZSTD_pthread_cond_wait(&mtctx->jobCompleted_cond, &mtctx->jobCompleted_mutex);
|
||||||
}
|
}
|
||||||
pthread_mutex_unlock(&mtctx->jobCompleted_mutex);
|
ZSTD_pthread_mutex_unlock(&mtctx->jobCompleted_mutex);
|
||||||
DEBUGLOG(5, "ready to write chunk %u ", chunkID);
|
DEBUGLOG(5, "ready to write chunk %u ", chunkID);
|
||||||
|
|
||||||
mtctx->jobs[chunkID].srcStart = NULL;
|
mtctx->jobs[chunkID].srcStart = NULL;
|
||||||
@ -729,12 +729,12 @@ static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* zcs)
|
|||||||
DEBUGLOG(4, "ZSTDMT_waitForAllJobsCompleted");
|
DEBUGLOG(4, "ZSTDMT_waitForAllJobsCompleted");
|
||||||
while (zcs->doneJobID < zcs->nextJobID) {
|
while (zcs->doneJobID < zcs->nextJobID) {
|
||||||
unsigned const jobID = zcs->doneJobID & zcs->jobIDMask;
|
unsigned const jobID = zcs->doneJobID & zcs->jobIDMask;
|
||||||
PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
|
ZSTD_PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
|
||||||
while (zcs->jobs[jobID].jobCompleted==0) {
|
while (zcs->jobs[jobID].jobCompleted==0) {
|
||||||
DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", zcs->doneJobID); /* we want to block when waiting for data to flush */
|
DEBUGLOG(5, "waiting for jobCompleted signal from chunk %u", zcs->doneJobID); /* we want to block when waiting for data to flush */
|
||||||
pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);
|
ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex);
|
||||||
}
|
}
|
||||||
pthread_mutex_unlock(&zcs->jobCompleted_mutex);
|
ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex);
|
||||||
zcs->doneJobID++;
|
zcs->doneJobID++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -923,13 +923,13 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi
|
|||||||
{
|
{
|
||||||
unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
|
unsigned const wJobID = zcs->doneJobID & zcs->jobIDMask;
|
||||||
if (zcs->doneJobID == zcs->nextJobID) return 0; /* all flushed ! */
|
if (zcs->doneJobID == zcs->nextJobID) return 0; /* all flushed ! */
|
||||||
PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
|
ZSTD_PTHREAD_MUTEX_LOCK(&zcs->jobCompleted_mutex);
|
||||||
while (zcs->jobs[wJobID].jobCompleted==0) {
|
while (zcs->jobs[wJobID].jobCompleted==0) {
|
||||||
DEBUGLOG(5, "waiting for jobCompleted signal from job %u", zcs->doneJobID);
|
DEBUGLOG(5, "waiting for jobCompleted signal from job %u", zcs->doneJobID);
|
||||||
if (!blockToFlush) { pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; } /* nothing ready to be flushed => skip */
|
if (!blockToFlush) { ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex); return 0; } /* nothing ready to be flushed => skip */
|
||||||
pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); /* block when nothing available to flush */
|
ZSTD_pthread_cond_wait(&zcs->jobCompleted_cond, &zcs->jobCompleted_mutex); /* block when nothing available to flush */
|
||||||
}
|
}
|
||||||
pthread_mutex_unlock(&zcs->jobCompleted_mutex);
|
ZSTD_pthread_mutex_unlock(&zcs->jobCompleted_mutex);
|
||||||
/* compression job completed : output can be flushed */
|
/* compression job completed : output can be flushed */
|
||||||
{ ZSTDMT_jobDescription job = zcs->jobs[wJobID];
|
{ ZSTDMT_jobDescription job = zcs->jobs[wJobID];
|
||||||
if (!job.jobScanned) {
|
if (!job.jobScanned) {
|
||||||
|
|||||||
@ -711,8 +711,8 @@ ZDICTLIB_API size_t ZDICT_trainFromBuffer_cover(
|
|||||||
* compiled with multithreaded support.
|
* compiled with multithreaded support.
|
||||||
*/
|
*/
|
||||||
typedef struct COVER_best_s {
|
typedef struct COVER_best_s {
|
||||||
pthread_mutex_t mutex;
|
ZSTD_pthread_mutex_t mutex;
|
||||||
pthread_cond_t cond;
|
ZSTD_pthread_cond_t cond;
|
||||||
size_t liveJobs;
|
size_t liveJobs;
|
||||||
void *dict;
|
void *dict;
|
||||||
size_t dictSize;
|
size_t dictSize;
|
||||||
@ -725,8 +725,8 @@ typedef struct COVER_best_s {
|
|||||||
*/
|
*/
|
||||||
static void COVER_best_init(COVER_best_t *best) {
|
static void COVER_best_init(COVER_best_t *best) {
|
||||||
if (best==NULL) return; /* compatible with init on NULL */
|
if (best==NULL) return; /* compatible with init on NULL */
|
||||||
(void)pthread_mutex_init(&best->mutex, NULL);
|
(void)ZSTD_pthread_mutex_init(&best->mutex, NULL);
|
||||||
(void)pthread_cond_init(&best->cond, NULL);
|
(void)ZSTD_pthread_cond_init(&best->cond, NULL);
|
||||||
best->liveJobs = 0;
|
best->liveJobs = 0;
|
||||||
best->dict = NULL;
|
best->dict = NULL;
|
||||||
best->dictSize = 0;
|
best->dictSize = 0;
|
||||||
@ -741,11 +741,11 @@ static void COVER_best_wait(COVER_best_t *best) {
|
|||||||
if (!best) {
|
if (!best) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
pthread_mutex_lock(&best->mutex);
|
ZSTD_pthread_mutex_lock(&best->mutex);
|
||||||
while (best->liveJobs != 0) {
|
while (best->liveJobs != 0) {
|
||||||
pthread_cond_wait(&best->cond, &best->mutex);
|
ZSTD_pthread_cond_wait(&best->cond, &best->mutex);
|
||||||
}
|
}
|
||||||
pthread_mutex_unlock(&best->mutex);
|
ZSTD_pthread_mutex_unlock(&best->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -759,8 +759,8 @@ static void COVER_best_destroy(COVER_best_t *best) {
|
|||||||
if (best->dict) {
|
if (best->dict) {
|
||||||
free(best->dict);
|
free(best->dict);
|
||||||
}
|
}
|
||||||
pthread_mutex_destroy(&best->mutex);
|
ZSTD_pthread_mutex_destroy(&best->mutex);
|
||||||
pthread_cond_destroy(&best->cond);
|
ZSTD_pthread_cond_destroy(&best->cond);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -771,9 +771,9 @@ static void COVER_best_start(COVER_best_t *best) {
|
|||||||
if (!best) {
|
if (!best) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
pthread_mutex_lock(&best->mutex);
|
ZSTD_pthread_mutex_lock(&best->mutex);
|
||||||
++best->liveJobs;
|
++best->liveJobs;
|
||||||
pthread_mutex_unlock(&best->mutex);
|
ZSTD_pthread_mutex_unlock(&best->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -789,7 +789,7 @@ static void COVER_best_finish(COVER_best_t *best, size_t compressedSize,
|
|||||||
}
|
}
|
||||||
{
|
{
|
||||||
size_t liveJobs;
|
size_t liveJobs;
|
||||||
pthread_mutex_lock(&best->mutex);
|
ZSTD_pthread_mutex_lock(&best->mutex);
|
||||||
--best->liveJobs;
|
--best->liveJobs;
|
||||||
liveJobs = best->liveJobs;
|
liveJobs = best->liveJobs;
|
||||||
/* If the new dictionary is better */
|
/* If the new dictionary is better */
|
||||||
@ -812,9 +812,9 @@ static void COVER_best_finish(COVER_best_t *best, size_t compressedSize,
|
|||||||
best->parameters = parameters;
|
best->parameters = parameters;
|
||||||
best->compressedSize = compressedSize;
|
best->compressedSize = compressedSize;
|
||||||
}
|
}
|
||||||
pthread_mutex_unlock(&best->mutex);
|
ZSTD_pthread_mutex_unlock(&best->mutex);
|
||||||
if (liveJobs == 0) {
|
if (liveJobs == 0) {
|
||||||
pthread_cond_broadcast(&best->cond);
|
ZSTD_pthread_cond_broadcast(&best->cond);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user