/* ==========================================================================
 * libevnet/src/bufio.c - Network server library for libevent.
 * --------------------------------------------------------------------------
 * Copyright (c) 2006  Barracuda Networks, Inc.
 * Copyright (c) 2006  William Ahern
 *
 * Permission is hereby granted, free of charge, to any person obtaining a
 * copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sublicense, and/or sell copies of the Software, and to permit
 * persons to whom the Software is furnished to do so, subject to the
 * following conditions:
 *
 * The above copyright notice and this permission notice shall be included
 * in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
 * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
 * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
 * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
 * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
 * USE OR OTHER DEALINGS IN THE SOFTWARE.
 * ==========================================================================
 */
#include <errno.h>		/* ENOTSUP */

#include <assert.h>

#include <string.h>		/* memset(3) */

#include <windows.h>		/* GetLastError SetLastError */

#include <sys/time.h>		/* struct timeval timerclear gettimeofday(3) */
#include <sys/param.h>		/* MIN */

#include <sys/queue.h>		/* SLIST */

#include <arena/proto.h>

#include "bufio.h"
#include "bufio/pagebuf.h"
#include "bufio/membuf.h"

#if 0
#include <stdio.h>
#define MARK (fprintf(stderr, "%s:%d\n", __FUNCTION__, __LINE__))
#endif


const char *bufio_errlist[BUFIO_NERR] = {
	[0 ... (BUFIO_NERR - 1)]	= "Unknown",
	[BUFIO_ESUCCESS]		= "Success",
	[BUFIO_ESYSTEM]			= "System error",
	[BUFIO_ECANCELLED]		= "Operation cancelled",
	[BUFIO_EEOF]			= "End of file",
	[BUFIO_ETIMEDOUT]		= "Operation timed out",
	[BUFIO_ENOTPOLLING]		= "Nothing to cancel",
	[BUFIO_EAGAIN]			= "Try operation again",
	[BUFIO_EPIPE]			= "Sink disconnected",
}; /* bufio_errlist[] */

const size_t bufio_nerr		= sizeof bufio_errlist / sizeof *bufio_errlist;


static size_t bufio_source_copyto_noop(struct bufio_source *source, struct bufio_sink *sink, int flags, enum bufio_errno *e) {
	*e	= BUFIO_ESYSTEM;

	SetLastError(ENOTSUP);

	return 0;
} /* bufio_source_copyto_noop() */


static size_t bufio_source_copyout_noop(struct bufio_source *source, void *buf, size_t bufsiz, int flags, enum bufio_errno *e) {
	*e	= BUFIO_ESYSTEM;

	SetLastError(ENOTSUP);

	return 0;
} /* bufio_source_copyout_noop() */


static size_t bufio_source_buffered(struct bufio_source *src) {
	return 0;
} /* bufio_source_buffered() */


static enum bufio_errno bufio_source_poll_noop(struct bufio_source *source, void (*cb)(struct bufio_source *, enum bufio_errno, void *), void *arg, struct timeval *timeout) {
	SetLastError(ENOTSUP);

	return BUFIO_ESYSTEM;
} /* bufio_source_poll_noop() */


static enum bufio_errno bufio_source_cancel_noop(struct bufio_source *source, int notify) {
	return 0;
} /* bufio_source_cancel_noop() */


static enum bufio_errno bufio_source_lasterr_noop(struct bufio_source *source) {
	return 0;
} /* bufio_source_lasterr_noop() */


static void bufio_source_clearerr_noop(struct bufio_source *source) {
	return /* void */;
} /* bufio_source_clearerr_noop() */


struct bufio_source bufio_source_initializer = {
#ifndef WIN32
	.copyto		= &bufio_source_copyto_noop,
	.copyout	= &bufio_source_copyout_noop,
	.buffered	= &bufio_source_buffered,
	.poll		= &bufio_source_poll_noop,
	.cancel		= &bufio_source_cancel_noop,
	.lasterr	= &bufio_source_lasterr_noop,
	.clearerr	= &bufio_source_clearerr_noop,
#else
	&bufio_source_copyto_noop,
	&bufio_source_copyout_noop,
	&bufio_source_buffered,
	&bufio_source_poll_noop,
	&bufio_source_cancel_noop,
	&bufio_source_lasterr_noop,
	&bufio_source_clearerr_noop,
#endif
}; /* bufio_source_initializer */


static size_t bufio_sink_copyfrom_noop(struct bufio_sink *snk, struct bufio_source *src, int flags, enum bufio_errno *e) {
	*e	= BUFIO_ESYSTEM;

	SetLastError(ENOTSUP);

	return 0;
} /* bufio_sink_copyfrom_noop() */


static size_t bufio_sink_copyin_noop(struct bufio_sink *snk, void *buf, size_t bufsiz, int flags, enum bufio_errno *e) {
	*e	= BUFIO_ESYSTEM;

	SetLastError(ENOTSUP);

	return 0;
} /* bufio_sink_copyin_noop() */


static size_t bufio_sink_buffered(struct bufio_sink *snk) {
	return 0;
} /* bufio_sink_buffered() */


static enum bufio_errno bufio_sink_poll_noop(struct bufio_sink *sink, void (*cb)(struct bufio_sink *, enum bufio_errno, void *), void *arg, struct timeval *timeout) {
	SetLastError(ENOTSUP);

	return BUFIO_ESYSTEM;
} /* bufio_sink_poll_noop() */


static enum bufio_errno bufio_sink_cancel_noop(struct bufio_sink *sink, int notify) {
	return 0;
} /* bufio_sink_cancel_noop() */


static enum bufio_errno bufio_sink_lasterr_noop(struct bufio_sink *sink) {
	return 0;
} /* bufio_sink_lasterr_noop() */


static void bufio_sink_clearerr_noop(struct bufio_sink *sink) {
	return /* void */;
} /* bufio_sink_clearerr_noop() */


struct bufio_sink bufio_sink_initializer = {
#ifndef WIN32
	.copyfrom	= &bufio_sink_copyfrom_noop,
	.copyin		= &bufio_sink_copyin_noop,
	.buffered	= &bufio_sink_buffered,
	.poll		= &bufio_sink_poll_noop,
	.cancel		= &bufio_sink_cancel_noop,
	.lasterr	= &bufio_sink_lasterr_noop,
	.clearerr	= &bufio_sink_clearerr_noop,
#else
	&bufio_sink_copyfrom_noop,
	&bufio_sink_copyin_noop,
	&bufio_sink_buffered,
	&bufio_sink_poll_noop,
	&bufio_sink_cancel_noop,
	&bufio_sink_lasterr_noop,
	&bufio_sink_clearerr_noop,
#endif
}; /* bufio_sink_initializer */


#define BUFIO_FRAME_PUSH(b, c, f) do {					\
	(f)->bp	= &(b);							\
	SLIST_INSERT_HEAD(&(c)->call.frames, (f), sle);			\
} while(0)

#define BUFIO_FRAME_POP(b, c, f) do {					\
	if ((b) != 0) {							\
		assert((f) == SLIST_FIRST(&(c)->call.frames));		\
		SLIST_REMOVE_HEAD(&(c)->call.frames, sle);		\
	}								\
} while(0)

#define BUFIO_FRAME_OKAY(f)	(*(f)->bp != 0)


const struct bufio_options bufio_defaults = {
#ifndef WIN32
	.max_bufsiz	= 4096,
	.max_recurse	= 7,
#else
	4096,
	7,
#endif
}; /* bufio_defaults */


struct bufio_frame {
	struct bufio **bp;

	SLIST_ENTRY(bufio_frame) sle;
}; /* struct bufio_frame */


union bufio_callback {
	bufio_read_cb read;
	bufio_write_cb write;
	bufio_gets_cb gets;
	bufio_flush_cb flush;
}; /* union bufio_callback */


enum bufio_call_state {
	BUFIO_CALL_VACANT,
	BUFIO_CALL_PENDING,
	BUFIO_CALL_RUNNING,
	BUFIO_CALL_POLLING,
}; /* enum bufio_call_state */


enum bufio_call_type {
	BUFIO_CTYPE_READ	= 1 << 0,
	BUFIO_CTYPE_WRITE	= 1 << 1,
	BUFIO_CTYPE_GETS	= 1 << 2,
	BUFIO_CTYPE_FLUSH	= 1 << 3,
	BUFIO_CTYPE_READN	= 1 << 4,
	BUFIO_CTYPE_WRITEN	= 1 << 5,
}; /* enum bufio_call_type */


struct bufio_call {
	struct bufio_membuf mbuf;

	union bufio_callback cb;

	void *arg;
}; /* struct bufio_call */


struct bufio_channel {
	struct bufio_pagebuf buffer;

	union {
		struct bufio_source *source;
		struct bufio_sink *sink;
	} next;

	struct {
		enum bufio_call_state state;

		enum bufio_call_type type;

		struct bufio_call info;

		struct timeval expire;

		SLIST_HEAD(, bufio_frame) frames;
	} call;
}; /* struct bufio_channel */


struct bufio {
	struct bufio_options opts;

	const struct arena_prototype *ap;

	struct bufio_channel read, write;

	enum bufio_errno last_errno;
	int sys_errno;
}; /* struct bufio */


static void bufio_destroy(struct bufio *b) {
	struct bufio_frame *f;

	if (!b)
		return /* void */;


	bufio_pagebuf_destroy(&b->read.buffer);

	SLIST_FOREACH(f, &b->read.call.frames, sle)
		*f->bp	= 0;

	SLIST_INIT(&b->read.call.frames);


	bufio_pagebuf_destroy(&b->write.buffer);

	SLIST_FOREACH(f, &b->write.call.frames, sle)
		*f->bp	= 0;

	SLIST_INIT(&b->write.call.frames);

	return /* void */;
} /* bufio_destroy() */


struct bufio *bufio_open(const struct bufio_options *opts, const struct arena_prototype *ap, enum bufio_errno *our_errno) {
	static const struct bufio bufio_initializer;
	struct bufio *b	= 0;
	struct bufio_pagebuf_options pagebuf_opts = bufio_pagebuf_defaults;
	int sys_errno;

	if (!opts)
		opts	= &bufio_defaults;

	if (!ap)
		ap	= ARENA_STDLIB;

	if (!(b = ap->malloc(ap, sizeof *b, 0)))
		goto sysfail;

	*b	= bufio_initializer;
	b->opts	= *opts;
	b->ap	= ap;

	pagebuf_opts.page_size	= MIN(1024, b->opts.max_bufsiz);
	pagebuf_opts.keep_lowat	= 0;
	pagebuf_opts.keep_hiwat	= b->opts.max_bufsiz;

	(void)bufio_pagebuf_init(&b->read.buffer, &pagebuf_opts, b->ap);
	(void)bufio_pagebuf_init(&b->write.buffer, &pagebuf_opts, b->ap);

	SLIST_INIT(&b->read.call.frames);
	SLIST_INIT(&b->write.call.frames);

	return b;
sysfail:
	*our_errno	= BUFIO_ESYSTEM;

	/* FALL THROUGH */
anyfail:	
	sys_errno	= GetLastError();

	bufio_destroy(b);

	SetLastError(sys_errno);

	return 0;
} /* bufio_open() */


void bufio_close(struct bufio *b) {
	const struct arena_prototype *ap;

	if (b == 0)
		return /* void */;

	ap	= b->ap;

	bufio_destroy(b);

	ap->free(ap, memset(b, 0xdeadbeef, sizeof *b));

	return /* void */;
} /* bufio_close() */


enum bufio_errno bufio_set_sink(struct bufio *b, struct bufio_sink *snk) {
	b->write.next.sink	= snk;

	return 0;
} /* bufio_set_sink() */


enum bufio_errno bufio_set_source(struct bufio *b, struct bufio_source *src) {
	b->read.next.source	= src;

	bufio_pagebuf_set_source(&b->read.buffer, src);

	return 0;
} /* bufio_set_source() */


struct bufio_sink *bufio_get_sink(struct bufio *b) {
	return b->write.next.sink;
} /* bufio_get_sink() */


struct bufio_source *bufio_get_source(struct bufio *b) {
	return b->read.next.source;
} /* bufio_get_source() */


struct bufio_sink *bufio_to_sink(struct bufio *b) {
#if 1
	return b->write.next.sink;
#else
	return bufio_pagebuf_to_source(&b->write.buffer);
#endif
} /* bufio_to_sink() */


struct bufio_source *bufio_to_source(struct bufio *b) {
	return bufio_pagebuf_to_source(&b->read.buffer);
} /* bufio_to_source() */


static void bufio_exec(struct bufio *, struct bufio_channel *, enum bufio_errno, unsigned);

static void bufio_read_wakeup(struct bufio_source *src, enum bufio_errno e, void *arg) {
	struct bufio *b	= arg;

	/*
	 * XXX: If we've been cancelled it's not necessarily true that 0 is
	 * our recursion depth (as-if called directly from libevent's main
	 * loop).
	 */
	bufio_exec(b, &b->read, e, 0);

	return /* void */;
} /* bufio_read_wakeup() */

static void bufio_write_wakeup(struct bufio_sink *snk, enum bufio_errno e, void *arg) {
	struct bufio *b	= arg;

	bufio_exec(b, &b->write, e, 0);

	return /* void */;
} /* bufio_write_wakeup() */


static void bufio_exec(struct bufio *b, struct bufio_channel *c, enum bufio_errno e, unsigned i) {
	struct bufio_source *src	= 0;
	struct bufio_sink *snk		= 0;
	struct timeval now, timeout;
	size_t n;

	c->call.state	= BUFIO_CALL_RUNNING;

	if (e == BUFIO_ECANCELLED)
		goto callback;

	assert(0 == gettimeofday(&now, 0));

	if (timerisset(&c->call.expire) && timercmp(&now, &c->call.expire, >=)) {
		e	= BUFIO_ETIMEDOUT;

		goto callback;
	}

	if (c->call.type & (BUFIO_CTYPE_READ | BUFIO_CTYPE_GETS | BUFIO_CTYPE_READN)) {
		src	= bufio_pagebuf_to_source(&c->buffer);

		#if 0
		/*
		 * FIXME: If data is buffered and we poll, we could stall
		 * the stream if the application protocol relies on that
		 * buffered data to progress things. Disable for now.  We
		 * need a way to drain all the buffered data and complete
		 * more I/O operations until it's completely consumed,
		 * *then* we can poll.
		 */
		if (i > ((c->call.type == BUFIO_CTYPE_GETS)? b->opts.max_recurse * 3 : b->opts.max_recurse))
			goto poll;
		#endif

		if (c->call.type == BUFIO_CTYPE_GETS) {
			n	= bufio_pagebuf_gets(&c->buffer, c->call.info.mbuf.page.cursor.put, bufio_page_nfree(&c->call.info.mbuf.page), 0, &e);

			c->call.info.mbuf.page.cursor.put	+= n;
		} else if (c->call.type == BUFIO_CTYPE_READN) {
			size_t t	= 0;

			do {
				n	= src->copyto(src, bufio_membuf_to_sink(&c->call.info.mbuf), 0, &e);
				t	+= n;
			} while (n > 0);

			n	= t;
		} else
			n	= src->copyto(src, bufio_membuf_to_sink(&c->call.info.mbuf), 0, &e);

		if (n > 0 && (c->call.type != BUFIO_CTYPE_READN || 0 == bufio_page_nfree(&c->call.info.mbuf.page))) {
			e	= 0;

			goto callback;
		}
	} else {
		snk	= c->next.sink;

		#if 0	/* See above for why this is broken. */
		if (i > b->opts.max_recurse)
			goto poll;
		#endif

		if (c->call.type == BUFIO_CTYPE_FLUSH) {
			while (0 < (n = snk->copyin(snk, 0, 0, BUFIO_FLUSH, &e)))
				;;

			if (e == 0)
				goto callback;
		} else {
			if (c->call.type == BUFIO_CTYPE_WRITEN) {
				size_t t	= 0;

				do {
					n	= snk->copyfrom(snk, bufio_membuf_to_source(&c->call.info.mbuf), 0, &e);
					t	+= n;
				} while (n > 0);

				n	= t;
			} else
				n	= snk->copyfrom(snk, bufio_membuf_to_source(&c->call.info.mbuf), 0, &e);

			if (0 < n && (c->call.type != BUFIO_CTYPE_WRITEN || 0 == bufio_page_len(&c->call.info.mbuf.page))) {
				e	= 0;

				goto callback;
			}
		}
	}

	if (!BUFIO_WOULDBLOCK(e))
		goto callback;

poll:
	c->call.state	= BUFIO_CALL_POLLING;

	if (timerisset(&c->call.expire))
		timersub(&c->call.expire, &now, &timeout);
	else
		timerclear(&timeout);

	if (src != 0) {
		if (0 != (e = src->poll(src, &bufio_read_wakeup, b, &timeout)))
			goto callback;
	} else {
		if (0 != (e = snk->poll(snk, &bufio_write_wakeup, b, &timeout)))
			goto callback;
	}

	return /* void */;
callback:
	c->call.state		= BUFIO_CALL_VACANT;

	b->last_errno		= e;
	b->sys_errno		= GetLastError();

	if (c->call.type & (BUFIO_CTYPE_READ | BUFIO_CTYPE_GETS | BUFIO_CTYPE_READN)) {
		c->call.info.cb.read(b, c->call.info.mbuf.page.cursor.get, bufio_page_len(&c->call.info.mbuf.page), e, c->call.info.arg);
	} else if (c->call.type == BUFIO_CTYPE_FLUSH) {
		c->call.info.cb.flush(b, e, c->call.info.arg);
	} else {
		c->call.info.cb.write(b, c->call.info.mbuf.page.base, bufio_page_size(&c->call.info.mbuf.page) - bufio_page_len(&c->call.info.mbuf.page), e, c->call.info.arg);
	}

	return /* void */;
} /* bufio_exec() */


static void bufio_prep(struct bufio *b, struct bufio_channel *c, enum bufio_call_type ctype, struct bufio_call *call, struct timeval *timeout) {
	unsigned i	= 0;
	struct bufio_frame f;	
	struct timeval now;

	assert(c->call.state == BUFIO_CALL_VACANT);

	c->call.type		= ctype;
	c->call.info		= *call;

	if (timeout) {
		assert(0 == gettimeofday(&now, 0));

		timeradd(&now, timeout, &c->call.expire);
	} else
		timerclear(&c->call.expire);

	c->call.state		= BUFIO_CALL_PENDING;

	/* Fall back into the original write frame, if any. */
	if (!SLIST_EMPTY(&c->call.frames))
		return /* void */;

	BUFIO_FRAME_PUSH(b, c, &f);

	while (BUFIO_FRAME_OKAY(&f) && c->call.state == BUFIO_CALL_PENDING)
		bufio_exec(b, c, 0, i++);

	BUFIO_FRAME_POP(b, c, &f);

	return /* void */;
} /* bufio_prep() */


void bufio_read(struct bufio *b, void *dst, size_t dstlen, bufio_read_cb cb, void *arg, struct timeval *timeout) {
	struct bufio_call call;

	(void)bufio_membuf_init_sink(&call.mbuf, BUFIO_MEMBUF_DEFAULTS, dst, dstlen);

	call.cb.read	= cb;
	call.arg	= arg;

	bufio_prep(b, &b->read, BUFIO_CTYPE_READ, &call, timeout);

	return /* void */;
} /* bufio_read() */


void bufio_readn(struct bufio *b, void *dst, size_t dstlen, bufio_read_cb cb, void *arg, struct timeval *timeout) {
	struct bufio_call call;

	(void)bufio_membuf_init_sink(&call.mbuf, BUFIO_MEMBUF_DEFAULTS, dst, dstlen);

	call.cb.read	= cb;
	call.arg	= arg;

	bufio_prep(b, &b->read, BUFIO_CTYPE_READN, &call, timeout);

	return /* void */;
} /* bufio_readn() */


void bufio_gets(struct bufio *b, char *dst, size_t dstlen, bufio_gets_cb cb, void *arg, struct timeval *timeout) {
	struct bufio_call call;

	(void)bufio_membuf_init_sink(&call.mbuf, BUFIO_MEMBUF_DEFAULTS, dst, dstlen);

	call.cb.read	= cb;
	call.arg	= arg;

	bufio_prep(b, &b->read, BUFIO_CTYPE_GETS, &call, timeout);

	return /* void */;
} /* bufio_gets() */


void bufio_write(struct bufio *b, const void *src, size_t srclen, bufio_write_cb cb, void *arg, struct timeval *timeout) {
	struct bufio_call call;

	(void)bufio_membuf_init_source(&call.mbuf, BUFIO_MEMBUF_DEFAULTS, src, srclen);

	call.cb.write	= cb;
	call.arg	= arg;

	bufio_prep(b, &b->write, BUFIO_CTYPE_WRITE, &call, timeout);

	return /* void */;
} /* bufio_write() */


void bufio_writen(struct bufio *b, const void *src, size_t srclen, bufio_write_cb cb, void *arg, struct timeval *timeout) {
	struct bufio_call call;

	(void)bufio_membuf_init_source(&call.mbuf, BUFIO_MEMBUF_DEFAULTS, src, srclen);

	call.cb.write	= cb;
	call.arg	= arg;

	bufio_prep(b, &b->write, BUFIO_CTYPE_WRITEN, &call, timeout);

	return /* void */;
} /* bufio_writen() */


void bufio_flush(struct bufio *b, bufio_flush_cb cb, void *arg, struct timeval *timeout) {
	struct bufio_call call;

	(void)bufio_membuf_init(&call.mbuf, BUFIO_MEMBUF_DEFAULTS);

	call.cb.flush	= cb;
	call.arg	= arg;

	bufio_prep(b, &b->write, BUFIO_CTYPE_FLUSH, &call, timeout);

	return /* void */;
} /* bufio_flush() */


const char *bufio_strerror(struct bufio *b) {
	switch (b->last_errno) {
	case BUFIO_ESYSTEM:
		return strerror(b->sys_errno);
	default:
		return bufio_errlist[b->last_errno];
	}

	/* NOT REACHED */
} /* bufio_strerror() */



syntax highlighted by Code2HTML, v. 0.9.1