/* ==========================================================================
 * libevnet/src/thread.c - Network server library for libevent.
 * --------------------------------------------------------------------------
 * Copyright (c) 2006  William Ahern
 * Copyright (c) 2005  Barracuda Networks, Inc.
 * Copyright (c) 2002  William Ahern (Originally under GPL from AnonNet.)
 *
 * Permission is hereby granted, free of charge, to any person obtaining a
 * copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sublicense, and/or sell copies of the Software, and to permit
 * persons to whom the Software is furnished to do so, subject to the
 * following conditions:
 *
 * The above copyright notice and this permission notice shall be included
 * in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
 * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
 * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
 * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
 * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
 * USE OR OTHER DEALINGS IN THE SOFTWARE.
 * ==========================================================================
 */
#ifdef USE_PTHREADS
#define _REENTRANT	/* pthreads(3) */
#define _GNU_SOURCE	/* pthread_yield(3) */

#include <stdlib.h>	/* malloc(3) */

#include <inttypes.h>	/* intptr_t */

#include <time.h>	/* time_t */

#include <signal.h>	/* SIG_BLOCK SIG_SETMASK sigset_t sigemptyset(3) */

#include <string.h>	/* strerror(3) */

#include <errno.h>

#include <fcntl.h>	/* F_GETFL F_SETFL O_NONBLOCK fcntl(2) */

#include <unistd.h>	/* pipe(2) */

#include <sys/queue.h>	/* LIST TAILQ */

#include <sys/time.h>	/* event(3) */

#include <pthread.h>	/* PTHREAD_MUTEX_INITIALIZER pthread_mutex_lock(3)
			 * pthread_mutex_unlock(3) pthread_yield(3)
			 * pthread_cond_wait(3) pthread_cond_signal(3)
			 * pthread_create(3) pthread_once(3)
			 * pthread_key_create(3) pthread_getspecific(3)
			 * pthread_setspecific(3) pthread_sigmask(3)
			 */

#include <assert.h>

#include <sys/types.h>	/* u_char */

#include <event.h>	/* struct event event_set(3) event_add(3) */

#include "thread.h"


/* Forward declarations of the file-local types defined below. */
struct thread_job;	/* Threaded job. */

struct thread_pool;	/* Collection of threads managed from an event loop. */

struct thread;		/* Thread which runs jobs. */


/*
 * One unit of work submitted through thread_run(). Job structures are
 * recycled: thread_job_close() puts them on the pool's free list and
 * thread_job_open() takes them from there before falling back to malloc(3).
 */
struct thread_job {
	thread_enter run;		/* Function to run in thread. */
	void *arg;			/* Argument to runnable function. */
	enum thread_priority pri;	/* Priority to run thread with (stored only; dispatch does not use it yet). */
	thread_return cb;		/* Callback function. */
	void *ctx;			/* Callback function context. */
	unsigned timeout;		/* Maximum timeout for operations. */

	void *ret;			/* Job return value. */

	time_t inserted;		/* time(NULL) when the job was queued by thread_job_dispatch(). */

	struct thread_pool *pool;	/* Pool the job was opened on. */

	TAILQ_ENTRY(thread_job) tqe;	/* Linkage for whichever job list (free, queue, drop) holds the job. */
}; /* struct thread_job */


/*
 * Bookkeeping for one worker thread. Allocated by thread_start() and freed
 * by the worker itself when thread_loop() returns.
 */
struct thread {
	pthread_t tid;			/* Identifier filled in by pthread_create(3). */

	struct thread_pool *pool;	/* Pool this worker serves. */

	int attached;			/* Non-zero while the worker should keep running; cleared by thread_pool_close(). */

	struct thread_job *job;		/* Job most recently taken from the pool's queue, if any. */

	TAILQ_ENTRY(thread) tqe;	/* Linkage in the pool's list of all threads. */
	LIST_ENTRY(thread) le;		/* Linkage in the pool's list of threads waiting for a job. */
}; /* struct thread */


/*
 * State of one thread pool, plus the constant template
 * (thread_pool_initializer) that thread_pool_open() copies into every newly
 * allocated pool.
 */
static const struct thread_pool {
	struct thread_options opts;	/* Copy of the options given to thread_pool_open(). */

	struct event_base *ev_base;	/* Event base to use, or NULL for the default libevent loop. */

	int closed;			/* Set by thread_pool_close(); thread_run() then refuses new jobs. */

	struct {
		TAILQ_HEAD(, thread_job) free;	/* Recycled job structures. */
		TAILQ_HEAD(, thread_job) queue;	/* Jobs waiting for a worker. */
		TAILQ_HEAD(, thread_job) drop;	/* Finished jobs waiting for their callback. */
	} jobs;

	struct {
		pthread_mutex_t mutex;	/* Held by workers and dispatch around jobs.queue, jobs.drop and the thread lists. */
		pthread_cond_t gate;	/* Idle workers wait here for a job. */

		struct event ev;	/* Read event on pipe[0]; runs thread_job_catch(). */
		int ev_pending;		/* Non-zero once ev has been added to the event loop. */

		int pipe[2];		/* Workers write one byte to pipe[1] after finishing a job. */

		TAILQ_HEAD(, thread) list;	/* Every thread started for this pool. */
		unsigned int nthreads;		/* Number of entries in list. */

		LIST_HEAD(, thread) queue;	/* Threads currently waiting on gate. */
	} threads;
} thread_pool_initializer = {
	.threads	= { .mutex	= PTHREAD_MUTEX_INITIALIZER,
			    .gate	= PTHREAD_COND_INITIALIZER,
			    .pipe 	= { -1, -1 }, },
};


/*
 * Options used by thread_pool_open() when the caller passes none. The
 * signal-mask members are filled in at run time by thread_init_once().
 */
struct thread_options thread_defaults = {
	.nthreads_min	= 1,
	.nthreads_max	= 3,
}; /* thread_defaults */


/* Thread-specific-data key under which thread_init() stores the pool. */
static pthread_key_t pool_key;


#define THREAD_POOL	thread_pool_get()

/*
 * thread_pool_get() - Return the pool bound to the calling thread.
 *
 * Looks the pool up in thread-specific storage under pool_key. The lookup
 * is done outside of assert() so that it still happens when the file is
 * built with NDEBUG; the assertion only checks that a pool was found.
 */
static struct thread_pool *thread_pool_get(void) {
	struct thread_pool *tp	= pthread_getspecific(pool_key);

	assert(tp != NULL);

	return tp;
} /* thread_pool_get() */


static void *thread_loop(void *t_) {
	struct thread *t	= t_;
	int locked		= 0;
	int num;

	while (t->attached) {
		if (0 != pthread_mutex_lock(&t->pool->threads.mutex))
			goto sysfail;

		locked	= 1;

		/*
		 * Wait in line for another job.
		 */
		while (t->attached && !(t->job = TAILQ_FIRST(&t->pool->jobs.queue))) {
			LIST_INSERT_HEAD(&t->pool->threads.queue, t, le);

			if (0 != pthread_cond_wait(&t->pool->threads.gate, &t->pool->threads.mutex))
				goto sysfail;

			LIST_REMOVE(t, le);
		}

		if (t->job)
			TAILQ_REMOVE(&t->pool->jobs.queue, t->job, tqe);
		
		assert(0 == pthread_mutex_unlock(&t->pool->threads.mutex));

		locked	= 0;

		if (t->job) {
			t->job->ret	= t->job->run(t->job->arg, t->job->ctx, t->job->timeout);

			assert(0 == pthread_mutex_lock(&t->pool->threads.mutex));

			locked	= 1;

			TAILQ_INSERT_TAIL(&t->pool->jobs.drop, t->job, tqe);

			assert(0 == pthread_mutex_unlock(&t->pool->threads.mutex));

			locked	= 0;

			do {
				num	= write(t->pool->threads.pipe[1], "", 1);
			} while(num == -1 && errno == EINTR);
		}
	}

	assert(0 == pthread_mutex_lock(&t->pool->threads.mutex));

	TAILQ_REMOVE(&t->pool->threads.list, t, tqe);
	t->pool->threads.nthreads--;

	assert(0 == pthread_mutex_unlock(&t->pool->threads.mutex));

	free(t);

	return (void *)(intptr_t)0;
sysfail:
	if (!locked)
		assert(0 == pthread_mutex_lock(&t->pool->threads.mutex));

	TAILQ_REMOVE(&t->pool->threads.list, t, tqe);
	t->pool->threads.nthreads--;

	assert(0 == pthread_mutex_unlock(&t->pool->threads.mutex));

	free(t);

	return (void *)(intptr_t)errno;
} /* thread_loop() */


static int thread_start(struct thread_pool *tp) {
	struct thread *t	= NULL;
	sigset_t saved_set;
	int sys_errno;

	t	= malloc(sizeof *t);

	if (!t)
		goto sysfail;

	t->pool		= tp;
	t->attached	= 1;
	t->job		= NULL;

	if (0 != pthread_sigmask(tp->opts.sigmask_how, &tp->opts.sigmask_set, &saved_set))
		goto sysfail;

	if (0 != pthread_create(&t->tid, 0, &thread_loop, t)) {
		assert(0 == pthread_sigmask(SIG_SETMASK, &saved_set, 0));

		goto sysfail;
	}

	assert(0 == pthread_sigmask(SIG_SETMASK, &saved_set, 0));

	/*
	 * This critical section protected from thread_job_dispatch() or
	 * thread_start_all().
	 */
	assert(0 != pthread_mutex_trylock(&tp->threads.mutex));

	TAILQ_INSERT_TAIL(&t->pool->threads.list, t, tqe);
	t->pool->threads.nthreads++;

	return 0;
sysfail:
	sys_errno	= errno;

	free(t);

	errno		= sys_errno;

	return -1;
} /* thread_start() */


static int thread_start_all(struct thread_pool *tp) {
	int okay;

	if (0 != pthread_mutex_lock(&tp->threads.mutex))
		return -1;

	while (tp->threads.nthreads < tp->opts.nthreads_max) {
		okay	= (0 == thread_start(tp));

		/* tp->threads.nthreads incremented by thread_start() */

		assert(0 == pthread_mutex_unlock(&tp->threads.mutex));

		if (!okay)
			return 0;

		pthread_yield();

		if (0 != pthread_mutex_lock(&tp->threads.mutex))
			return -1;
	}

	assert(0 == pthread_mutex_unlock(&tp->threads.mutex));

	return 0;
} /* thread_start_all() */


/*
 * thread_job_open() - Obtain a job structure and fill in the request.
 *
 * A structure is taken from the pool's free list when one is available and
 * allocated with malloc(3) otherwise. Returns NULL if the allocation fails.
 * The inserted and tqe members are left for the dispatcher to set.
 */
static struct thread_job *thread_job_open(struct thread_pool *tp, thread_enter func, void *arg, enum thread_priority pri, thread_return cb, void *ctx, unsigned timeout) {
	struct thread_job *job	= TAILQ_FIRST(&tp->jobs.free);

	if (job) {
		TAILQ_REMOVE(&tp->jobs.free, job, tqe);
	} else {
		job	= malloc(sizeof *job);

		if (!job)
			return NULL;
	}

	/* What to run, and how. */
	job->run	= func;
	job->arg	= arg;
	job->pri	= pri;
	job->timeout	= timeout;

	/* Whom to tell when it is done. */
	job->cb		= cb;
	job->ctx	= ctx;

	job->ret	= NULL;
	job->pool	= tp;

	return job;
} /* thread_job_open() */


static void thread_job_close(struct thread_pool *tp, struct thread_job *j, enum thread_errno err) {
	thread_return cb	= j->cb;
	void *cb_ret		= j->ret;
	void *cb_ctx		= j->ctx;

	TAILQ_INSERT_HEAD(&tp->jobs.free, j, tqe);

	cb(cb_ret, err, cb_ctx);

	return /* void */;
} /* thread_job_close() */


/*
 * thread_job_catch() - libevent callback for the read end of the pool pipe.
 *
 * fd is the descriptor to drain, event is unused and ev_arg must be the
 * pool bound to the calling thread. All pending wake-up bytes are read,
 * the pool's drop list is moved to a private list under the mutex, and
 * each finished job is then closed with THREAD_ESUCCESS.
 *
 * The mutex is released outside of assert() so that it is still released
 * when the file is built with NDEBUG.
 */
static void thread_job_catch(int fd, short event, void *ev_arg) {
	struct thread_pool *tp		= THREAD_POOL;
	TAILQ_HEAD(, thread_job) jobs	= TAILQ_HEAD_INITIALIZER(jobs);
	char buf[32];
	ssize_t num;
	int rc;
	struct thread_job *j;

	(void)event;

	/* Make sure all our assumptions are correct. */
	assert(ev_arg == tp && "libevent loop must run from same thread as thread_init().");
	(void)ev_arg;

	/* Drain until the pipe is empty (EAGAIN), at EOF, or failing. */
	do {
		num	= read(fd, buf, sizeof buf);
	} while (num > 0 || (num == -1 && errno == EINTR));

	if (num == -1 && errno != EAGAIN && errno != EWOULDBLOCK)
		goto sysfail;

	if (0 != pthread_mutex_lock(&tp->threads.mutex))
		goto sysfail;

	while ((j = TAILQ_FIRST(&tp->jobs.drop))) {
		TAILQ_REMOVE(&tp->jobs.drop, j, tqe);

		TAILQ_INSERT_TAIL(&jobs, j, tqe);
	}

	rc	= pthread_mutex_unlock(&tp->threads.mutex);
	assert(0 == rc);
	(void)rc;

	/*
	 * We must issue the callbacks outside of our critical section to
	 * avoid any deadlocks if the caller recurses (i.e. from the
	 * callback calls thread_run() again, which will attempt to take the
	 * thread-pool lock).
	 */
	while ((j = TAILQ_FIRST(&jobs))) {
		TAILQ_REMOVE(&jobs, j, tqe);

		thread_job_close(tp, j, THREAD_ESUCCESS);
	}

	return /* void */;
sysfail:
	assert(0 && "Cannot recover from thread_job_catch() failure.");

	return /* void */;
} /* thread_job_catch() */


static void thread_job_dispatch(struct thread_pool *tp, struct thread_job *j) {
	int locked	= 0;

	if (0 != pthread_mutex_lock(&tp->threads.mutex))
		goto sysfail;

	locked	= 1;

	/*
	 * TODO: Use priority.
	 */

	TAILQ_INSERT_TAIL(&tp->jobs.queue, j, tqe);

	j->inserted	= time(NULL);

	if (LIST_FIRST(&tp->threads.queue)) {
		if (0 != pthread_cond_signal(&tp->threads.gate)) {
			TAILQ_REMOVE(&tp->jobs.queue, j, tqe);

			goto sysfail;
		}
	} else if (tp->threads.nthreads < tp->opts.nthreads_max && !tp->opts.nthreads_fixed) {
		if (0 != thread_start(tp)) {
			TAILQ_REMOVE(&tp->jobs.queue, j, tqe);

			goto sysfail;
		}
	}

	assert(0 == pthread_mutex_unlock(&tp->threads.mutex));

	pthread_yield();

	return /* void */;
sysfail:
	if (locked)
		assert(0 == pthread_mutex_unlock(&tp->threads.mutex));

	thread_job_close(tp, j, THREAD_ESYSTEM);

	return /* void */;
} /* thread_job_dispatch() */


/*
 * thread_run() - Submit func(arg, ...) to the calling thread's pool.
 *
 * cb is always invoked exactly once with ctx: later from the event loop
 * when the job has run, or immediately with THREAD_ESYSTEM when the job
 * cannot be accepted. In the latter case errno is whatever the failed
 * allocation left behind, or EAGAIN when the pool is closed.
 *
 * The failure branch used to be inverted: a NULL job was handed to
 * thread_job_close() (NULL dereference) and an allocated job was leaked.
 */
void thread_run(thread_enter func, void *arg, enum thread_priority pri, thread_return cb, void *ctx, unsigned timeout) {
	struct thread_pool *tp	= THREAD_POOL;
	struct thread_job *j	= thread_job_open(tp, func, arg, pri, cb, ctx, timeout);

	if (!j) {
		/* No job structure to recycle; report straight to the caller. */
		cb(NULL, THREAD_ESYSTEM, ctx);

		return /* void */;
	}

	if (tp->closed) {
		errno	= EAGAIN;

		/* Recycles j and invokes cb with j->ret, which is NULL here. */
		thread_job_close(tp, j, THREAD_ESYSTEM);

		return /* void */;
	}

	thread_job_dispatch(tp, j);

	return /* void */;
} /* thread_run() */


/*
 * thread_strerror() - Describe a thread_errno value.
 *
 * THREAD_ESYSTEM is described by strerror(3) of the current errno, so it
 * must be called before errno is disturbed. Unrecognised values yield
 * "Unknown".
 */
const char *thread_strerror(enum thread_errno err) {
	const char *text	= "Unknown";

	if (err == THREAD_ESUCCESS)
		text	= "Success";
	else if (err == THREAD_ESYSTEM)
		text	= strerror(errno);
	else if (err == THREAD_ETIMEDOUT)
		text	= "Operation timed out";

	return text;
} /* thread_strerror() */


/*
 * thread_pool_close() - Shut a pool down and release everything it owns.
 *
 * Accepts NULL (no-op) and pools that thread_pool_open() only partially
 * set up. Steps:
 *   1. mark the pool closed and detach every worker;
 *   2. spin the event loop and broadcast on the gate until all workers
 *      have removed themselves;
 *   3. deliver callbacks for finished jobs, fail still-queued jobs with
 *      THREAD_ESYSTEM / EAGAIN and free recycled job structures;
 *   4. remove the pipe event, close the pipe, destroy the mutex and
 *      condition variable and free the pool.
 *
 * The mutex and condition variable are destroyed last because
 * thread_job_catch() in step 3 still locks the mutex. Calls with required
 * side effects are kept outside of assert() so they survive NDEBUG.
 *
 * NOTE(review): thread_job_catch() looks the pool up through THREAD_POOL,
 * so this presumably must run on the thread the pool is bound to; confirm
 * for the thread-specific-data destructor path.
 */
static void thread_pool_close(struct thread_pool *tp) {
	struct thread *t;
	unsigned int nthreads;
	struct thread_job *j;
	int rc;

	if (!tp)
		return /* void */;

	tp->closed	= 1;

	rc	= pthread_mutex_lock(&tp->threads.mutex);
	assert(0 == rc);

	nthreads	= tp->threads.nthreads;

	TAILQ_FOREACH(t, &tp->threads.list, tqe) {
		t->attached	= 0;
	}

	rc	= pthread_mutex_unlock(&tp->threads.mutex);
	assert(0 == rc);

	/* Spin trying to catch all our threads. */
	while (nthreads > 0) {
		if (tp->ev_base)
			event_base_loop(tp->ev_base, EVLOOP_NONBLOCK);
		else
			event_loop(EVLOOP_NONBLOCK);

		rc	= pthread_cond_broadcast(&tp->threads.gate);
		assert(0 == rc);

		pthread_yield();

		rc	= pthread_mutex_lock(&tp->threads.mutex);
		assert(0 == rc);

		nthreads	= tp->threads.nthreads;

		rc	= pthread_mutex_unlock(&tp->threads.mutex);
		assert(0 == rc);
	}

	assert(TAILQ_EMPTY(&tp->threads.list));

	/* Finished jobs still get their normal callback. */
	while (!TAILQ_EMPTY(&tp->jobs.drop)) {
		thread_job_catch(tp->threads.pipe[0], 0, tp);
	}

	/* Jobs that never ran are failed with EAGAIN. */
	while ((j = TAILQ_FIRST(&tp->jobs.queue))) {
		TAILQ_REMOVE(&tp->jobs.queue, j, tqe);

		errno	= EAGAIN;

		thread_job_close(tp, j, THREAD_ESYSTEM);
	}

	while ((j = TAILQ_FIRST(&tp->jobs.free))) {
		TAILQ_REMOVE(&tp->jobs.free, j, tqe);

		free(j);
	}

	if (tp->threads.ev_pending)
		(void)event_del(&tp->threads.ev);

	(void)close(tp->threads.pipe[0]);
	(void)close(tp->threads.pipe[1]);

	/* Nothing locks the mutex past this point. */
	rc	= pthread_mutex_destroy(&tp->threads.mutex);
	assert(0 == rc);

	rc	= pthread_cond_destroy(&tp->threads.gate);
	assert(0 == rc);
	(void)rc;

	memset(tp, '\0', sizeof *tp);

	free(tp);

	return /* void */;
} /* thread_pool_close() */

/*
 * thread_pool_destructor() - Destructor registered with pool_key; closes
 * the pool it is handed.
 */
static void thread_pool_destructor(void *tp) {
	struct thread_pool *pool	= tp;

	thread_pool_close(pool);
} /* thread_pool_destructor() */


/*
 * thread_pool_open() - Allocate and initialise a pool.
 *
 * opts may be NULL, in which case thread_defaults is used. ev_base may be
 * NULL to use libevent's default base. Creates the non-blocking wake-up
 * pipe and registers a persistent read event on its read end that runs
 * thread_job_catch(). No worker threads are started here.
 *
 * Returns the pool, or NULL on failure. errno is saved before cleanup and
 * restored afterwards so that close(2) and friends cannot overwrite it.
 * NOTE(review): whether event_base_set()/event_add() set errno on failure
 * is not visible here; confirm.
 */
static struct thread_pool *thread_pool_open(const struct thread_options *opts, struct event_base *ev_base) {
	struct thread_pool *tp	= 0;
	int flags;
	int sys_errno;

	if (!opts)
		opts	= &thread_defaults;

	if (!(tp = malloc(sizeof *tp)))
		goto sysfail;

	*tp		= thread_pool_initializer;
	tp->opts	= *opts;
	tp->ev_base	= ev_base;

	TAILQ_INIT(&tp->jobs.free);
	TAILQ_INIT(&tp->jobs.queue);
	TAILQ_INIT(&tp->jobs.drop);

	TAILQ_INIT(&tp->threads.list);

	LIST_INIT(&tp->threads.queue);

	/* fcntl(F_SETFL) signals failure with -1; success need not be 0. */
	if (0 != pipe(tp->threads.pipe)
	|| -1 == (flags = fcntl(tp->threads.pipe[0], F_GETFL))
	|| -1 == fcntl(tp->threads.pipe[0], F_SETFL, flags | O_NONBLOCK)
	|| -1 == (flags = fcntl(tp->threads.pipe[1], F_GETFL))
	|| -1 == fcntl(tp->threads.pipe[1], F_SETFL, flags | O_NONBLOCK)) {
		goto sysfail;
	}

	event_set(&tp->threads.ev, tp->threads.pipe[0], EV_READ | EV_PERSIST, &thread_job_catch, tp);

	if (ev_base && 0 != event_base_set(ev_base, &tp->threads.ev))
		goto sysfail;

	if (0 != event_add(&tp->threads.ev, 0))
		goto sysfail;

	tp->threads.ev_pending	= 1;

	return tp;
sysfail:
	sys_errno	= errno;

	/* Copes with NULL and with a pool whose pipe is still { -1, -1 }. */
	thread_pool_close(tp);

	errno		= sys_errno;

	return 0;
} /* thread_pool_open() */


enum thread_errno thread_reset(const struct thread_options *opts, struct event_base *ev_base) {
	thread_pool_close(THREAD_POOL);

	return thread_init(opts, ev_base);
} /* thread_reset() */


static int init_once_okay;
static int init_once_errno;

static void thread_init_once(void) {
	if (0 != pthread_key_create(&pool_key, &thread_pool_destructor)) {
		init_once_errno	= errno;

		return /* void */;
	}

	/*
	 * SIG_BLOCK with an empty set should affect a null operation.
	 */
	thread_defaults.sigmask_how	= SIG_BLOCK;
	(void)sigemptyset(&thread_defaults.sigmask_set);

	init_once_okay	= 1;

	return /* void */;
} /* thread_init_once() */


enum thread_errno thread_init(const struct thread_options *opts, struct event_base *ev_base) {
	static pthread_once_t init_once	= PTHREAD_ONCE_INIT;
	struct thread_pool *tp		= 0;
	int sys_errno;

	if (0 != pthread_once(&init_once, &thread_init_once))
		return THREAD_ESYSTEM;

	if (!init_once_okay) {
		errno	= init_once_errno;

		goto sysfail;
	}

	if (!(tp = thread_pool_open(opts, ev_base)))
		goto sysfail;

	if (0 != pthread_setspecific(pool_key, tp))
		goto sysfail;

	if (tp->opts.nthreads_fixed && 0 != thread_start_all(tp))
		goto sysfail;

	return 0;
sysfail:
	sys_errno	= errno;

	if (tp != 0)
		thread_pool_close(tp);

	errno		= sys_errno;

	return THREAD_ESYSTEM;
} /* thread_init() */

#else /* elif !defined USE_PTHREADS */


struct event_base;

#include <stddef.h>	/* size_t */

#include <sys/time.h>	/* struct timeval */

#include "thread.h"


/*
 * thread_run() - Stub used when built without USE_PTHREADS: nothing is
 * run, and the callback is invoked immediately with THREAD_ESYSTEM.
 */
void thread_run(thread_enter a, void *b, enum thread_priority c, thread_return cb, void *arg, unsigned d) {
	(void)a;
	(void)b;
	(void)c;
	(void)d;

	cb(0, THREAD_ESYSTEM, arg);
} /* thread_run() */


/*
 * thread_strerror() - Stub used when built without USE_PTHREADS: every
 * error value is described as "Unknown".
 */
const char *thread_strerror(enum thread_errno e) {
	static const char unknown[]	= "Unknown";

	(void)e;

	return unknown;
} /* thread_strerror() */


/*
 * thread_reset() - Stub used when built without USE_PTHREADS: always
 * fails with THREAD_ESYSTEM.
 */
enum thread_errno thread_reset(const struct thread_options *opts, struct event_base *ev_base) {
	(void)opts;
	(void)ev_base;

	return THREAD_ESYSTEM;
} /* thread_reset() */


/*
 * thread_init() - Stub used when built without USE_PTHREADS: always fails
 * with THREAD_ESYSTEM.
 */
enum thread_errno thread_init(const struct thread_options *opts, struct event_base *ev_base) {
	(void)opts;
	(void)ev_base;

	return THREAD_ESYSTEM;
} /* thread_init() */


#endif /* USE_PTHREADS */



/* syntax highlighted by Code2HTML, v. 0.9.1 */