/* ==========================================================================
 * libevnet/src/bufio-pagebuf.c - Network server library for libevent.
 * --------------------------------------------------------------------------
 * Copyright (c) 2006  Barracuda Networks, Inc.
 * Copyright (c) 2006  William Ahern
 *
 * Permission is hereby granted, free of charge, to any person obtaining a
 * copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sublicense, and/or sell copies of the Software, and to permit
 * persons to whom the Software is furnished to do so, subject to the
 * following conditions:
 *
 * The above copyright notice and this permission notice shall be included
 * in all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
 * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
 * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
 * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
 * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
 * USE OR OTHER DEALINGS IN THE SOFTWARE.
 * ==========================================================================
 */
#include <errno.h>	/* EAGAIN EFAULT ENOTSUP */

#include <assert.h>

#include <string.h>	/* memchr(3) */

#include <sys/param.h>	/* MIN */

#include <sys/queue.h>	/* SLIST TAILQ */

#include <sys/time.h>	/* struct timeval */

#include <windows.h>	/* GetLastError SetLastError */

#include <winsock2.h>	/* INVALID_SOCKET */

#include <arena/proto.h>

#include "bufio.h"
#include "pagebuf.h"
#include "membuf.h"

#include <stdio.h>
/* Debugging aid: print the enclosing function name and line number to stderr. */
#define MARK (fprintf(stderr, "%s:%d\n", __FUNCTION__, __LINE__))


/*
 * Register frame `f' at the head of b->stack.
 *
 * `b' must be an lvalue (the caller's own pointer variable): its address
 * is stored in f->bp, and bufio_pagebuf_destroy() writes 0 through that
 * address for every frame still on the stack.
 */
#define BUFIO_PAGEBUF_FRAME_PUSH(b, f) do {				\
	(f)->bp	= &(b);							\
	SLIST_INSERT_HEAD(&(b)->stack, (f), sle);			\
} while(0)

/*
 * Unregister frame `f' from b->stack. Frames must be popped in LIFO order;
 * this is asserted. Nothing is done when `b' is null, which is the state
 * bufio_pagebuf_destroy() leaves a registered pointer variable in.
 */
#define BUFIO_PAGEBUF_FRAME_POP(b, f) do {				\
	if ((b) != 0) {							\
		assert((f) == SLIST_FIRST(&(b)->stack));		\
		SLIST_REMOVE_HEAD(&(b)->stack, sle);			\
	}								\
} while(0)

/*
 * True while the pointer variable registered for frame `f' by
 * BUFIO_PAGEBUF_FRAME_PUSH() is still non-null.
 *
 * f->bp holds the address of that variable and is never null once pushed;
 * bufio_pagebuf_destroy() zeroes the variable it points at (*f->bp), so
 * that is the value which has to be tested here.
 */
#define BUFIO_PAGEBUF_FRAME_OKAY(f)	(*(f)->bp != 0)


/*
 * Option values used by bufio_pagebuf_init() when the caller supplies no
 * options: 512-byte pages, with both watermarks at 4096 bytes.
 */
const struct bufio_pagebuf_options bufio_pagebuf_defaults = {
#ifndef WIN32
	.page_size	= 512,
	.keep_lowat	= 4096,
	.keep_hiwat	= 4096,
#else
	/*
	 * Positional form (no designated initializers). NOTE(review): this
	 * presumably relies on page_size, keep_lowat and keep_hiwat being the
	 * first three members, in that order -- confirm against the struct
	 * declaration.
	 */
	512,
	4096,
	4096,
#endif
}; /* bufio_pagebuf_defaults */


/*
 * Search the buffered data, page by page in list order, for the first byte
 * equal to `c'.
 *
 * Returns the offset of the match counted from the first buffered byte, or
 * (size_t)-1 when no page contains `c'.
 */
static size_t bufio_pagebuf_memchr(struct bufio_pagebuf *b, int c) {
	struct bufio_page *page;
	size_t skipped	= 0;

	TAILQ_FOREACH(page, &b->pages, tqe) {
		size_t len		= bufio_page_len(page);
		unsigned char *hit	= memchr(page->cursor.get, c, len);

		if (hit != 0)
			return skipped + (hit - page->cursor.get);

		/* Not in this page: account for its bytes and move on. */
		skipped	+= len;
	}

	return (size_t)-1;
} /* bufio_pagebuf_memchr() */


/*
 * Append one empty page of b->opts.page_size bytes to the tail of the page
 * list. Header and payload come from a single allocation made through the
 * buffer's allocator; the payload starts right after the header.
 *
 * Returns 0 on success, or BUFIO_ESYSTEM if the allocation fails.
 */
static enum bufio_errno bufio_pagebuf_grow(struct bufio_pagebuf *b) {
	struct bufio_page *page;
	unsigned char *data;

	assert(b->opts.page_size > 0);

	page	= b->ap->malloc(b->ap, sizeof *page + b->opts.page_size, 0);

	if (page == 0)
		return BUFIO_ESYSTEM;

	data	= ((unsigned char *)page) + sizeof *page;

	/* Empty page: get and put both sit at the start of the payload. */
	page->cursor.put	= data;
	page->cursor.get	= data;
	page->base		= data;
	page->cursor.end	= data + b->opts.page_size;

	TAILQ_INSERT_TAIL(&b->pages, page, tqe);
	b->npages++;

	return 0;
} /* bufio_pagebuf_grow() */


/*
 * Release pages from the front of the list for as long as the front page
 * holds no pending bytes. Stops at the first page with data, or when the
 * list is empty.
 */
static void bufio_pagebuf_reap(struct bufio_pagebuf *b) {
	struct bufio_page *head;

	for (;;) {
		head	= TAILQ_FIRST(&b->pages);

		if (head == TAILQ_END(&b->pages))
			break;

		if (bufio_page_len(head) != 0)
			break;

		TAILQ_REMOVE(&b->pages, head, tqe);
		b->npages--;

		b->ap->free(b->ap, head);
	}
} /* bufio_pagebuf_reap() */


static size_t bufio_pagebuf_fill(struct bufio_pagebuf *b, int flags, enum bufio_errno *e) {
	size_t n, nbytes	= 0;
	struct bufio_page *p;
	struct bufio_pagebuf_frame f;

	BUFIO_PAGEBUF_FRAME_PUSH(b, &f);

	if (b->nbytes < b->opts.keep_hiwat && b->next.source) {
		if (0 == (n = b->next.source->copyto(b->next.source, b->sink, flags & (~BUFIO_PEEK), e)))
			goto epilogue;

		nbytes	+= n;

		if (!BUFIO_PAGEBUF_FRAME_OKAY(&f))
			goto epilogue;
	}

epilogue:
	if (!BUFIO_PAGEBUF_FRAME_OKAY(&f))
		*e	= BUFIO_ECANCELLED;

	BUFIO_PAGEBUF_FRAME_POP(b, &f);

	return nbytes;
} /* bufio_pagebuf_fill() */


/*
 * bufio_source `copyto' method: hand buffered bytes to `sink'.
 *
 * Each round first tries to refill from the chained source
 * (bufio_pagebuf_fill), then offers every page's pending bytes to
 * sink->copyin, and starts over. The loop is left when the refill fails,
 * when too little is buffered (BUFIO_EAGAIN), or when the sink accepts
 * nothing and reports a status.
 *
 * Returns the total number of bytes the sink accepted; *e carries the
 * status. BUFIO_EAGAIN raised here is also recorded in b->lasterr.source.
 *
 * NOTE(review): with BUFIO_PEEK the page cursors are never advanced, so
 * every round re-offers the same bytes; termination then presumably
 * depends on the sink refusing further data -- confirm.
 */
static size_t bufio_pagebuf_copyto(struct bufio_source *source, struct bufio_sink *sink, int flags, enum bufio_errno *e) {
	struct bufio_pagebuf *b	= BUFIO_CAST_FROM(source, bufio_pagebuf, bufio_source);
	size_t n, nbytes	= 0;
	int drain		= (flags & BUFIO_DRAIN);
	struct bufio_page *p;
	struct bufio_pagebuf_frame f;
	/* Snapshot of the current page's readable window: buf = start, pos = progress, end = limit. */
	struct { unsigned char *buf, *pos, *end; } c;

	*e	= 0;

	BUFIO_PAGEBUF_FRAME_PUSH(b, &f);

fill:
	if (0 == bufio_pagebuf_fill(b, flags, e)) {
		if (BUFIO_FAILURE(*e))
			goto epilogue;

		if (BUFIO_EOF(*e)) {
			if (b->nbytes == 0)
				goto epilogue;

			/* Upstream is at EOF but data remains: stop honouring keep_lowat. */
			drain	= 1;
		}
	}

	/* Hold data back until keep_lowat is reached unless draining; never proceed with an empty buffer. */
	if ((b->nbytes < b->opts.keep_lowat && !drain) || b->nbytes == 0) {
		*e	= BUFIO_EAGAIN;

		goto anyfail;
	}

	for (p = TAILQ_FIRST(&b->pages); TAILQ_END(&b->pages) != p; p = TAILQ_NEXT(p, tqe)) {
		c.buf	= c.pos
			= p->cursor.get;
		c.end	= p->cursor.put;

		/* Keep offering this page until the sink takes nothing or the window is exhausted. */
		do {
			*e	= 0;
			n	= sink->copyin(sink, c.pos, c.end - c.pos, flags, e);

			c.pos	+= n;
			nbytes	+= n;
		} while (n > 0 && c.pos < c.end);

		/* The frame is checked before b or p is touched again after calling into the sink. */
		if (!BUFIO_PAGEBUF_FRAME_OKAY(&f))
			goto epilogue;

		/* Only a non-peeking read consumes what the sink accepted. */
		if (!(flags & BUFIO_PEEK)) {
			p->cursor.get	+= c.pos - c.buf;
			b->nbytes	-= c.pos - c.buf;
		}

		/* Sink stopped and reported a status: free emptied pages and return with the sink's *e. */
		if (n == 0 && *e != 0) {
			bufio_pagebuf_reap(b);

			goto epilogue;
		}
	}

	/* Pages are reaped only after the walk so the iteration never steps through a freed page. */
	bufio_pagebuf_reap(b);

	goto fill;

	/* NOT REACHED */
anyfail:
	/* Make sure some last-error code is set when reporting a failure. */
	if (GetLastError() == 0)
		SetLastError(EFAULT);

	if (BUFIO_PAGEBUF_FRAME_OKAY(&f))
		b->lasterr.source	= *e;

	/* FALL THROUGH */
epilogue:
	if (!BUFIO_PAGEBUF_FRAME_OKAY(&f))
		*e	= BUFIO_ECANCELLED;

	BUFIO_PAGEBUF_FRAME_POP(b, &f);

	return nbytes;
} /* bufio_pagebuf_copyto() */


static size_t bufio_pagebuf_copyout(struct bufio_source *source, void *dstbuf, size_t bufsiz, int flags, enum bufio_errno *e) {
	struct bufio_pagebuf *b	= BUFIO_CAST_FROM(source, bufio_pagebuf, bufio_source);
	struct bufio_pagebuf_frame f;
	struct bufio_membuf mbuf;
	size_t nbytes;

	*e	= 0;

	BUFIO_PAGEBUF_FRAME_PUSH(b, &f);

	nbytes	= bufio_pagebuf_copyto(source, bufio_membuf_init_sink(&mbuf, BUFIO_MEMBUF_DEFAULTS, dstbuf, bufsiz), flags, e);

	if (!BUFIO_PAGEBUF_FRAME_OKAY(&f))
		*e	= BUFIO_ECANCELLED;

	BUFIO_PAGEBUF_FRAME_POP(b, &f);

	return nbytes;
} /* bufio_pagebuf_copyout() */


static size_t bufio_pagebuf_source_buffered(struct bufio_source *src) {
	return BUFIO_CAST_FROM(src, bufio_pagebuf, bufio_source)->nbytes;
} /* bufio_pagebuf_source_buffered() */


static enum bufio_errno bufio_pagebuf_source_poll(struct bufio_source *src, void (*cb)(struct bufio_source *, enum bufio_errno, void *), void *arg, struct timeval *timeout) {
	struct bufio_pagebuf *b	= BUFIO_CAST_FROM(src, bufio_pagebuf, bufio_source);

	if (!b->next.source) {
		SetLastError(ENOTSUP);
		return BUFIO_ESYSTEM;
	}

	return b->next.source->poll(b->next.source, cb, arg, timeout);
} /* bufio_pagebuf_source_poll() */


static enum bufio_errno bufio_pagebuf_source_cancel(struct bufio_source *src, int notify) {
	struct bufio_pagebuf *b	= BUFIO_CAST_FROM(src, bufio_pagebuf, bufio_source);

	if (!b->next.source) {
		SetLastError(ENOTSUP);
		return BUFIO_ESYSTEM;
	}

	return b->next.source->cancel(b->next.source, notify);
} /* bufio_pagebuf_source_cancel() */


/*
 * bufio_source `lasterr' method. An error recorded by this buffer takes
 * precedence; otherwise the chained source is asked, and 0 is returned
 * when there is none.
 */
static enum bufio_errno bufio_pagebuf_source_lasterr(struct bufio_source *src) {
	struct bufio_pagebuf *b	= BUFIO_CAST_FROM(src, bufio_pagebuf, bufio_source);

	if (b->lasterr.source)
		return b->lasterr.source;

	if (!b->next.source)
		return 0;

	return b->next.source->lasterr(b->next.source);
} /* bufio_pagebuf_source_lasterr() */


static void bufio_pagebuf_source_clearerr(struct bufio_source *src) {
	struct bufio_pagebuf *b	= BUFIO_CAST_FROM(src, bufio_pagebuf, bufio_source);

	b->lasterr.source	= 0;

	if (b->next.source)
		(b->next.source->clearerr)(b->next.source);

	return /* void */;
} /* bufio_pagebuf_source_clearerr() */


/*
 * bufio_sink `copyfrom' method: pull bytes out of `source' into the page
 * buffer, forwarding buffered data to the chained sink as the watermarks
 * allow.
 *
 * Each round first lets the chained sink drain this buffer (once
 * keep_lowat bytes are held, or always under BUFIO_FLUSH), then reads from
 * `source' into the last page, never buffering past keep_hiwat.
 *
 * Returns the total number of bytes obtained from `source'; *e carries the
 * status. Failures raised here (BUFIO_EAGAIN, page allocation) are also
 * recorded in b->lasterr.sink.
 */
static size_t bufio_pagebuf_copyfrom(struct bufio_sink *sink, struct bufio_source *source, int flags, enum bufio_errno *e) {
	struct bufio_pagebuf *b	= BUFIO_CAST_FROM(sink, bufio_pagebuf, bufio_sink);
	size_t n, nbytes	= 0;
	int flush		= (flags & BUFIO_FLUSH);
	struct bufio_page *p;
	struct bufio_pagebuf_frame f;

	*e	= 0;

	BUFIO_PAGEBUF_FRAME_PUSH(b, &f);

flush:
	do {
		n	= 0;

		if ((b->nbytes >= b->opts.keep_lowat || flush) && b->next.sink) {
			/* The chained sink reads back out of this buffer through b->source. */
			if (0 == (n = b->next.sink->copyfrom(b->next.sink, b->source, flags, e))
			&&  BUFIO_FAILURE(*e))
				goto epilogue;

			if (!BUFIO_PAGEBUF_FRAME_OKAY(&f))
				goto epilogue;
		}
	/* Under BUFIO_FLUSH, repeat while data remains and the last pass moved something. */
	} while (flush && b->nbytes > 0 && n > 0 && b->next.sink);

	/* Buffer is at keep_hiwat, or a requested flush could not empty it: ask the caller to retry. */
	if (b->nbytes >= b->opts.keep_hiwat || (flush && b->nbytes > 0)) {
		*e	= BUFIO_EAGAIN;

		goto anyfail;
	}

	/* Make sure a last page exists and has free room, appending pages as needed. */
	while (TAILQ_END(&b->pages) == (p = TAILQ_LAST(&b->pages, bufio_pages)) || 0 == bufio_page_nfree(p)) {
		if (0 != (*e = bufio_pagebuf_grow(b)))
			goto anyfail;
	}

	/* Read no more than fits in the page and no more than keep_hiwat allows. */
	n	= MIN(bufio_page_nfree(p), b->opts.keep_hiwat - b->nbytes);

	/* Nothing obtained: return with whatever status the source stored in *e. */
	if (0 == (n = source->copyout(source, p->cursor.put, n, flags, e)))
		goto epilogue;

	nbytes	+= n;

	/* The frame is checked before b or p is touched again after calling into the source. */
	if (!BUFIO_PAGEBUF_FRAME_OKAY(&f))
		goto epilogue;

	p->cursor.put	+= n;
	b->nbytes	+= n;

	goto flush;

	/* NOT REACHED */
anyfail:
	/* Make sure some last-error code is set when reporting a failure. */
	if (GetLastError() == 0)
		SetLastError(EFAULT);

	if (BUFIO_PAGEBUF_FRAME_OKAY(&f))
		b->lasterr.sink	= *e;

	/* FALL THROUGH */
epilogue:
	if (!BUFIO_PAGEBUF_FRAME_OKAY(&f))
		*e	= BUFIO_ECANCELLED;

	BUFIO_PAGEBUF_FRAME_POP(b, &f);

	return nbytes;
} /* bufio_pagebuf_copyfrom() */


static size_t bufio_pagebuf_copyin(struct bufio_sink *sink, void *srcbuf, size_t bufsiz, int flags, enum bufio_errno *e) {
	struct bufio_pagebuf *b	= BUFIO_CAST_FROM(sink, bufio_pagebuf, bufio_sink);
	struct bufio_pagebuf_frame f;
	struct bufio_membuf mbuf;
	size_t nbytes;

	*e	= 0;

	BUFIO_PAGEBUF_FRAME_PUSH(b, &f);

	nbytes	= bufio_pagebuf_copyfrom(sink, bufio_membuf_init_source(&mbuf, BUFIO_MEMBUF_DEFAULTS, srcbuf, bufsiz), flags, e);

	if (!BUFIO_PAGEBUF_FRAME_OKAY(&f))
		*e	= BUFIO_ECANCELLED;

	BUFIO_PAGEBUF_FRAME_POP(b, &f);

	return nbytes;
} /* bufio_pagebuf_copyin() */


static size_t bufio_pagebuf_sink_buffered(struct bufio_sink *snk) {
	return BUFIO_CAST_FROM(snk, bufio_pagebuf, bufio_sink)->nbytes;
} /* bufio_pagebuf_sink_buffered() */


static enum bufio_errno bufio_pagebuf_sink_poll(struct bufio_sink *snk, void (*cb)(struct bufio_sink *, enum bufio_errno, void *), void *arg, struct timeval *timeout) {
	struct bufio_pagebuf *b	= BUFIO_CAST_FROM(snk, bufio_pagebuf, bufio_sink);

	if (!b->next.sink) {
		SetLastError(ENOTSUP);
		return BUFIO_ESYSTEM;
	}

	return b->next.sink->poll(b->next.sink, cb, arg, timeout);
} /* bufio_pagebuf_sink_poll() */


static enum bufio_errno bufio_pagebuf_sink_cancel(struct bufio_sink *snk, int notify) {
	struct bufio_pagebuf *b	= BUFIO_CAST_FROM(snk, bufio_pagebuf, bufio_sink);

	if (!b->next.sink) {
		SetLastError(ENOTSUP);
		return BUFIO_ESYSTEM;
	}

	return b->next.sink->cancel(b->next.sink, notify);
} /* bufio_pagebuf_sink_cancel() */


/*
 * bufio_sink `lasterr' method. An error recorded by this buffer takes
 * precedence; otherwise the chained sink is asked, and 0 is returned when
 * there is none.
 */
static enum bufio_errno bufio_pagebuf_sink_lasterr(struct bufio_sink *snk) {
	struct bufio_pagebuf *b	= BUFIO_CAST_FROM(snk, bufio_pagebuf, bufio_sink);

	if (b->lasterr.sink)
		return b->lasterr.sink;

	if (!b->next.sink)
		return 0;

	return b->next.sink->lasterr(b->next.sink);
} /* bufio_pagebuf_sink_lasterr() */


static void bufio_pagebuf_sink_clearerr(struct bufio_sink *snk) {
	struct bufio_pagebuf *b	= BUFIO_CAST_FROM(snk, bufio_pagebuf, bufio_sink);

	b->lasterr.sink	= 0;

	if (b->next.sink)
		(b->next.sink->clearerr)(b->next.sink);

	return /* void */;
} /* bufio_pagebuf_sink_clearerr() */


struct bufio_pagebuf *bufio_pagebuf_init(struct bufio_pagebuf *b, const struct bufio_pagebuf_options *opts, const struct arena_prototype *ap) {
	static const struct bufio_pagebuf bufio_pagebuf_initializer;
	struct bufio_source *source;
	struct bufio_sink *sink;

	if (!opts)
		opts	= &bufio_pagebuf_defaults;

	if (!ap)
		ap	= ARENA_STDLIB;

	*b	= bufio_pagebuf_initializer;
	b->opts	= *opts;
	b->ap	= ap;

	b->source		= BUFIO_CAST_TO(b, bufio_pagebuf, bufio_source);
	*b->source		= BUFIO_SOURCE_INITIALIZER;
	b->source->copyto	= &bufio_pagebuf_copyto;
	b->source->copyout	= &bufio_pagebuf_copyout;
	b->source->buffered	= &bufio_pagebuf_source_buffered;
	b->source->poll		= &bufio_pagebuf_source_poll;
	b->source->cancel	= &bufio_pagebuf_source_cancel;
	b->source->lasterr	= &bufio_pagebuf_source_lasterr;
	b->source->clearerr	= &bufio_pagebuf_source_clearerr;

	b->sink			= BUFIO_CAST_TO(b, bufio_pagebuf, bufio_sink);
	*b->sink		= BUFIO_SINK_INITIALIZER;
	b->sink->copyfrom	= &bufio_pagebuf_copyfrom;
	b->sink->copyin		= &bufio_pagebuf_copyin;
	b->sink->buffered	= &bufio_pagebuf_sink_buffered;
	b->sink->poll		= &bufio_pagebuf_sink_poll;
	b->sink->cancel		= &bufio_pagebuf_sink_cancel;
	b->sink->lasterr	= &bufio_pagebuf_sink_lasterr;
	b->sink->clearerr	= &bufio_pagebuf_sink_clearerr;

	TAILQ_INIT(&b->pages);

	b->npages	= 0;
	b->nbytes	= 0;

	SLIST_INIT(&b->stack);

	return b;
} /* bufio_pagebuf_init() */


/*
 * Discard everything held in the buffer: every page is unlinked and handed
 * back to the buffer's allocator, and the page and byte counters are reset
 * to zero.
 *
 * Options, allocator, chained source/sink and recorded errors are left
 * untouched.
 */
void bufio_pagebuf_clear(struct bufio_pagebuf *b) {
	struct bufio_page *p;

	while ((p = TAILQ_FIRST(&b->pages)) != TAILQ_END(&b->pages)) {
		TAILQ_REMOVE(&b->pages, p, tqe);

		b->ap->free(b->ap, p);
	}

	/*
	 * The counters must agree with the now-empty page list; otherwise
	 * the buffered() methods report bytes that no longer exist and the
	 * copy routines act on a non-zero nbytes with no pages behind it.
	 */
	b->npages	= 0;
	b->nbytes	= 0;

	return /* void */;
} /* bufio_pagebuf_clear() */


/*
 * Tear the buffer down: free all pages, invalidate every frame still
 * registered on the stack, and zero the whole object.
 *
 * Invalidating a frame means writing 0 through frame->bp, i.e. into the
 * pointer variable that was registered with BUFIO_PAGEBUF_FRAME_PUSH().
 */
void bufio_pagebuf_destroy(struct bufio_pagebuf *b) {
	struct bufio_pagebuf_frame *frame;

	bufio_pagebuf_clear(b);

	frame	= SLIST_FIRST(&b->stack);

	while (frame != 0) {
		*frame->bp	= 0;

		frame	= SLIST_NEXT(frame, sle);
	}

	SLIST_INIT(&b->stack);

	(void)memset(b, 0, sizeof *b);
} /* bufio_pagebuf_destroy() */


/*
 * Chain `s' as the source this buffer refills from and forwards its
 * poll/cancel/lasterr/clearerr requests to. Null unchains.
 */
void bufio_pagebuf_set_source(struct bufio_pagebuf *b, struct bufio_source *s) {
	b->next.source	= s;
} /* bufio_pagebuf_set_source() */


/*
 * Chain `s' as the sink this buffer flushes into and forwards its
 * poll/cancel/lasterr/clearerr requests to. Null unchains.
 */
void bufio_pagebuf_set_sink(struct bufio_pagebuf *b, struct bufio_sink *s) {
	b->next.sink	= s;
} /* bufio_pagebuf_set_sink() */


/* Return the bufio_sink face of `b', as produced by BUFIO_CAST_TO(). */
struct bufio_sink *bufio_pagebuf_to_sink(struct bufio_pagebuf *b) {
	struct bufio_sink *face	= BUFIO_CAST_TO(b, bufio_pagebuf, bufio_sink);

	return face;
} /* bufio_pagebuf_to_sink() */


/* Return the bufio_source face of `b', as produced by BUFIO_CAST_TO(). */
struct bufio_source *bufio_pagebuf_to_source(struct bufio_pagebuf *b) {
	struct bufio_source *face	= BUFIO_CAST_TO(b, bufio_pagebuf, bufio_source);

	return face;
} /* bufio_pagebuf_to_source() */


/*
 * Read one line from the buffer into `buf'.
 *
 * b		Page buffer to read from.
 * buf		Destination memory.
 * bufsiz	Capacity of `buf'; at most this many bytes are copied.
 * flags	BUFIO_* flags; BUFIO_DRAIN is added for the final copy-out.
 * e		Receives the status; reset to 0 on entry.
 *
 * If a '\n' is already buffered, the bytes up to and including it are
 * copied out, truncated to `bufsiz'. Otherwise one refill from the chained
 * source is attempted and the search is repeated. When still no newline
 * is found, `bufsiz' bytes are requested if that many are buffered or the
 * source reported EOF; otherwise *e and b->lasterr.source are set to
 * BUFIO_EAGAIN and 0 is returned.
 *
 * Returns the number of bytes copied into `buf'. No terminator is added by
 * this function.
 */
size_t bufio_pagebuf_gets(struct bufio_pagebuf *b, void *buf, size_t bufsiz, int flags, enum bufio_errno *e) {
	size_t nl, nbytes;

	/*
	 * bufio_pagebuf_fill() may return 0 without writing *e (no chained
	 * source, or keep_hiwat already reached), so start from a known
	 * status instead of testing whatever the caller left in *e.
	 */
	*e	= 0;

	if ((size_t)-1 != (nl = bufio_pagebuf_memchr(b, '\n'))) {
		nbytes	= MIN(bufsiz, nl + 1);

		goto copyout;
	}

	if (0 == bufio_pagebuf_fill(b, flags, e)) {
		if (BUFIO_FAILURE(*e))
			return 0;
	} else
		*e	= 0;

	if ((size_t)-1 != (nl = bufio_pagebuf_memchr(b, '\n'))) {
		nbytes	= MIN(bufsiz, nl + 1);

		goto copyout;
	}

	/* No newline: give up a full buffer's worth, or whatever is left at EOF. */
	if (b->nbytes >= bufsiz || BUFIO_EOF(*e)) {
		nbytes	= bufsiz;
	} else {
		*e	= b->lasterr.source
			= BUFIO_EAGAIN;

		return 0;
	}

copyout:
	return bufio_pagebuf_copyout(b->source, buf, nbytes, flags | BUFIO_DRAIN, e);
} /* bufio_pagebuf_gets() */



/* syntax highlighted by Code2HTML, v. 0.9.1 */