diff --git a/.travis.yml b/.travis.yml index 6bf99f1bf..36537cbee 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,7 +12,7 @@ matrix: os: linux sudo: false - - env: Ubu=12.04cont Cmd="make zlibwrapper && make clean && make -C tests test-symbols && make clean && make -C tests test-zstd-nolegacy && make clean && make cmaketest && make clean && make -C contrib/pzstd googletest pzstd tests check && make -C contrib/pzstd clean" + - env: Ubu=12.04cont Cmd="make zlibwrapper && make clean && make -C tests test-pool && make -C tests test-symbols && make clean && make -C tests test-zstd-nolegacy && make clean && make cmaketest && make clean && make -C contrib/pzstd googletest pzstd tests check && make -C contrib/pzstd clean" os: linux sudo: false language: cpp diff --git a/contrib/pzstd/Makefile b/contrib/pzstd/Makefile index 99d955e94..f148bfd8e 100644 --- a/contrib/pzstd/Makefile +++ b/contrib/pzstd/Makefile @@ -26,7 +26,7 @@ POSTCOMPILE = mv -f $*.Td $*.d # CFLAGS, CXXFLAGS, CPPFLAGS, and LDFLAGS are for the users to override CFLAGS ?= -O3 -Wall -Wextra -CXXFLAGS ?= -O3 -Wall -Wextra -pedantic -std=c++11 +CXXFLAGS ?= -O3 -Wall -Wextra -pedantic CPPFLAGS ?= LDFLAGS ?= @@ -37,7 +37,7 @@ GTEST_INC = -isystem googletest/googletest/include PZSTD_CPPFLAGS = $(PZSTD_INC) PZSTD_CCXXFLAGS = PZSTD_CFLAGS = $(PZSTD_CCXXFLAGS) -PZSTD_CXXFLAGS = $(PZSTD_CCXXFLAGS) +PZSTD_CXXFLAGS = $(PZSTD_CCXXFLAGS) -std=c++11 PZSTD_LDFLAGS = EXTRA_FLAGS = ALL_CFLAGS = $(EXTRA_FLAGS) $(CPPFLAGS) $(PZSTD_CPPFLAGS) $(CFLAGS) $(PZSTD_CFLAGS) diff --git a/lib/Makefile b/lib/Makefile index cd87e7756..efd3b87fe 100644 --- a/lib/Makefile +++ b/lib/Makefile @@ -98,12 +98,7 @@ else INSTALL ?= install endif -ifneq (,$(filter $(shell uname),OpenBSD FreeBSD NetBSD DragonFly SunOS)) -PREFIX ?= /usr -else -PREFIX ?= /usr/local -endif - +PREFIX ?= /usr/local DESTDIR ?= LIBDIR ?= $(PREFIX)/lib INCLUDEDIR ?= $(PREFIX)/include diff --git a/lib/common/pool.c b/lib/common/pool.c new file mode 100644 index 000000000..bea48f318 --- /dev/null +++ b/lib/common/pool.c @@ -0,0 +1,190 @@ +/** + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +#include "pool.h" +#include /* size_t */ +#include /* malloc, calloc, free */ + +#ifdef ZSTD_PTHREAD + +#include + +/* A job is a function and an opaque argument */ +typedef struct POOL_job_s { + POOL_function function; + void *opaque; +} POOL_job; + +struct POOL_ctx_s { + /* Keep track of the threads */ + pthread_t *threads; + size_t numThreads; + + /* The queue is a circular buffer */ + POOL_job *queue; + size_t queueHead; + size_t queueTail; + size_t queueSize; + /* The mutex protects the queue */ + pthread_mutex_t queueMutex; + /* Condition variable for pushers to wait on when the queue is full */ + pthread_cond_t queuePushCond; + /* Condition variables for poppers to wait on when the queue is empty */ + pthread_cond_t queuePopCond; + /* Indicates if the queue is shutting down */ + int shutdown; +}; + +/* POOL_thread() : + Work thread for the thread pool. + Waits for jobs and executes them. + @returns : NULL on failure else non-null. +*/ +static void *POOL_thread(void *opaque) { + POOL_ctx *ctx = (POOL_ctx *)opaque; + if (!ctx) { return NULL; } + for (;;) { + /* Lock the mutex and wait for a non-empty queue or until shutdown */ + if (pthread_mutex_lock(&ctx->queueMutex)) { return NULL; } + while (ctx->queueHead == ctx->queueTail && !ctx->shutdown) { + if (pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex)) { return NULL; } + } + /* empty => shutting down: so stop */ + if (ctx->queueHead == ctx->queueTail) { + if (pthread_mutex_unlock(&ctx->queueMutex)) { return NULL; } + return opaque; + } + { + /* Pop a job off the queue */ + POOL_job job = ctx->queue[ctx->queueHead]; + ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize; + /* Unlock the mutex, signal a pusher, and run the job */ + if (pthread_mutex_unlock(&ctx->queueMutex)) { return NULL; } + if (pthread_cond_signal(&ctx->queuePushCond)) { return NULL; } + job.function(job.opaque); + } + } + /* Unreachable */ +} + +POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) { + int err = 0; + POOL_ctx *ctx; + /* Check the parameters */ + if (!numThreads || !queueSize) { return NULL; } + /* Allocate the context and zero initialize */ + ctx = (POOL_ctx *)calloc(1, sizeof(POOL_ctx)); + if (!ctx) { return NULL; } + /* Initialize the job queue. + * It needs one extra space since one space is wasted to differentiate empty + * and full queues. + */ + ctx->queueSize = queueSize + 1; + ctx->queue = (POOL_job *)malloc(ctx->queueSize * sizeof(POOL_job)); + ctx->queueHead = 0; + ctx->queueTail = 0; + err |= pthread_mutex_init(&ctx->queueMutex, NULL); + err |= pthread_cond_init(&ctx->queuePushCond, NULL); + err |= pthread_cond_init(&ctx->queuePopCond, NULL); + ctx->shutdown = 0; + /* Allocate space for the thread handles */ + ctx->threads = (pthread_t *)malloc(numThreads * sizeof(pthread_t)); + ctx->numThreads = 0; + /* Check for errors */ + if (!ctx->threads || !ctx->queue || err) { POOL_free(ctx); return NULL; } + /* Initialize the threads */ + { size_t i; + for (i = 0; i < numThreads; ++i) { + if (pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) { + ctx->numThreads = i; + POOL_free(ctx); + return NULL; + } + } + ctx->numThreads = numThreads; + } + return ctx; +} + +/*! POOL_join() : + Shutdown the queue, wake any sleeping threads, and join all of the threads. +*/ +static void POOL_join(POOL_ctx *ctx) { + /* Shut down the queue */ + pthread_mutex_lock(&ctx->queueMutex); + ctx->shutdown = 1; + pthread_mutex_unlock(&ctx->queueMutex); + /* Wake up sleeping threads */ + pthread_cond_broadcast(&ctx->queuePushCond); + pthread_cond_broadcast(&ctx->queuePopCond); + /* Join all of the threads */ + { size_t i; + for (i = 0; i < ctx->numThreads; ++i) { + pthread_join(ctx->threads[i], NULL); + } + } +} + +void POOL_free(POOL_ctx *ctx) { + if (!ctx) { return; } + POOL_join(ctx); + pthread_mutex_destroy(&ctx->queueMutex); + pthread_cond_destroy(&ctx->queuePushCond); + pthread_cond_destroy(&ctx->queuePopCond); + if (ctx->queue) free(ctx->queue); + if (ctx->threads) free(ctx->threads); + free(ctx); +} + +void POOL_add(void *ctxVoid, POOL_function function, void *opaque) { + POOL_ctx *ctx = (POOL_ctx *)ctxVoid; + if (!ctx) { return; } + + pthread_mutex_lock(&ctx->queueMutex); + { + POOL_job job = {function, opaque}; + /* Wait until there is space in the queue for the new job */ + size_t newTail = (ctx->queueTail + 1) % ctx->queueSize; + while (ctx->queueHead == newTail && !ctx->shutdown) { + pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex); + newTail = (ctx->queueTail + 1) % ctx->queueSize; + } + /* The queue is still going => there is space */ + if (!ctx->shutdown) { + ctx->queue[ctx->queueTail] = job; + ctx->queueTail = newTail; + } + } + pthread_mutex_unlock(&ctx->queueMutex); + pthread_cond_signal(&ctx->queuePopCond); +} + +#else + +/* We don't need any data, but if it is empty malloc() might return NULL. */ +struct POOL_ctx_s { + int data; +}; + +POOL_ctx *POOL_create(size_t numThreads, size_t queueSize) { + (void)numThreads; + (void)queueSize; + return (POOL_ctx *)malloc(sizeof(POOL_ctx)); +} + +void POOL_free(POOL_ctx *ctx) { + if (ctx) free(ctx); +} + +void POOL_add(void *ctx, POOL_function function, void *opaque) { + (void)ctx; + function(opaque); +} + +#endif diff --git a/lib/common/pool.h b/lib/common/pool.h new file mode 100644 index 000000000..f4afc1ee3 --- /dev/null +++ b/lib/common/pool.h @@ -0,0 +1,45 @@ +/** + * Copyright (c) 2016-present, Facebook, Inc. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ +#ifndef POOL_H +#define POOL_H + +#include /* size_t */ + +typedef struct POOL_ctx_s POOL_ctx; + +/*! POOL_create() : + Create a thread pool with at most `numThreads` threads. + `numThreads` must be at least 1. + The maximum number of queued jobs before blocking is `queueSize`. + `queueSize` must be at least 1. + @return : The POOL_ctx pointer on success else NULL. +*/ +POOL_ctx *POOL_create(size_t numThreads, size_t queueSize); + +/*! POOL_free() : + Free a thread pool returned by POOL_create(). +*/ +void POOL_free(POOL_ctx *ctx); + +/*! POOL_function : + The function type that can be added to a thread pool. +*/ +typedef void (*POOL_function)(void *); +/*! POOL_add_function : + The function type for a generic thread pool add function. +*/ +typedef void (*POOL_add_function)(void *, POOL_function, void *); + +/*! POOL_add() : + Add the job `function(opaque)` to the thread pool. + Possibly blocks until there is room in the queue. +*/ +void POOL_add(void *ctx, POOL_function function, void *opaque); + +#endif diff --git a/programs/Makefile b/programs/Makefile index 156bf8980..6bd0014a6 100644 --- a/programs/Makefile +++ b/programs/Makefile @@ -156,15 +156,15 @@ else INSTALL ?= install endif -ifneq (,$(filter $(shell uname),OpenBSD FreeBSD NetBSD DragonFly SunOS)) -PREFIX ?= /usr -else PREFIX ?= /usr/local -endif - DESTDIR ?= BINDIR ?= $(PREFIX)/bin + +ifneq (,$(filter $(shell uname),OpenBSD FreeBSD NetBSD DragonFly SunOS)) +MANDIR ?= $(PREFIX)/man/man1 +else MANDIR ?= $(PREFIX)/share/man/man1 +endif INSTALL_PROGRAM ?= $(INSTALL) -m 755 INSTALL_SCRIPT ?= $(INSTALL) -m 755 diff --git a/tests/.gitignore b/tests/.gitignore index e932ad91c..5041404dd 100644 --- a/tests/.gitignore +++ b/tests/.gitignore @@ -15,6 +15,7 @@ paramgrill32 roundTripCrash longmatch symbols +pool invalidDictionaries # Tmp test directory diff --git a/tests/Makefile b/tests/Makefile index c080fe34a..739944de8 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -158,6 +158,9 @@ else $(CC) $(FLAGS) $^ -o $@$(EXT) -Wl,-rpath=$(ZSTDDIR) $(ZSTDDIR)/libzstd.so endif +pool : pool.c $(ZSTDDIR)/common/pool.c + $(CC) $(FLAGS) -pthread -DZSTD_PTHREAD $^ -o $@$(EXT) + namespaceTest: if $(CC) namespaceTest.c ../lib/common/xxhash.c -o $@ ; then echo compilation should fail; exit 1 ; fi $(RM) $@ @@ -176,7 +179,7 @@ clean: fuzzer-dll$(EXT) zstreamtest-dll$(EXT) zbufftest-dll$(EXT)\ zstreamtest$(EXT) zstreamtest32$(EXT) \ datagen$(EXT) paramgrill$(EXT) roundTripCrash$(EXT) longmatch$(EXT) \ - symbols$(EXT) invalidDictionaries$(EXT) + symbols$(EXT) invalidDictionaries$(EXT) pool$(EXT) @echo Cleaning completed @@ -288,4 +291,7 @@ test-invalidDictionaries: invalidDictionaries test-symbols: symbols $(QEMU_SYS) ./symbols +test-pool: pool + $(QEMU_SYS) ./pool + endif diff --git a/tests/pool.c b/tests/pool.c new file mode 100644 index 000000000..ce38075d0 --- /dev/null +++ b/tests/pool.c @@ -0,0 +1,70 @@ +#include "pool.h" +#include +#include +#include + +#define ASSERT_TRUE(p) \ + do { \ + if (!(p)) { \ + return 1; \ + } \ + } while (0) +#define ASSERT_FALSE(p) ASSERT_TRUE(!(p)) +#define ASSERT_EQ(lhs, rhs) ASSERT_TRUE((lhs) == (rhs)) + +struct data { + pthread_mutex_t mutex; + unsigned data[1024]; + size_t i; +}; + +void fn(void *opaque) { + struct data *data = (struct data *)opaque; + pthread_mutex_lock(&data->mutex); + data->data[data->i] = data->i; + ++data->i; + pthread_mutex_unlock(&data->mutex); +} + +int testOrder(size_t numThreads, size_t queueLog) { + struct data data; + POOL_ctx *ctx = POOL_create(numThreads, queueLog); + ASSERT_TRUE(ctx); + data.i = 0; + ASSERT_FALSE(pthread_mutex_init(&data.mutex, NULL)); + { + size_t i; + for (i = 0; i < 1024; ++i) { + POOL_add(ctx, &fn, &data); + } + } + POOL_free(ctx); + ASSERT_EQ(1024, data.i); + { + size_t i; + for (i = 0; i < data.i; ++i) { + ASSERT_EQ(i, data.data[i]); + } + } + ASSERT_FALSE(pthread_mutex_destroy(&data.mutex)); + return 0; +} + +int main(int argc, const char **argv) { + size_t numThreads; + for (numThreads = 1; numThreads <= 8; ++numThreads) { + size_t queueLog; + for (queueLog = 1; queueLog <= 8; ++queueLog) { + if (testOrder(numThreads, queueLog)) { + printf("FAIL: testOrder\n"); + return 1; + } + } + } + printf("PASS: testOrder\n"); + (POOL_create(0, 1) || POOL_create(1, 0)) ? printf("FAIL: testInvalid\n") + : printf("PASS: testInvalid\n"); + (void)argc; + (void)argv; + return 0; +}