/* ==========================================================================
* libevnet/src/thread.c - Network server library for libevent.
* --------------------------------------------------------------------------
* Copyright (c) 2006 William Ahern
* Copyright (c) 2005 Barracuda Networks, Inc.
* Copyright (c) 2002 William Ahern (Originally under GPL from AnonNet.)
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to permit
* persons to whom the Software is furnished to do so, subject to the
* following conditions:
*
* The above copyright notice and this permission notice shall be included
* in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
* OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
* NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
* USE OR OTHER DEALINGS IN THE SOFTWARE.
* ==========================================================================
*/
#ifdef USE_PTHREADS
#define _REENTRANT /* pthreads(3) */
#define _GNU_SOURCE /* pthread_yield(3) */
#include <stdlib.h> /* malloc(3) */
#include <inttypes.h> /* intptr_t */
#include <time.h> /* time_t */
#include <signal.h> /* SIG_BLOCK SIG_SETMASK sigset_t sigemptyset(3) */
#include <string.h> /* strerror(3) */
#include <errno.h>
#include <fcntl.h> /* F_GETFL F_SETFL O_NONBLOCK fcntl(2) */
#include <unistd.h> /* pipe(2) */
#include <sys/queue.h> /* LIST TAILQ */
#include <sys/time.h> /* event(3) */
#include <pthread.h> /* PTHREAD_MUTEX_INITIALIZER pthread_mutex_lock(3)
* pthread_mutex_unlock(3) pthread_yield(3)
* pthread_cond_wait(3) pthread_cond_signal(3)
* pthread_create(3) pthread_once(3)
* pthread_key_create(3) pthread_getspecific(3)
* pthread_setspecific(3) pthread_sigmask(3)
*/
#include <assert.h>
#include <sys/types.h> /* u_char */
#include <event.h> /* struct event event_set(3) event_add(3) */
#include "thread.h"
/*
 * Forward declarations of the file-private types, so that the structure
 * definitions below can point at one another before they are complete.
 */
struct thread_job; /* Threaded job. */
struct thread_pool; /* Collection of threads managed from an event loop. */
struct thread; /* Thread which runs jobs. */
/*
 * One unit of work handed to the pool. A job record travels between the
 * pool's jobs.free, jobs.queue and jobs.drop lists: it is filled in by
 * thread_job_open(), queued by thread_job_dispatch(), executed by
 * thread_loop() and finally reported and recycled by thread_job_close().
 */
struct thread_job {
thread_enter run; /* Function to run in thread. */
void *arg; /* Argument to runnable function. */
enum thread_priority pri; /* Priority to run thread with. (Stored only; dispatch has a TODO to use it.) */
thread_return cb; /* Callback function. */
void *ctx; /* Callback function context. */
unsigned timeout; /* Maximum timeout for operations. */
void *ret; /* Job return value. */
time_t inserted; /* time(NULL) at the moment the job was put on jobs.queue. */
struct thread_pool *pool; /* Pool the job was opened against. */
TAILQ_ENTRY(thread_job) tqe; /* Linkage for whichever job list currently holds the record. */
}; /* struct thread_job */
/*
 * Bookkeeping for one worker thread. Allocated by thread_start() and freed
 * by the worker itself at the end of thread_loop().
 */
struct thread {
pthread_t tid; /* Thread id filled in by pthread_create(). */
struct thread_pool *pool; /* Pool this worker serves. */
int attached; /* 1 while the worker should keep running; cleared by thread_pool_close(). */
struct thread_job *job; /* Job the worker has taken from jobs.queue, if any. */
TAILQ_ENTRY(thread) tqe; /* Linkage on pool->threads.list (all workers). */
LIST_ENTRY(thread) le; /* Linkage on pool->threads.queue while waiting for work. */
}; /* struct thread */
/*
 * A pool of worker threads plus the job lists and the pipe/event pair used
 * to hand finished jobs back to the libevent loop.
 *
 * The `static const' qualifiers apply to thread_pool_initializer only: it
 * is the template that thread_pool_open() copies into each new pool.
 */
static const struct thread_pool {
struct thread_options opts; /* Private copy of the options the pool was opened with. */
struct event_base *ev_base; /* Event base to use; when null the default libevent loop is used. */
int closed; /* Set by thread_pool_close(); thread_run() then refuses new jobs. */
struct {
TAILQ_HEAD(, thread_job) free; /* Recycled job records awaiting reuse. */
TAILQ_HEAD(, thread_job) queue; /* Jobs waiting for a worker. */
TAILQ_HEAD(, thread_job) drop; /* Finished jobs waiting for their callback. */
} jobs;
struct {
pthread_mutex_t mutex; /* Held while jobs.queue, jobs.drop and the thread bookkeeping below are touched. */
pthread_cond_t gate; /* Idle workers wait here; signalled when a job is queued. */
struct event ev; /* Read event on pipe[0]; fires thread_job_catch(). */
int ev_pending; /* Non-zero once ev has been added to the event loop. */
int pipe[2]; /* Workers write one byte to pipe[1] per finished job; pipe[0] is read by the loop. */
TAILQ_HEAD(, thread) list; /* Every live worker. */
unsigned int nthreads; /* Number of entries on list. */
LIST_HEAD(, thread) queue; /* Workers currently waiting on gate. */
} threads;
} thread_pool_initializer = {
.threads = { .mutex = PTHREAD_MUTEX_INITIALIZER,
.gate = PTHREAD_COND_INITIALIZER,
.pipe = { -1, -1 }, },
};
/*
 * Options used when the caller supplies none (thread_pool_open() falls back
 * to this object). The signal-mask members are filled in at run time by
 * thread_init_once(). The object has external linkage; presumably it is
 * declared in thread.h -- confirm.
 */
struct thread_options thread_defaults = {
.nthreads_min = 1,
.nthreads_max = 3,
}; /* thread_defaults */
/* Thread-specific slot holding the pool bound to the calling thread. */
static pthread_key_t pool_key;

#define THREAD_POOL thread_pool_get()

/*
 * Return the pool stored under pool_key for the calling thread.
 *
 * The lookup is performed outside of assert() so that it still happens when
 * the file is compiled with NDEBUG; the assertion only checks that a pool
 * has actually been bound (see thread_init()).
 */
static struct thread_pool *thread_pool_get(void) {
	struct thread_pool *tp = pthread_getspecific(pool_key);

	assert(tp);

	return tp;
} /* thread_pool_get() */
static void *thread_loop(void *t_) {
struct thread *t = t_;
int locked = 0;
int num;
while (t->attached) {
if (0 != pthread_mutex_lock(&t->pool->threads.mutex))
goto sysfail;
locked = 1;
/*
* Wait in line for another job.
*/
while (t->attached && !(t->job = TAILQ_FIRST(&t->pool->jobs.queue))) {
LIST_INSERT_HEAD(&t->pool->threads.queue, t, le);
if (0 != pthread_cond_wait(&t->pool->threads.gate, &t->pool->threads.mutex))
goto sysfail;
LIST_REMOVE(t, le);
}
if (t->job)
TAILQ_REMOVE(&t->pool->jobs.queue, t->job, tqe);
assert(0 == pthread_mutex_unlock(&t->pool->threads.mutex));
locked = 0;
if (t->job) {
t->job->ret = t->job->run(t->job->arg, t->job->ctx, t->job->timeout);
assert(0 == pthread_mutex_lock(&t->pool->threads.mutex));
locked = 1;
TAILQ_INSERT_TAIL(&t->pool->jobs.drop, t->job, tqe);
assert(0 == pthread_mutex_unlock(&t->pool->threads.mutex));
locked = 0;
do {
num = write(t->pool->threads.pipe[1], "", 1);
} while(num == -1 && errno == EINTR);
}
}
assert(0 == pthread_mutex_lock(&t->pool->threads.mutex));
TAILQ_REMOVE(&t->pool->threads.list, t, tqe);
t->pool->threads.nthreads--;
assert(0 == pthread_mutex_unlock(&t->pool->threads.mutex));
free(t);
return (void *)(intptr_t)0;
sysfail:
if (!locked)
assert(0 == pthread_mutex_lock(&t->pool->threads.mutex));
TAILQ_REMOVE(&t->pool->threads.list, t, tqe);
t->pool->threads.nthreads--;
assert(0 == pthread_mutex_unlock(&t->pool->threads.mutex));
free(t);
return (void *)(intptr_t)errno;
} /* thread_loop() */
/*
 * Create one worker thread for the pool and register it on threads.list.
 *
 * The caller MUST already hold tp->threads.mutex (thread_job_dispatch() and
 * thread_start_all() do); the new thread blocks on that mutex until the
 * caller releases it.
 *
 * The pool's configured signal mask is applied around pthread_create() so
 * the worker inherits it, and the caller's mask is restored afterwards.
 *
 * Returns 0 on success, or -1 with errno set on failure.
 */
static int thread_start(struct thread_pool *tp) {
	struct thread *t = NULL;
	sigset_t saved_set;
	int sys_errno;
	int create_rc;
	int rc;

	t = malloc(sizeof *t);

	if (!t)
		goto sysfail;

	t->pool = tp;
	t->attached = 1;
	t->job = NULL;

	/* pthread calls return their error; copy it into errno for sysfail. */
	if (0 != (rc = pthread_sigmask(tp->opts.sigmask_how, &tp->opts.sigmask_set, &saved_set))) {
		errno = rc;
		goto sysfail;
	}

	create_rc = pthread_create(&t->tid, 0, &thread_loop, t);

	/*
	 * Restore our own mask whether or not the thread was created. The
	 * call is kept out of assert() so it still runs under NDEBUG.
	 */
	rc = pthread_sigmask(SIG_SETMASK, &saved_set, 0);
	assert(0 == rc);

	if (0 != create_rc) {
		errno = create_rc;
		goto sysfail;
	}

	/*
	 * Nothing in this file joins a worker (it frees its own record on
	 * exit), so detach it to let the system reclaim its resources.
	 */
	rc = pthread_detach(t->tid);
	assert(0 == rc);

	/*
	 * This critical section protected from thread_job_dispatch() or
	 * thread_start_all(). The trylock is a debug-only sanity check that
	 * the mutex is already held; it is expected to fail.
	 */
	assert(0 != pthread_mutex_trylock(&tp->threads.mutex));

	TAILQ_INSERT_TAIL(&t->pool->threads.list, t, tqe);
	t->pool->threads.nthreads++;

	return 0;
sysfail:
	sys_errno = errno;

	free(t);

	errno = sys_errno;

	return -1;
} /* thread_start() */
/*
 * Start workers until the pool holds opts.nthreads_max threads.
 *
 * The pool lock is held across each thread_start() call and released
 * between iterations so the freshly created worker can get going.
 *
 * Returns -1 with errno set if the pool lock cannot be taken. Returns 0
 * otherwise, including when thread_start() fails part way through.
 * NOTE(review): returning 0 after a failed thread_start() looks like a
 * deliberate best effort, but with nthreads_fixed it can leave the pool with
 * fewer (possibly zero) workers -- confirm the intent.
 */
static int thread_start_all(struct thread_pool *tp) {
	int okay;
	int rc;

	/* pthread calls return their error; copy it into errno for callers. */
	if (0 != (rc = pthread_mutex_lock(&tp->threads.mutex))) {
		errno = rc;
		return -1;
	}

	while (tp->threads.nthreads < tp->opts.nthreads_max) {
		okay = (0 == thread_start(tp));

		/* tp->threads.nthreads incremented by thread_start() */

		/* Unlock outside of assert() so it survives NDEBUG builds. */
		rc = pthread_mutex_unlock(&tp->threads.mutex);
		assert(0 == rc);

		if (!okay)
			return 0;

		pthread_yield();

		if (0 != (rc = pthread_mutex_lock(&tp->threads.mutex))) {
			errno = rc;
			return -1;
		}
	}

	rc = pthread_mutex_unlock(&tp->threads.mutex);
	assert(0 == rc);

	return 0;
} /* thread_start_all() */
/*
 * Obtain a job record and fill it in from the arguments.
 *
 * A record is recycled from the pool's free list when one is available;
 * otherwise a new one is allocated. The `inserted' stamp is left untouched
 * here.
 *
 * Returns the prepared job, or NULL if allocation fails.
 */
static struct thread_job *thread_job_open(struct thread_pool *tp, thread_enter func, void *arg, enum thread_priority pri, thread_return cb, void *ctx, unsigned timeout) {
	struct thread_job *job = TAILQ_FIRST(&tp->jobs.free);

	if (job) {
		TAILQ_REMOVE(&tp->jobs.free, job, tqe);
	} else {
		job = malloc(sizeof *job);

		if (!job)
			return NULL;
	}

	/* What to run ... */
	job->run = func;
	job->arg = arg;
	job->pri = pri;
	job->timeout = timeout;

	/* ... and whom to tell about it. */
	job->cb = cb;
	job->ctx = ctx;
	job->ret = NULL;

	job->pool = tp;

	return job;
} /* thread_job_open() */
static void thread_job_close(struct thread_pool *tp, struct thread_job *j, enum thread_errno err) {
thread_return cb = j->cb;
void *cb_ret = j->ret;
void *cb_ctx = j->ctx;
TAILQ_INSERT_HEAD(&tp->jobs.free, j, tqe);
cb(cb_ret, err, cb_ctx);
return /* void */;
} /* thread_job_close() */
/*
 * libevent callback for the read end of the pool's pipe (also called
 * directly by thread_pool_close()).
 *
 * Drains the wake-up bytes from `fd', moves every finished job from
 * jobs.drop to a private list while holding the pool lock, then, with the
 * lock released, closes each job with THREAD_ESUCCESS, which runs its
 * callback.
 *
 * fd      read end of the pool's pipe (non-blocking).
 * event   libevent event mask; not used.
 * ev_arg  the pool registered with the event; must equal the calling
 *         thread's pool.
 */
static void thread_job_catch(int fd, short event, void *ev_arg) {
	struct thread_pool *tp = THREAD_POOL;
	TAILQ_HEAD(, thread_job) jobs = TAILQ_HEAD_INITIALIZER(jobs);
	char buf[32];
	int num;
	int rc = 0;
	struct thread_job *j;

	/* rc is only examined by assert(), which disappears under NDEBUG. */
	(void)rc;

	/* Make sure all our assumptions are correct. */
	assert(ev_arg == tp && "libevent loop must run from same thread as thread_init().");

	/* Swallow every pending wake-up byte; the pipe is non-blocking. */
	do {
		num = read(fd, buf, sizeof buf);
	} while (num > 0 || (num == -1 && errno == EINTR));

	if (num == -1 && errno != EAGAIN)
		goto sysfail;

	if (0 != pthread_mutex_lock(&tp->threads.mutex))
		goto sysfail;

	while ((j = TAILQ_FIRST(&tp->jobs.drop))) {
		TAILQ_REMOVE(&tp->jobs.drop, j, tqe);
		TAILQ_INSERT_TAIL(&jobs, j, tqe);
	}

	/*
	 * The unlock must not live inside assert(): with NDEBUG it would be
	 * compiled out and the pool would stay locked forever.
	 */
	rc = pthread_mutex_unlock(&tp->threads.mutex);
	assert(0 == rc);

	/*
	 * We must issue the callbacks outside of our critical section to
	 * avoid any deadlocks if the caller recurses (i.e. from the
	 * callback calls thread_run() again, which will attempt to take the
	 * thread-pool lock).
	 */
	while ((j = TAILQ_FIRST(&jobs))) {
		TAILQ_REMOVE(&jobs, j, tqe);

		thread_job_close(tp, j, THREAD_ESUCCESS);
	}

	return /* void */;
sysfail:
	assert(0 && "Cannot recover from thread_job_catch() failure.");

	return /* void */;
} /* thread_job_catch() */
/*
 * Queue a job and make sure somebody will pick it up.
 *
 * Under the pool lock the job is appended to jobs.queue and time-stamped.
 * If a worker is idle on the gate it is signalled; otherwise, when the pool
 * is below nthreads_max and not fixed-size, a new worker is started.
 *
 * On failure the job is taken off the queue again (if it was queued) and
 * closed with THREAD_ESYSTEM, which invokes its callback. errno carries the
 * cause, since thread_strerror(THREAD_ESYSTEM) reports strerror(errno).
 */
static void thread_job_dispatch(struct thread_pool *tp, struct thread_job *j) {
	int locked = 0;
	int rc;

	/* pthread calls return their error; copy it into errno for the callback. */
	if (0 != (rc = pthread_mutex_lock(&tp->threads.mutex))) {
		errno = rc;
		goto sysfail;
	}

	locked = 1;

	/*
	 * TODO: Use priority.
	 */
	TAILQ_INSERT_TAIL(&tp->jobs.queue, j, tqe);
	j->inserted = time(NULL);

	if (LIST_FIRST(&tp->threads.queue)) {
		if (0 != (rc = pthread_cond_signal(&tp->threads.gate))) {
			TAILQ_REMOVE(&tp->jobs.queue, j, tqe);
			errno = rc;
			goto sysfail;
		}
	} else if (tp->threads.nthreads < tp->opts.nthreads_max && !tp->opts.nthreads_fixed) {
		if (0 != thread_start(tp)) {
			TAILQ_REMOVE(&tp->jobs.queue, j, tqe);
			goto sysfail;
		}
	}

	/* Unlock outside of assert() so it survives NDEBUG builds. */
	rc = pthread_mutex_unlock(&tp->threads.mutex);
	assert(0 == rc);

	pthread_yield();

	return /* void */;
sysfail:
	if (locked) {
		rc = pthread_mutex_unlock(&tp->threads.mutex);
		assert(0 == rc);
	}

	thread_job_close(tp, j, THREAD_ESYSTEM);

	return /* void */;
} /* thread_job_dispatch() */
/*
 * Run `func' on a worker thread of the calling thread's pool and report the
 * outcome through `cb'.
 *
 * func     function executed by the worker; called as func(arg, ctx, timeout).
 * arg      argument handed to func.
 * pri      requested priority (recorded on the job).
 * cb       completion callback; called as cb(result, error, ctx).
 * ctx      context handed to both func and cb.
 * timeout  timeout value handed to func.
 *
 * Failures are reported through cb with THREAD_ESYSTEM and a NULL result:
 * when no job record can be obtained, or when the pool has been closed
 * (errno is set to EAGAIN in that case).
 */
void thread_run(thread_enter func, void *arg, enum thread_priority pri, thread_return cb, void *ctx, unsigned timeout) {
	struct thread_pool *tp = THREAD_POOL;
	struct thread_job *j = thread_job_open(tp, func, arg, pri, cb, ctx, timeout);

	if (!j) {
		/* There is no job record to recycle; tell the caller directly. */
		cb(NULL, THREAD_ESYSTEM, ctx);

		return /* void */;
	}

	if (tp->closed) {
		errno = EAGAIN;

		/*
		 * Hand the record back to the free list and issue the
		 * callback (the job's result is still NULL).
		 */
		thread_job_close(tp, j, THREAD_ESYSTEM);

		return /* void */;
	}

	thread_job_dispatch(tp, j);

	return /* void */;
} /* thread_run() */
/*
 * Map a thread_errno code to a human-readable message.
 *
 * For THREAD_ESYSTEM the message is strerror() of the current errno, so the
 * caller should ask before errno is disturbed. Unrecognised codes yield
 * "Unknown".
 */
const char *thread_strerror(enum thread_errno err) {
	if (err == THREAD_ESUCCESS)
		return "Success";

	if (err == THREAD_ESYSTEM)
		return strerror(errno);

	if (err == THREAD_ETIMEDOUT)
		return "Operation timed out";

	return "Unknown";
} /* thread_strerror() */
static void thread_pool_close(struct thread_pool *tp) {
struct thread *t;
unsigned int nthreads;
struct thread_job *j;
if (!tp)
return /* void */;
tp->closed = 1;
assert(0 == pthread_mutex_lock(&tp->threads.mutex));
nthreads = tp->threads.nthreads;
TAILQ_FOREACH(t, &tp->threads.list, tqe) {
t->attached = 0;
}
assert(0 == pthread_mutex_unlock(&tp->threads.mutex));
/* Spin trying to catch all our threads. */
while (nthreads > 0) {
if (tp->ev_base)
event_base_loop(tp->ev_base, EVLOOP_NONBLOCK);
else
event_loop(EVLOOP_NONBLOCK);
assert(0 == pthread_cond_broadcast(&tp->threads.gate));
pthread_yield();
assert(0 == pthread_mutex_lock(&tp->threads.mutex));
nthreads = tp->threads.nthreads;
assert(0 == pthread_mutex_unlock(&tp->threads.mutex));
}
assert(0 == pthread_mutex_destroy(&tp->threads.mutex));
assert(0 == pthread_cond_destroy(&tp->threads.gate));
assert(TAILQ_EMPTY(&tp->threads.list));
while (!TAILQ_EMPTY(&tp->jobs.drop)) {
thread_job_catch(tp->threads.pipe[0], 0, tp);
}
while ((j = TAILQ_FIRST(&tp->jobs.queue))) {
TAILQ_REMOVE(&tp->jobs.queue, j, tqe);
errno = EAGAIN;
thread_job_close(tp, j, THREAD_ESYSTEM);
}
while ((j = TAILQ_FIRST(&tp->jobs.free))) {
TAILQ_REMOVE(&tp->jobs.free, j, tqe);
free(j);
}
if (tp->threads.ev_pending)
(void)event_del(&tp->threads.ev);
(void)close(tp->threads.pipe[0]);
(void)close(tp->threads.pipe[1]);
memset(tp, '\0', sizeof *tp);
free(tp);
return /* void */;
} /* thread_pool_close() */
/*
 * Destructor registered with pthread_key_create() for pool_key: closes the
 * pool stored in the slot.
 */
static void thread_pool_destructor(void *tp) {
	struct thread_pool *pool = tp;

	thread_pool_close(pool);
} /* thread_pool_destructor() */
/*
 * Allocate and set up a pool: copy the options (thread_defaults when `opts'
 * is NULL), initialise the lists, create the non-blocking notification pipe
 * and register a persistent read event for its read end with libevent
 * (on `ev_base' when given).
 *
 * No worker threads are started here.
 *
 * Returns the new pool, or NULL on failure with errno describing the
 * original error.
 */
static struct thread_pool *thread_pool_open(const struct thread_options *opts, struct event_base *ev_base) {
	struct thread_pool *tp = 0;
	int flags;
	int sys_errno;

	if (!opts)
		opts = &thread_defaults;

	if (!(tp = malloc(sizeof *tp)))
		goto sysfail;

	*tp = thread_pool_initializer;

	tp->opts = *opts;
	tp->ev_base = ev_base;

	TAILQ_INIT(&tp->jobs.free);
	TAILQ_INIT(&tp->jobs.queue);
	TAILQ_INIT(&tp->jobs.drop);

	TAILQ_INIT(&tp->threads.list);
	LIST_INIT(&tp->threads.queue);

	/* Both pipe ends are switched to non-blocking mode. */
	if (0 != pipe(tp->threads.pipe)
	||  -1 == (flags = fcntl(tp->threads.pipe[0], F_GETFL))
	||  0 != fcntl(tp->threads.pipe[0], F_SETFL, flags | O_NONBLOCK)
	||  -1 == (flags = fcntl(tp->threads.pipe[1], F_GETFL))
	||  0 != fcntl(tp->threads.pipe[1], F_SETFL, flags | O_NONBLOCK)) {
		goto sysfail;
	}

	event_set(&tp->threads.ev, tp->threads.pipe[0], EV_READ | EV_PERSIST, &thread_job_catch, tp);

	if (ev_base)
		event_base_set(ev_base, &tp->threads.ev);

	if (0 != event_add(&tp->threads.ev, 0))
		goto sysfail;

	tp->threads.ev_pending = 1;

	return tp;
sysfail:
	/*
	 * The cleanup makes system calls of its own (e.g. close()) which
	 * may overwrite errno; keep the value that describes the failure.
	 */
	sys_errno = errno;

	thread_pool_close(tp);

	errno = sys_errno;

	return 0;
} /* thread_pool_open() */
enum thread_errno thread_reset(const struct thread_options *opts, struct event_base *ev_base) {
thread_pool_close(THREAD_POOL);
return thread_init(opts, ev_base);
} /* thread_reset() */
static int init_once_okay; /* Set to 1 once thread_init_once() has succeeded. */
static int init_once_errno; /* Error code recorded when thread_init_once() fails. */

/*
 * One-time process-wide setup, run through pthread_once() by thread_init():
 * creates the thread-specific key that holds each thread's pool and fills
 * in the signal-mask members of thread_defaults.
 *
 * Success is reported through init_once_okay; on failure the error code is
 * left in init_once_errno.
 */
static void thread_init_once(void) {
	/* pthread_key_create() returns its error; it does not set errno. */
	int rc = pthread_key_create(&pool_key, &thread_pool_destructor);

	if (0 != rc) {
		init_once_errno = rc;

		return /* void */;
	}

	/*
	 * SIG_BLOCK with an empty set should effect a null operation.
	 */
	thread_defaults.sigmask_how = SIG_BLOCK;
	(void)sigemptyset(&thread_defaults.sigmask_set);

	init_once_okay = 1;

	return /* void */;
} /* thread_init_once() */
/*
 * Create a thread pool for the calling thread and bind it to that thread.
 *
 * opts     pool options, or NULL for thread_defaults.
 * ev_base  libevent base to attach to, or NULL for the default loop.
 *
 * When opts->nthreads_fixed is set the workers are started immediately;
 * otherwise they are started on demand as jobs are dispatched.
 *
 * Returns 0 on success, or THREAD_ESYSTEM with errno set on failure.
 */
enum thread_errno thread_init(const struct thread_options *opts, struct event_base *ev_base) {
	static pthread_once_t init_once = PTHREAD_ONCE_INIT;
	struct thread_pool *tp = 0;
	int bound = 0; /* Has tp been stored under pool_key? */
	int sys_errno;
	int rc;

	/* pthread calls return their error; copy it into errno for callers. */
	if (0 != (rc = pthread_once(&init_once, &thread_init_once))) {
		errno = rc;

		return THREAD_ESYSTEM;
	}

	if (!init_once_okay) {
		errno = init_once_errno;

		goto sysfail;
	}

	if (!(tp = thread_pool_open(opts, ev_base)))
		goto sysfail;

	if (0 != (rc = pthread_setspecific(pool_key, tp))) {
		errno = rc;

		goto sysfail;
	}

	bound = 1;

	if (tp->opts.nthreads_fixed && 0 != thread_start_all(tp))
		goto sysfail;

	return 0;
sysfail:
	sys_errno = errno;

	if (tp != 0) {
		thread_pool_close(tp);

		/*
		 * Do not leave the freed pool behind in the thread-specific
		 * slot, where later lookups or the key destructor would find
		 * it.
		 */
		if (bound)
			(void)pthread_setspecific(pool_key, NULL);
	}

	errno = sys_errno;

	return THREAD_ESYSTEM;
} /* thread_init() */
#else /* elif !defined USE_PTHREADS */
struct event_base;
#include <stddef.h> /* size_t */
#include <sys/time.h> /* struct timeval */
#include "thread.h"
/*
 * Stand-in used when the file is built without USE_PTHREADS: nothing is
 * run; the callback is invoked at once with a null result and
 * THREAD_ESYSTEM.
 */
void thread_run(thread_enter a, void *b, enum thread_priority c, thread_return cb, void *arg, unsigned d) {
	/* The job description is deliberately ignored. */
	(void)a;
	(void)b;
	(void)c;
	(void)d;

	cb(0, THREAD_ESYSTEM, arg);
} /* thread_run() */
/*
 * Stand-in used when the file is built without USE_PTHREADS: every error
 * code maps to the same message.
 */
const char *thread_strerror(enum thread_errno e) {
	(void)e;

	return "Unknown";
} /* thread_strerror() */
/*
 * Stand-in used when the file is built without USE_PTHREADS: there is no
 * pool to reset, so the call always reports THREAD_ESYSTEM.
 */
enum thread_errno thread_reset(const struct thread_options *opts, struct event_base *ev_base) {
	(void)opts;
	(void)ev_base;

	return THREAD_ESYSTEM;
} /* thread_reset() */
/*
 * Stand-in used when the file is built without USE_PTHREADS: no pool can be
 * created, so the call always reports THREAD_ESYSTEM.
 */
enum thread_errno thread_init(const struct thread_options *opts, struct event_base *ev_base) {
	(void)opts;
	(void)ev_base;

	return THREAD_ESYSTEM;
} /* thread_init() */
#endif /* USE_PTHREADS */
/* syntax highlighted by Code2HTML, v. 0.9.1 */