/* ========================================================================== * libevnet/src/thread.c - Network server library for libevent. * -------------------------------------------------------------------------- * Copyright (c) 2006 William Ahern * Copyright (c) 2005 Barracuda Networks, Inc. * Copyright (c) 2002 William Ahern (Originally under GPL from AnonNet.) * * Permission is hereby granted, free of charge, to any person obtaining a * copy of this software and associated documentation files (the * "Software"), to deal in the Software without restriction, including * without limitation the rights to use, copy, modify, merge, publish, * distribute, sublicense, and/or sell copies of the Software, and to permit * persons to whom the Software is furnished to do so, subject to the * following conditions: * * The above copyright notice and this permission notice shall be included * in all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE * USE OR OTHER DEALINGS IN THE SOFTWARE. * ========================================================================== */ #ifdef USE_PTHREADS #define _REENTRANT /* pthreads(3) */ #define _GNU_SOURCE /* pthread_yield(3) */ #include /* malloc(3) */ #include /* intptr_t */ #include /* time_t */ #include /* SIG_BLOCK SIG_SETMASK sigset_t sigemptyset(3) */ #include /* strerror(3) */ #include #include /* F_GETFL F_SETFL O_NONBLOCK fcntl(2) */ #include /* pipe(2) */ #include /* LIST TAILQ */ #include /* event(3) */ #include /* PTHREAD_MUTEX_INITIALIZER pthread_mutex_lock(3) * pthread_mutex_unlock(3) pthread_yield(3) * pthread_cond_wait(3) pthread_cond_wait(3) * pthread_create(3) pthread_once(3) * pthread_key_create(3) pthread_getspecific(3) * pthread_setspecific(3) pthread_sigmask(3) */ #include #include /* u_char */ #include /* struct event event_set(3) event_add(3) */ #include "thread.h" struct thread_job; /* Threaded job. */ struct thread_pool; /* Collection of threads managed from an event loop. */ struct thread; /* Thread which runs jobs. */ struct thread_job { thread_enter run; /* Function to run in thread. */ void *arg; /* Argument to runnable function. */ enum thread_priority pri; /* Priority to run thread with. */ thread_return cb; /* Callback function. */ void *ctx; /* Callback function context. */ unsigned timeout; /* Maximum timeout for operations. */ void *ret; /* Job return value. */ time_t inserted; struct thread_pool *pool; TAILQ_ENTRY(thread_job) tqe; }; /* struct thread_job */ struct thread { pthread_t tid; struct thread_pool *pool; int attached; struct thread_job *job; TAILQ_ENTRY(thread) tqe; LIST_ENTRY(thread) le; }; /* struct thread */ static const struct thread_pool { struct thread_options opts; struct event_base *ev_base; int closed; struct { TAILQ_HEAD(, thread_job) free; TAILQ_HEAD(, thread_job) queue; TAILQ_HEAD(, thread_job) drop; } jobs; struct { pthread_mutex_t mutex; pthread_cond_t gate; struct event ev; int ev_pending; int pipe[2]; TAILQ_HEAD(, thread) list; unsigned int nthreads; LIST_HEAD(, thread) queue; } threads; } thread_pool_initializer = { .threads = { .mutex = PTHREAD_MUTEX_INITIALIZER, .gate = PTHREAD_COND_INITIALIZER, .pipe = { -1, -1 }, }, }; struct thread_options thread_defaults = { .nthreads_min = 1, .nthreads_max = 3, }; /* thread_defaults */ static pthread_key_t pool_key; #define THREAD_POOL thread_pool_get() static struct thread_pool *thread_pool_get(void) { struct thread_pool *tp; assert((tp = pthread_getspecific(pool_key))); return tp; } /* thread_pool_get() */ static void *thread_loop(void *t_) { struct thread *t = t_; int locked = 0; int num; while (t->attached) { if (0 != pthread_mutex_lock(&t->pool->threads.mutex)) goto sysfail; locked = 1; /* * Wait in line for another job. */ while (t->attached && !(t->job = TAILQ_FIRST(&t->pool->jobs.queue))) { LIST_INSERT_HEAD(&t->pool->threads.queue, t, le); if (0 != pthread_cond_wait(&t->pool->threads.gate, &t->pool->threads.mutex)) goto sysfail; LIST_REMOVE(t, le); } if (t->job) TAILQ_REMOVE(&t->pool->jobs.queue, t->job, tqe); assert(0 == pthread_mutex_unlock(&t->pool->threads.mutex)); locked = 0; if (t->job) { t->job->ret = t->job->run(t->job->arg, t->job->ctx, t->job->timeout); assert(0 == pthread_mutex_lock(&t->pool->threads.mutex)); locked = 1; TAILQ_INSERT_TAIL(&t->pool->jobs.drop, t->job, tqe); assert(0 == pthread_mutex_unlock(&t->pool->threads.mutex)); locked = 0; do { num = write(t->pool->threads.pipe[1], "", 1); } while(num == -1 && errno == EINTR); } } assert(0 == pthread_mutex_lock(&t->pool->threads.mutex)); TAILQ_REMOVE(&t->pool->threads.list, t, tqe); t->pool->threads.nthreads--; assert(0 == pthread_mutex_unlock(&t->pool->threads.mutex)); free(t); return (void *)(intptr_t)0; sysfail: if (!locked) assert(0 == pthread_mutex_lock(&t->pool->threads.mutex)); TAILQ_REMOVE(&t->pool->threads.list, t, tqe); t->pool->threads.nthreads--; assert(0 == pthread_mutex_unlock(&t->pool->threads.mutex)); free(t); return (void *)(intptr_t)errno; } /* thread_loop() */ static int thread_start(struct thread_pool *tp) { struct thread *t = NULL; sigset_t saved_set; int sys_errno; t = malloc(sizeof *t); if (!t) goto sysfail; t->pool = tp; t->attached = 1; t->job = NULL; if (0 != pthread_sigmask(tp->opts.sigmask_how, &tp->opts.sigmask_set, &saved_set)) goto sysfail; if (0 != pthread_create(&t->tid, 0, &thread_loop, t)) { assert(0 == pthread_sigmask(SIG_SETMASK, &saved_set, 0)); goto sysfail; } assert(0 == pthread_sigmask(SIG_SETMASK, &saved_set, 0)); /* * This critical section protected from thread_job_dispatch() or * thread_start_all(). */ assert(0 != pthread_mutex_trylock(&tp->threads.mutex)); TAILQ_INSERT_TAIL(&t->pool->threads.list, t, tqe); t->pool->threads.nthreads++; return 0; sysfail: sys_errno = errno; free(t); errno = sys_errno; return -1; } /* thread_start() */ static int thread_start_all(struct thread_pool *tp) { int okay; if (0 != pthread_mutex_lock(&tp->threads.mutex)) return -1; while (tp->threads.nthreads < tp->opts.nthreads_max) { okay = (0 == thread_start(tp)); /* tp->threads.nthreads incremented by thread_start() */ assert(0 == pthread_mutex_unlock(&tp->threads.mutex)); if (!okay) return 0; pthread_yield(); if (0 != pthread_mutex_lock(&tp->threads.mutex)) return -1; } assert(0 == pthread_mutex_unlock(&tp->threads.mutex)); return 0; } /* thread_start_all() */ static struct thread_job *thread_job_open(struct thread_pool *tp, thread_enter func, void *arg, enum thread_priority pri, thread_return cb, void *ctx, unsigned timeout) { struct thread_job *j; j = TAILQ_FIRST(&tp->jobs.free); if (!j) { j = malloc(sizeof *j); if (!j) return NULL; } else TAILQ_REMOVE(&tp->jobs.free, j, tqe); j->pool = tp; j->run = func; j->arg = arg; j->pri = pri; j->cb = cb; j->ctx = ctx; j->timeout = timeout; j->ret = NULL; return j; } /* thread_job_open() */ static void thread_job_close(struct thread_pool *tp, struct thread_job *j, enum thread_errno err) { thread_return cb = j->cb; void *cb_ret = j->ret; void *cb_ctx = j->ctx; TAILQ_INSERT_HEAD(&tp->jobs.free, j, tqe); cb(cb_ret, err, cb_ctx); return /* void */; } /* thread_job_close() */ static void thread_job_catch(int fd, short event, void *ev_arg) { struct thread_pool *tp = THREAD_POOL; TAILQ_HEAD(, thread_job) jobs = TAILQ_HEAD_INITIALIZER(jobs); char buf[32]; int num; struct thread_job *j, *x; /* Make sure all our assumptions are correct. */ assert(ev_arg == tp && "libevent loop must run from same thread as thread_init()."); do { num = read(fd, buf, sizeof buf); } while (num > 0 || (num == -1 && errno == EINTR)); if (num == -1 && errno != EAGAIN) goto sysfail; if (0 != pthread_mutex_lock(&tp->threads.mutex)) goto sysfail; for (j = TAILQ_FIRST(&tp->jobs.drop); j; j = x) { x = TAILQ_NEXT(j, tqe); TAILQ_REMOVE(&tp->jobs.drop, j, tqe); TAILQ_INSERT_TAIL(&jobs, j, tqe); } assert(0 == pthread_mutex_unlock(&tp->threads.mutex)); /* * We must issue the callbacks outside of our critical section to * avoid any deadlocks if the caller recurses (i.e. from the * callback calls thread_run() again, which will attempt to take the * thread-pool lock). */ for (j = TAILQ_FIRST(&jobs); j; j = x) { x = TAILQ_NEXT(j, tqe); TAILQ_REMOVE(&jobs, j, tqe); thread_job_close(tp, j, THREAD_ESUCCESS); } return /* void */; sysfail: assert(0 && "Cannot recover from thread_job_catch() failure."); return /* void */; } /* thread_job_catch() */ static void thread_job_dispatch(struct thread_pool *tp, struct thread_job *j) { int locked = 0; if (0 != pthread_mutex_lock(&tp->threads.mutex)) goto sysfail; locked = 1; /* * TODO: Use priority. */ TAILQ_INSERT_TAIL(&tp->jobs.queue, j, tqe); j->inserted = time(NULL); if (LIST_FIRST(&tp->threads.queue)) { if (0 != pthread_cond_signal(&tp->threads.gate)) { TAILQ_REMOVE(&tp->jobs.queue, j, tqe); goto sysfail; } } else if (tp->threads.nthreads < tp->opts.nthreads_max && !tp->opts.nthreads_fixed) { if (0 != thread_start(tp)) { TAILQ_REMOVE(&tp->jobs.queue, j, tqe); goto sysfail; } } assert(0 == pthread_mutex_unlock(&tp->threads.mutex)); pthread_yield(); return /* void */; sysfail: if (locked) assert(0 == pthread_mutex_unlock(&tp->threads.mutex)); thread_job_close(tp, j, THREAD_ESYSTEM); return /* void */; } /* thread_job_dispatch() */ void thread_run(thread_enter func, void *arg, enum thread_priority pri, thread_return cb, void *ctx, unsigned timeout) { struct thread_pool *tp = THREAD_POOL; struct thread_job *j = thread_job_open(tp, func, arg, pri, cb, ctx, timeout); if (!j) goto sysfail; if (tp->closed) { errno = EAGAIN; goto sysfail; } thread_job_dispatch(tp, j); return /* void */; sysfail: if (j) { cb(NULL, THREAD_ESYSTEM, ctx); } else thread_job_close(tp, j, THREAD_ESYSTEM); return /* void */; } /* thread_run() */ const char *thread_strerror(enum thread_errno err) { switch (err) { case THREAD_ESUCCESS: return "Success"; case THREAD_ESYSTEM: return strerror(errno); case THREAD_ETIMEDOUT: return "Operation timed out"; default: return "Unknown"; } } /* thread_strerror() */ static void thread_pool_close(struct thread_pool *tp) { struct thread *t; unsigned int nthreads; struct thread_job *j; if (!tp) return /* void */; tp->closed = 1; assert(0 == pthread_mutex_lock(&tp->threads.mutex)); nthreads = tp->threads.nthreads; TAILQ_FOREACH(t, &tp->threads.list, tqe) { t->attached = 0; } assert(0 == pthread_mutex_unlock(&tp->threads.mutex)); /* Spin trying to catch all our threads. */ while (nthreads > 0) { if (tp->ev_base) event_base_loop(tp->ev_base, EVLOOP_NONBLOCK); else event_loop(EVLOOP_NONBLOCK); assert(0 == pthread_cond_broadcast(&tp->threads.gate)); pthread_yield(); assert(0 == pthread_mutex_lock(&tp->threads.mutex)); nthreads = tp->threads.nthreads; assert(0 == pthread_mutex_unlock(&tp->threads.mutex)); } assert(0 == pthread_mutex_destroy(&tp->threads.mutex)); assert(0 == pthread_cond_destroy(&tp->threads.gate)); assert(TAILQ_EMPTY(&tp->threads.list)); while (!TAILQ_EMPTY(&tp->jobs.drop)) { thread_job_catch(tp->threads.pipe[0], 0, tp); } while ((j = TAILQ_FIRST(&tp->jobs.queue))) { TAILQ_REMOVE(&tp->jobs.queue, j, tqe); errno = EAGAIN; thread_job_close(tp, j, THREAD_ESYSTEM); } while ((j = TAILQ_FIRST(&tp->jobs.free))) { TAILQ_REMOVE(&tp->jobs.free, j, tqe); free(j); } if (tp->threads.ev_pending) (void)event_del(&tp->threads.ev); (void)close(tp->threads.pipe[0]); (void)close(tp->threads.pipe[1]); memset(tp, '\0', sizeof *tp); free(tp); return /* void */; } /* thread_pool_close() */ static void thread_pool_destructor(void *tp) { thread_pool_close(tp); } /* thread_pool_destructor() */ static struct thread_pool *thread_pool_open(const struct thread_options *opts, struct event_base *ev_base) { struct thread_pool *tp = 0; int flags; if (!opts) opts = &thread_defaults; if (!(tp = malloc(sizeof *tp))) goto sysfail; *tp = thread_pool_initializer; tp->opts = *opts; tp->ev_base = ev_base; TAILQ_INIT(&tp->jobs.free); TAILQ_INIT(&tp->jobs.queue); TAILQ_INIT(&tp->jobs.drop); TAILQ_INIT(&tp->threads.list); LIST_INIT(&tp->threads.queue); if (0 != pipe(tp->threads.pipe) || -1 == (flags = fcntl(tp->threads.pipe[0], F_GETFL)) || 0 != fcntl(tp->threads.pipe[0], F_SETFL, flags | O_NONBLOCK) || -1 == (flags = fcntl(tp->threads.pipe[1], F_GETFL)) || 0 != fcntl(tp->threads.pipe[1], F_SETFL, flags | O_NONBLOCK)) { goto sysfail; } event_set(&tp->threads.ev, tp->threads.pipe[0], EV_READ | EV_PERSIST, &thread_job_catch, tp); if (ev_base) event_base_set(ev_base, &tp->threads.ev); if (0 != event_add(&tp->threads.ev, 0)) goto sysfail; tp->threads.ev_pending = 1; return tp; sysfail: thread_pool_close(tp); return 0; } /* thread_pool_open() */ enum thread_errno thread_reset(const struct thread_options *opts, struct event_base *ev_base) { thread_pool_close(THREAD_POOL); return thread_init(opts, ev_base); } /* thread_reset() */ static int init_once_okay; static int init_once_errno; static void thread_init_once(void) { if (0 != pthread_key_create(&pool_key, &thread_pool_destructor)) { init_once_errno = errno; return /* void */; } /* * SIG_BLOCK with an empty set should affect a null operation. */ thread_defaults.sigmask_how = SIG_BLOCK; (void)sigemptyset(&thread_defaults.sigmask_set); init_once_okay = 1; return /* void */; } /* thread_init_once() */ enum thread_errno thread_init(const struct thread_options *opts, struct event_base *ev_base) { static pthread_once_t init_once = PTHREAD_ONCE_INIT; struct thread_pool *tp = 0; int sys_errno; if (0 != pthread_once(&init_once, &thread_init_once)) return THREAD_ESYSTEM; if (!init_once_okay) { errno = init_once_errno; goto sysfail; } if (!(tp = thread_pool_open(opts, ev_base))) goto sysfail; if (0 != pthread_setspecific(pool_key, tp)) goto sysfail; if (tp->opts.nthreads_fixed && 0 != thread_start_all(tp)) goto sysfail; return 0; sysfail: sys_errno = errno; if (tp != 0) thread_pool_close(tp); errno = sys_errno; return THREAD_ESYSTEM; } /* thread_init() */ #else /* elif !defined USE_PTHREADS */ struct event_base; #include /* size_t */ #include /* struct timeval */ #include "thread.h" void thread_run(thread_enter a, void *b, enum thread_priority c, thread_return cb, void *arg, unsigned d) { cb(0, THREAD_ESYSTEM, arg); return /* void */; } /* thread_run() */ const char *thread_strerror(enum thread_errno e) { return "Unknown"; } /* thread_strerror() */ enum thread_errno thread_reset(const struct thread_options *opts, struct event_base *ev_base) { return THREAD_ESYSTEM; } /* thread_reset() */ enum thread_errno thread_init(const struct thread_options *opts, struct event_base *ev_base) { return THREAD_ESYSTEM; } /* thread_init() */ #endif /* USE_PTHREADS */