/* ==========================================================================
* libevnet/src/bufio.c - Network server library for libevent.
* --------------------------------------------------------------------------
* Copyright (c) 2006 Barracuda Networks, Inc.
* Copyright (c) 2006 William Ahern
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the
* "Software"), to deal in the Software without restriction, including
* without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to permit
* persons to whom the Software is furnished to do so, subject to the
* following conditions:
*
* The above copyright notice and this permission notice shall be included
* in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
* OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
* NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
* USE OR OTHER DEALINGS IN THE SOFTWARE.
* ==========================================================================
*/
#include <errno.h> /* ENOTSUP */
#include <assert.h>
#include <string.h> /* memset(3) */
#include <windows.h> /* GetLastError SetLastError */
#include <sys/time.h> /* struct timeval timerclear gettimeofday(3) */
#include <sys/param.h> /* MIN */
#include <sys/queue.h> /* SLIST */
#include <arena/proto.h>
#include "bufio.h"
#include "bufio/pagebuf.h"
#include "bufio/membuf.h"
/*
 * Debug tracing aid, compiled out by the `#if 0' guard. When enabled, MARK
 * prints the enclosing function name and the current line number to stderr.
 */
#if 0
#include <stdio.h>
#define MARK (fprintf(stderr, "%s:%d\n", __FUNCTION__, __LINE__))
#endif
/*
 * Human-readable descriptions indexed by enum bufio_errno value (see
 * bufio_strerror()). The leading range designator (a GNU C extension) fills
 * every slot with "Unknown" first, so any code without a dedicated entry
 * below still maps to a valid string.
 */
const char *bufio_errlist[BUFIO_NERR] = {
[0 ... (BUFIO_NERR - 1)] = "Unknown",
[BUFIO_ESUCCESS] = "Success",
[BUFIO_ESYSTEM] = "System error",
[BUFIO_ECANCELLED] = "Operation cancelled",
[BUFIO_EEOF] = "End of file",
[BUFIO_ETIMEDOUT] = "Operation timed out",
[BUFIO_ENOTPOLLING] = "Nothing to cancel",
[BUFIO_EAGAIN] = "Try operation again",
[BUFIO_EPIPE] = "Sink disconnected",
}; /* bufio_errlist[] */
/* Number of entries in bufio_errlist[]. */
const size_t bufio_nerr = sizeof bufio_errlist / sizeof *bufio_errlist;
/*
 * Placeholder `copyto' method installed by bufio_source_initializer.
 *
 * Transfers nothing: stores BUFIO_ESYSTEM in *e, records ENOTSUP as the
 * system error code via SetLastError() and returns 0 bytes copied.
 */
static size_t bufio_source_copyto_noop(struct bufio_source *source, struct bufio_sink *sink, int flags, enum bufio_errno *e) {
	(void)source;
	(void)sink;
	(void)flags;

	SetLastError(ENOTSUP);
	*e = BUFIO_ESYSTEM;

	return 0;
} /* bufio_source_copyto_noop() */
/*
 * Placeholder `copyout' method installed by bufio_source_initializer.
 *
 * Leaves `buf' untouched: stores BUFIO_ESYSTEM in *e, records ENOTSUP as
 * the system error code via SetLastError() and returns 0 bytes copied.
 */
static size_t bufio_source_copyout_noop(struct bufio_source *source, void *buf, size_t bufsiz, int flags, enum bufio_errno *e) {
	(void)source;
	(void)buf;
	(void)bufsiz;
	(void)flags;

	SetLastError(ENOTSUP);
	*e = BUFIO_ESYSTEM;

	return 0;
} /* bufio_source_copyout_noop() */
/*
 * Placeholder `buffered' method installed by bufio_source_initializer:
 * always reports 0.
 */
static size_t bufio_source_buffered(struct bufio_source *src) {
	(void)src;

	return 0;
} /* bufio_source_buffered() */
/*
 * Placeholder `poll' method installed by bufio_source_initializer.
 *
 * Never registers `cb': records ENOTSUP as the system error code via
 * SetLastError() and returns BUFIO_ESYSTEM.
 */
static enum bufio_errno bufio_source_poll_noop(struct bufio_source *source, void (*cb)(struct bufio_source *, enum bufio_errno, void *), void *arg, struct timeval *timeout) {
	(void)source;
	(void)cb;
	(void)arg;
	(void)timeout;

	SetLastError(ENOTSUP);

	return BUFIO_ESYSTEM;
} /* bufio_source_poll_noop() */
/*
 * Placeholder `cancel' method installed by bufio_source_initializer:
 * does nothing and returns 0.
 */
static enum bufio_errno bufio_source_cancel_noop(struct bufio_source *source, int notify) {
	(void)source;
	(void)notify;

	return 0;
} /* bufio_source_cancel_noop() */
/*
 * Placeholder `lasterr' method installed by bufio_source_initializer:
 * always returns 0.
 */
static enum bufio_errno bufio_source_lasterr_noop(struct bufio_source *source) {
	(void)source;

	return 0;
} /* bufio_source_lasterr_noop() */
/*
 * Placeholder `clearerr' method installed by bufio_source_initializer:
 * there is no state to reset, so this does nothing.
 */
static void bufio_source_clearerr_noop(struct bufio_source *source) {
	(void)source;
} /* bufio_source_clearerr_noop() */
/*
 * Template source whose methods are all the placeholder implementations
 * defined in this file.
 *
 * The non-WIN32 branch names each member explicitly. The WIN32 branch
 * supplies the same functions positionally, so it is only correct if the
 * member order of struct bufio_source matches the order listed here
 * (NOTE(review): confirm against the declaration in bufio.h).
 */
struct bufio_source bufio_source_initializer = {
#ifndef WIN32
.copyto = &bufio_source_copyto_noop,
.copyout = &bufio_source_copyout_noop,
.buffered = &bufio_source_buffered,
.poll = &bufio_source_poll_noop,
.cancel = &bufio_source_cancel_noop,
.lasterr = &bufio_source_lasterr_noop,
.clearerr = &bufio_source_clearerr_noop,
#else
&bufio_source_copyto_noop,
&bufio_source_copyout_noop,
&bufio_source_buffered,
&bufio_source_poll_noop,
&bufio_source_cancel_noop,
&bufio_source_lasterr_noop,
&bufio_source_clearerr_noop,
#endif
}; /* bufio_source_initializer */
/*
 * Placeholder `copyfrom' method installed by bufio_sink_initializer.
 *
 * Consumes nothing from `src': stores BUFIO_ESYSTEM in *e, records ENOTSUP
 * as the system error code via SetLastError() and returns 0 bytes copied.
 */
static size_t bufio_sink_copyfrom_noop(struct bufio_sink *snk, struct bufio_source *src, int flags, enum bufio_errno *e) {
	(void)snk;
	(void)src;
	(void)flags;

	SetLastError(ENOTSUP);
	*e = BUFIO_ESYSTEM;

	return 0;
} /* bufio_sink_copyfrom_noop() */
/*
 * Placeholder `copyin' method installed by bufio_sink_initializer.
 *
 * Accepts nothing from `buf': stores BUFIO_ESYSTEM in *e, records ENOTSUP
 * as the system error code via SetLastError() and returns 0 bytes copied.
 */
static size_t bufio_sink_copyin_noop(struct bufio_sink *snk, void *buf, size_t bufsiz, int flags, enum bufio_errno *e) {
	(void)snk;
	(void)buf;
	(void)bufsiz;
	(void)flags;

	SetLastError(ENOTSUP);
	*e = BUFIO_ESYSTEM;

	return 0;
} /* bufio_sink_copyin_noop() */
/*
 * Placeholder `buffered' method installed by bufio_sink_initializer:
 * always reports 0.
 */
static size_t bufio_sink_buffered(struct bufio_sink *snk) {
	(void)snk;

	return 0;
} /* bufio_sink_buffered() */
/*
 * Placeholder `poll' method installed by bufio_sink_initializer.
 *
 * Never registers `cb': records ENOTSUP as the system error code via
 * SetLastError() and returns BUFIO_ESYSTEM.
 */
static enum bufio_errno bufio_sink_poll_noop(struct bufio_sink *sink, void (*cb)(struct bufio_sink *, enum bufio_errno, void *), void *arg, struct timeval *timeout) {
	(void)sink;
	(void)cb;
	(void)arg;
	(void)timeout;

	SetLastError(ENOTSUP);

	return BUFIO_ESYSTEM;
} /* bufio_sink_poll_noop() */
/*
 * Placeholder `cancel' method installed by bufio_sink_initializer:
 * does nothing and returns 0.
 */
static enum bufio_errno bufio_sink_cancel_noop(struct bufio_sink *sink, int notify) {
	(void)sink;
	(void)notify;

	return 0;
} /* bufio_sink_cancel_noop() */
/*
 * Placeholder `lasterr' method installed by bufio_sink_initializer:
 * always returns 0.
 */
static enum bufio_errno bufio_sink_lasterr_noop(struct bufio_sink *sink) {
	(void)sink;

	return 0;
} /* bufio_sink_lasterr_noop() */
/*
 * Placeholder `clearerr' method installed by bufio_sink_initializer:
 * there is no state to reset, so this does nothing.
 */
static void bufio_sink_clearerr_noop(struct bufio_sink *sink) {
	(void)sink;
} /* bufio_sink_clearerr_noop() */
/*
 * Template sink whose methods are all the placeholder implementations
 * defined in this file.
 *
 * The non-WIN32 branch names each member explicitly. The WIN32 branch
 * supplies the same functions positionally, so it is only correct if the
 * member order of struct bufio_sink matches the order listed here
 * (NOTE(review): confirm against the declaration in bufio.h).
 */
struct bufio_sink bufio_sink_initializer = {
#ifndef WIN32
.copyfrom = &bufio_sink_copyfrom_noop,
.copyin = &bufio_sink_copyin_noop,
.buffered = &bufio_sink_buffered,
.poll = &bufio_sink_poll_noop,
.cancel = &bufio_sink_cancel_noop,
.lasterr = &bufio_sink_lasterr_noop,
.clearerr = &bufio_sink_clearerr_noop,
#else
&bufio_sink_copyfrom_noop,
&bufio_sink_copyin_noop,
&bufio_sink_buffered,
&bufio_sink_poll_noop,
&bufio_sink_cancel_noop,
&bufio_sink_lasterr_noop,
&bufio_sink_clearerr_noop,
#endif
}; /* bufio_sink_initializer */
/*
 * Register frame `f' at the head of channel `c''s frame list and store the
 * address of the caller's handle variable `b' in it. `b' must therefore be
 * an lvalue that outlives the frame. bufio_destroy() writes 0 through that
 * stored address, which is how a live frame learns the object is gone.
 */
#define BUFIO_FRAME_PUSH(b, c, f) do { \
(f)->bp = &(b); \
SLIST_INSERT_HEAD(&(c)->call.frames, (f), sle); \
} while(0)
/*
 * Unlink frame `f' from channel `c''s frame list, but only while the handle
 * variable `b' is still non-zero. If bufio_destroy() has zeroed it, the
 * list has already been reset there and `c' must not be touched. The
 * assertion checks that `f' is the head of the list when it is removed.
 */
#define BUFIO_FRAME_POP(b, c, f) do { \
if ((b) != 0) { \
assert((f) == SLIST_FIRST(&(c)->call.frames)); \
SLIST_REMOVE_HEAD(&(c)->call.frames, sle); \
} \
} while(0)
/* Non-zero while the handle variable tracked by frame `f' has not been zeroed by bufio_destroy(). */
#define BUFIO_FRAME_OKAY(f) (*(f)->bp != 0)
/*
 * Options bufio_open() falls back to when the caller passes no options.
 * The WIN32 branch supplies the values positionally and presumably sets the
 * same two members in the same order (NOTE(review): confirm the member
 * order of struct bufio_options in bufio.h).
 */
const struct bufio_options bufio_defaults = {
#ifndef WIN32
.max_bufsiz = 4096,
.max_recurse = 7,
#else
4096,
7,
#endif
}; /* bufio_defaults */
/*
 * Marker for an active bufio_prep() invocation on a channel. Frames are
 * linked into the channel's `call.frames' list so bufio_destroy() can reach
 * them.
 */
struct bufio_frame {
/* Address of the handle variable set by BUFIO_FRAME_PUSH; bufio_destroy() stores 0 through it. */
struct bufio **bp;
/* Link in the owning channel's frame list. */
SLIST_ENTRY(bufio_frame) sle;
}; /* struct bufio_frame */
/*
 * Completion callback of a pending call, one member per callback type.
 * bufio_exec() invokes `read' for READ, READN and GETS calls, `flush' for
 * FLUSH calls and `write' for the remaining call types.
 */
union bufio_callback {
bufio_read_cb read;
bufio_write_cb write;
bufio_gets_cb gets;
bufio_flush_cb flush;
}; /* union bufio_callback */
/* Life cycle of the single call slot owned by each channel. */
enum bufio_call_state {
/* No call installed; bufio_prep() asserts this before accepting a new one. */
BUFIO_CALL_VACANT,
/* Installed by bufio_prep() and waiting for bufio_exec() to run it. */
BUFIO_CALL_PENDING,
/* bufio_exec() is currently working on the call. */
BUFIO_CALL_RUNNING,
/* bufio_exec() is about to arm, or has armed, a poll on the source/sink and waits for a wakeup. */
BUFIO_CALL_POLLING,
}; /* enum bufio_call_state */
/*
 * Kind of operation installed in a channel's call slot. Each value is a
 * distinct bit so bufio_exec() can test for the whole read family with one
 * mask (READ | GETS | READN).
 */
enum bufio_call_type {
BUFIO_CTYPE_READ = 1 << 0,
BUFIO_CTYPE_WRITE = 1 << 1,
BUFIO_CTYPE_GETS = 1 << 2,
BUFIO_CTYPE_FLUSH = 1 << 3,
BUFIO_CTYPE_READN = 1 << 4,
BUFIO_CTYPE_WRITEN = 1 << 5,
}; /* enum bufio_call_type */
/* Caller-supplied parameters of one request, copied into the channel by bufio_prep(). */
struct bufio_call {
/* Memory buffer set up by the public entry points over the caller's dst/src region. */
struct bufio_membuf mbuf;
/* Completion callback. */
union bufio_callback cb;
/* Opaque pointer handed back to the completion callback. */
void *arg;
}; /* struct bufio_call */
/* One direction (read or write) of a bufio object. */
struct bufio_channel {
/* Page buffer belonging to this direction. */
struct bufio_pagebuf buffer;
/* Next element in the chain: `source' is set on the read channel, `sink' on the write channel. */
union {
struct bufio_source *source;
struct bufio_sink *sink;
} next;
/* The single call slot of this channel. */
struct {
enum bufio_call_state state;
enum bufio_call_type type;
struct bufio_call info;
/* Absolute deadline computed by bufio_prep(); cleared when the call has no timeout. */
struct timeval expire;
/* Frames of bufio_prep() invocations currently active on this channel. */
SLIST_HEAD(, bufio_frame) frames;
} call;
}; /* struct bufio_channel */
/* A buffered I/O object: a read channel and a write channel plus error state. */
struct bufio {
/* Copy of the options passed to (or defaulted by) bufio_open(). */
struct bufio_options opts;
/* Allocator the object was obtained from; bufio_close() releases it through the same one. */
const struct arena_prototype *ap;
struct bufio_channel read, write;
/* Status of the most recently completed call, recorded by bufio_exec(). */
enum bufio_errno last_errno;
/* Value of GetLastError() captured by bufio_exec() at the same moment. */
int sys_errno;
}; /* struct bufio */
static void bufio_destroy(struct bufio *b) {
struct bufio_frame *f;
if (!b)
return /* void */;
bufio_pagebuf_destroy(&b->read.buffer);
SLIST_FOREACH(f, &b->read.call.frames, sle)
*f->bp = 0;
SLIST_INIT(&b->read.call.frames);
bufio_pagebuf_destroy(&b->write.buffer);
SLIST_FOREACH(f, &b->write.call.frames, sle)
*f->bp = 0;
SLIST_INIT(&b->write.call.frames);
return /* void */;
} /* bufio_destroy() */
/*
 * Allocate and initialize a bufio object.
 *
 * opts      - options to copy into the object; bufio_defaults when null.
 * ap        - allocator used for the object and its page buffers;
 *             ARENA_STDLIB when null.
 * our_errno - on failure receives BUFIO_ESYSTEM; may be null if the caller
 *             does not want the code.
 *
 * Returns the new object, or 0 if the allocation failed. On failure the
 * system error code reported by GetLastError() at the time of the failure
 * is preserved across the cleanup.
 *
 * Both page buffers are created with a page size of at most 1024 bytes
 * (less when max_bufsiz is smaller), a low-water mark of 0 and a
 * high-water mark of max_bufsiz. The results of bufio_pagebuf_init() are
 * deliberately ignored, as in the original code.
 */
struct bufio *bufio_open(const struct bufio_options *opts, const struct arena_prototype *ap, enum bufio_errno *our_errno) {
	static const struct bufio bufio_initializer;
	struct bufio *b = 0;
	struct bufio_pagebuf_options pagebuf_opts = bufio_pagebuf_defaults;
	int sys_errno;

	if (!opts)
		opts = &bufio_defaults;

	if (!ap)
		ap = ARENA_STDLIB;

	if (!(b = ap->malloc(ap, sizeof *b, 0)))
		goto sysfail;

	/* Start from an all-zero object, then fill in what we know. */
	*b = bufio_initializer;

	b->opts = *opts;
	b->ap = ap;

	pagebuf_opts.page_size = MIN(1024, b->opts.max_bufsiz);
	pagebuf_opts.keep_lowat = 0;
	pagebuf_opts.keep_hiwat = b->opts.max_bufsiz;

	(void)bufio_pagebuf_init(&b->read.buffer, &pagebuf_opts, b->ap);
	(void)bufio_pagebuf_init(&b->write.buffer, &pagebuf_opts, b->ap);

	SLIST_INIT(&b->read.call.frames);
	SLIST_INIT(&b->write.call.frames);

	return b;
sysfail:
	/* Do not fault on a caller that is not interested in the code. */
	if (our_errno)
		*our_errno = BUFIO_ESYSTEM;

	/* bufio_destroy() may disturb the system error code; keep the original. */
	sys_errno = GetLastError();

	bufio_destroy(b);

	SetLastError(sys_errno);

	return 0;
} /* bufio_open() */
/*
 * Destroy a bufio object and return its memory to the allocator it came
 * from. A null `b' is accepted and ignored.
 *
 * Before the memory is released it is overwritten with the byte 0xef so
 * that stale pointers into it are easier to spot. memset() converts its
 * fill argument to unsigned char, so the previous 0xdeadbeef constant only
 * ever stored this low byte, and did so via an out-of-range conversion to
 * int.
 */
void bufio_close(struct bufio *b) {
	const struct arena_prototype *ap;

	if (b == 0)
		return /* void */;

	/* Fetch the allocator first: the poison fill below wipes b->ap. */
	ap = b->ap;

	bufio_destroy(b);

	memset(b, 0xef, sizeof *b);

	ap->free(ap, b);

	return /* void */;
} /* bufio_close() */
enum bufio_errno bufio_set_sink(struct bufio *b, struct bufio_sink *snk) {
b->write.next.sink = snk;
return 0;
} /* bufio_set_sink() */
enum bufio_errno bufio_set_source(struct bufio *b, struct bufio_source *src) {
b->read.next.source = src;
bufio_pagebuf_set_source(&b->read.buffer, src);
return 0;
} /* bufio_set_source() */
/* Return the sink previously installed with bufio_set_sink(). */
struct bufio_sink *bufio_get_sink(struct bufio *b) {
	struct bufio_sink *const current = b->write.next.sink;

	return current;
} /* bufio_get_sink() */
/* Return the source previously installed with bufio_set_source(). */
struct bufio_source *bufio_get_source(struct bufio *b) {
	struct bufio_source *const current = b->read.next.source;

	return current;
} /* bufio_get_source() */
/*
 * Return the sink through which data can be written into this object.
 *
 * The enabled branch simply exposes the downstream sink of the write
 * channel. The disabled branch, which would expose the write page buffer
 * instead, is preserved for reference.
 */
struct bufio_sink *bufio_to_sink(struct bufio *b) {
	struct bufio_sink *result;

#if 1
	result = b->write.next.sink;
#else
	result = bufio_pagebuf_to_source(&b->write.buffer);
#endif

	return result;
} /* bufio_to_sink() */
/* Return the source interface of the read channel's page buffer. */
struct bufio_source *bufio_to_source(struct bufio *b) {
	struct bufio_pagebuf *const rdbuf = &b->read.buffer;

	return bufio_pagebuf_to_source(rdbuf);
} /* bufio_to_source() */
/* Forward declaration: the wakeup handlers below call bufio_exec() before it is defined. */
static void bufio_exec(struct bufio *, struct bufio_channel *, enum bufio_errno, unsigned);
/*
 * Poll callback registered by bufio_exec() for the read channel. `arg' is
 * the owning bufio object; `e' is the status reported by the poll and is
 * forwarded unchanged.
 */
static void bufio_read_wakeup(struct bufio_source *src, enum bufio_errno e, void *arg) {
	struct bufio *const self = arg;

	(void)src;

	/*
	 * XXX: If we've been cancelled it's not necessarily true that 0 is
	 * our recursion depth (as-if called directly from libevent's main
	 * loop).
	 */
	bufio_exec(self, &self->read, e, 0);
} /* bufio_read_wakeup() */
/*
 * Poll callback registered by bufio_exec() for the write channel. `arg' is
 * the owning bufio object; `e' is the status reported by the poll and is
 * forwarded unchanged, with a depth of 0.
 */
static void bufio_write_wakeup(struct bufio_sink *snk, enum bufio_errno e, void *arg) {
	struct bufio *const self = arg;

	(void)snk;

	bufio_exec(self, &self->write, e, 0);
} /* bufio_write_wakeup() */
/*
 * Run one step of the call installed on channel `c'.
 *
 * b - owning object; handed to the user callback and used as the wakeup
 *     argument when polling.
 * c - the channel to work on (&b->read or &b->write).
 * e - status supplied by the caller: 0 from bufio_prep(), or the status a
 *     poll delivered to one of the wakeup handlers. BUFIO_ECANCELLED skips
 *     all I/O and goes straight to the completion callback.
 * i - iteration counter from bufio_prep(); only the disabled recursion
 *     limit code looks at it.
 *
 * The step ends in one of two ways:
 *   - the call completes: the slot becomes VACANT, the status and the
 *     current GetLastError() value are recorded in `b', and the user
 *     callback runs as the very last action (it may destroy `b' or install
 *     a new call);
 *   - the operation would block: the slot becomes POLLING and a poll with
 *     the matching wakeup handler is armed on the source/sink.
 *
 * Completion rules, as coded below: READ, GETS and WRITE complete as soon
 * as any bytes were moved. READN completes once the destination has no
 * free space left. WRITEN completes once the caller's buffer has been
 * fully consumed. FLUSH completes when the sink reports status 0 after
 * draining. Any status for which BUFIO_WOULDBLOCK() is false completes the
 * call with that status.
 */
static void bufio_exec(struct bufio *b, struct bufio_channel *c, enum bufio_errno e, unsigned i) {
	struct bufio_source *src = 0;
	struct bufio_sink *snk = 0;
	struct timeval now, timeout;
	size_t n;
	int rc;

	c->call.state = BUFIO_CALL_RUNNING;

	if (e == BUFIO_ECANCELLED)
		goto callback;

	/*
	 * The call must live outside assert(): with NDEBUG defined the
	 * asserted expression is not evaluated at all, which would leave
	 * `now' uninitialized for the deadline checks below.
	 */
	rc = gettimeofday(&now, 0);
	assert(0 == rc);
	(void)rc;

	if (timerisset(&c->call.expire) && timercmp(&now, &c->call.expire, >=)) {
		e = BUFIO_ETIMEDOUT;

		goto callback;
	}

	if (c->call.type & (BUFIO_CTYPE_READ | BUFIO_CTYPE_GETS | BUFIO_CTYPE_READN)) {
		src = bufio_pagebuf_to_source(&c->buffer);

#if 0
		/*
		 * FIXME: If data is buffered and we poll, we could stall
		 * the stream if the application protocol relies on that
		 * buffered data to progress things. Disable for now. We
		 * need a way to drain all the buffered data and complete
		 * more I/O operations until it's completely consumed,
		 * *then* we can poll.
		 */
		if (i > ((c->call.type == BUFIO_CTYPE_GETS)? b->opts.max_recurse * 3 : b->opts.max_recurse))
			goto poll;
#endif

		if (c->call.type == BUFIO_CTYPE_GETS) {
			n = bufio_pagebuf_gets(&c->buffer, c->call.info.mbuf.page.cursor.put, bufio_page_nfree(&c->call.info.mbuf.page), 0, &e);

			/* gets writes into the page directly, so advance the cursor by hand. */
			c->call.info.mbuf.page.cursor.put += n;
		} else if (c->call.type == BUFIO_CTYPE_READN) {
			size_t t = 0;

			/* Keep copying until the source yields nothing more. */
			do {
				n = src->copyto(src, bufio_membuf_to_sink(&c->call.info.mbuf), 0, &e);
				t += n;
			} while (n > 0);

			n = t;
		} else
			n = src->copyto(src, bufio_membuf_to_sink(&c->call.info.mbuf), 0, &e);

		if (n > 0 && (c->call.type != BUFIO_CTYPE_READN || 0 == bufio_page_nfree(&c->call.info.mbuf.page))) {
			e = 0;

			goto callback;
		}
	} else {
		snk = c->next.sink;

#if 0 /* See above for why this is broken. */
		if (i > b->opts.max_recurse)
			goto poll;
#endif

		if (c->call.type == BUFIO_CTYPE_FLUSH) {
			/* Push buffered data out until the sink stops making progress. */
			while (0 < (n = snk->copyin(snk, 0, 0, BUFIO_FLUSH, &e)))
				;

			if (e == 0)
				goto callback;
		} else {
			if (c->call.type == BUFIO_CTYPE_WRITEN) {
				size_t t = 0;

				/* Keep copying until the sink accepts nothing more. */
				do {
					n = snk->copyfrom(snk, bufio_membuf_to_source(&c->call.info.mbuf), 0, &e);
					t += n;
				} while (n > 0);

				n = t;
			} else
				n = snk->copyfrom(snk, bufio_membuf_to_source(&c->call.info.mbuf), 0, &e);

			if (0 < n && (c->call.type != BUFIO_CTYPE_WRITEN || 0 == bufio_page_len(&c->call.info.mbuf.page))) {
				e = 0;

				goto callback;
			}
		}
	}

	/* Anything other than "would block" is final. */
	if (!BUFIO_WOULDBLOCK(e))
		goto callback;

poll:
	c->call.state = BUFIO_CALL_POLLING;

	/* Convert the absolute deadline into the time remaining from `now'. */
	if (timerisset(&c->call.expire))
		timersub(&c->call.expire, &now, &timeout);
	else
		timerclear(&timeout);

	if (src != 0) {
		if (0 != (e = src->poll(src, &bufio_read_wakeup, b, &timeout)))
			goto callback;
	} else {
		if (0 != (e = snk->poll(snk, &bufio_write_wakeup, b, &timeout)))
			goto callback;
	}

	return /* void */;
callback:
	/* Free the slot first so the callback may install a new call. */
	c->call.state = BUFIO_CALL_VACANT;

	b->last_errno = e;
	b->sys_errno = GetLastError();

	if (c->call.type & (BUFIO_CTYPE_READ | BUFIO_CTYPE_GETS | BUFIO_CTYPE_READN)) {
		c->call.info.cb.read(b, c->call.info.mbuf.page.cursor.get, bufio_page_len(&c->call.info.mbuf.page), e, c->call.info.arg);
	} else if (c->call.type == BUFIO_CTYPE_FLUSH) {
		c->call.info.cb.flush(b, e, c->call.info.arg);
	} else {
		c->call.info.cb.write(b, c->call.info.mbuf.page.base, bufio_page_size(&c->call.info.mbuf.page) - bufio_page_len(&c->call.info.mbuf.page), e, c->call.info.arg);
	}

	return /* void */;
} /* bufio_exec() */
/*
 * Install a new call on channel `c' and, unless an outer invocation is
 * already driving this channel, run it.
 *
 * b       - owning object.
 * c       - target channel; its call slot must be VACANT (asserted).
 * ctype   - kind of operation.
 * call    - buffer, callback and callback argument; copied into the channel.
 * timeout - relative timeout, converted to an absolute deadline here;
 *           null means no deadline.
 *
 * If the channel's frame list is not empty, this function was reached from
 * inside a completion callback that an outer bufio_prep() is still driving.
 * In that case the call is left PENDING and the outer loop picks it up,
 * which keeps chains of back-to-back calls from recursing.
 *
 * Otherwise a frame is pushed and bufio_exec() is run for as long as a call
 * is PENDING and the object has not been destroyed by a callback. The
 * frame records the address of the local `b', which bufio_destroy() zeroes.
 */
static void bufio_prep(struct bufio *b, struct bufio_channel *c, enum bufio_call_type ctype, struct bufio_call *call, struct timeval *timeout) {
	unsigned i = 0;
	struct bufio_frame f;
	struct timeval now;

	assert(c->call.state == BUFIO_CALL_VACANT);

	c->call.type = ctype;
	c->call.info = *call;

	if (timeout) {
		/*
		 * The call must live outside assert(): with NDEBUG defined
		 * the asserted expression is not evaluated at all, which
		 * would leave `now' uninitialized and the deadline garbage.
		 */
		int rc = gettimeofday(&now, 0);

		assert(0 == rc);
		(void)rc;

		timeradd(&now, timeout, &c->call.expire);
	} else
		timerclear(&c->call.expire);

	c->call.state = BUFIO_CALL_PENDING;

	/* Fall back into the original write frame, if any. */
	if (!SLIST_EMPTY(&c->call.frames))
		return /* void */;

	BUFIO_FRAME_PUSH(b, c, &f);

	/* Check the frame first: once `b' is zeroed, `c' must not be read. */
	while (BUFIO_FRAME_OKAY(&f) && c->call.state == BUFIO_CALL_PENDING)
		bufio_exec(b, c, 0, i++);

	BUFIO_FRAME_POP(b, c, &f);

	return /* void */;
} /* bufio_prep() */
/*
 * Request a read of up to `dstlen' bytes into `dst'.
 *
 * The request is handed to bufio_prep() as a BUFIO_CTYPE_READ call on the
 * read channel, which completes as soon as any bytes have been copied. `cb'
 * is invoked with `arg' on completion. `timeout', when non-null, is a
 * relative timeout. The read channel must not have a call outstanding.
 */
void bufio_read(struct bufio *b, void *dst, size_t dstlen, bufio_read_cb cb, void *arg, struct timeval *timeout) {
	struct bufio_call request;

	request.arg = arg;
	request.cb.read = cb;

	(void)bufio_membuf_init_sink(&request.mbuf, BUFIO_MEMBUF_DEFAULTS, dst, dstlen);

	bufio_prep(b, &b->read, BUFIO_CTYPE_READ, &request, timeout);
} /* bufio_read() */
/*
 * Request a read that fills `dst' with exactly `dstlen' bytes.
 *
 * The request is handed to bufio_prep() as a BUFIO_CTYPE_READN call on the
 * read channel, which completes successfully only once the destination has
 * no free space left. `cb' is invoked with `arg' on completion. `timeout',
 * when non-null, is a relative timeout. The read channel must not have a
 * call outstanding.
 */
void bufio_readn(struct bufio *b, void *dst, size_t dstlen, bufio_read_cb cb, void *arg, struct timeval *timeout) {
	struct bufio_call request;

	request.arg = arg;
	request.cb.read = cb;

	(void)bufio_membuf_init_sink(&request.mbuf, BUFIO_MEMBUF_DEFAULTS, dst, dstlen);

	bufio_prep(b, &b->read, BUFIO_CTYPE_READN, &request, timeout);
} /* bufio_readn() */
/*
 * Request a gets-style read of at most `dstlen' bytes into `dst'.
 *
 * The request is handed to bufio_prep() as a BUFIO_CTYPE_GETS call on the
 * read channel, which is serviced through bufio_pagebuf_gets() (presumably
 * line-oriented; see pagebuf.h). `cb' is stored in the `read' member of the
 * callback union, which is also the member bufio_exec() invokes for this
 * call type. `timeout', when non-null, is a relative timeout. The read
 * channel must not have a call outstanding.
 */
void bufio_gets(struct bufio *b, char *dst, size_t dstlen, bufio_gets_cb cb, void *arg, struct timeval *timeout) {
	struct bufio_call request;

	request.arg = arg;
	request.cb.read = cb;

	(void)bufio_membuf_init_sink(&request.mbuf, BUFIO_MEMBUF_DEFAULTS, dst, dstlen);

	bufio_prep(b, &b->read, BUFIO_CTYPE_GETS, &request, timeout);
} /* bufio_gets() */
/*
 * Request a write of up to `srclen' bytes from `src'.
 *
 * The request is handed to bufio_prep() as a BUFIO_CTYPE_WRITE call on the
 * write channel, which completes as soon as the sink has accepted any
 * bytes. `cb' is invoked with `arg' on completion. `timeout', when
 * non-null, is a relative timeout. The write channel must not have a call
 * outstanding.
 */
void bufio_write(struct bufio *b, const void *src, size_t srclen, bufio_write_cb cb, void *arg, struct timeval *timeout) {
	struct bufio_call request;

	request.arg = arg;
	request.cb.write = cb;

	(void)bufio_membuf_init_source(&request.mbuf, BUFIO_MEMBUF_DEFAULTS, src, srclen);

	bufio_prep(b, &b->write, BUFIO_CTYPE_WRITE, &request, timeout);
} /* bufio_write() */
/*
 * Request a write of all `srclen' bytes from `src'.
 *
 * The request is handed to bufio_prep() as a BUFIO_CTYPE_WRITEN call on the
 * write channel, which completes successfully only once the caller's buffer
 * has been fully consumed. `cb' is invoked with `arg' on completion.
 * `timeout', when non-null, is a relative timeout. The write channel must
 * not have a call outstanding.
 */
void bufio_writen(struct bufio *b, const void *src, size_t srclen, bufio_write_cb cb, void *arg, struct timeval *timeout) {
	struct bufio_call request;

	request.arg = arg;
	request.cb.write = cb;

	(void)bufio_membuf_init_source(&request.mbuf, BUFIO_MEMBUF_DEFAULTS, src, srclen);

	bufio_prep(b, &b->write, BUFIO_CTYPE_WRITEN, &request, timeout);
} /* bufio_writen() */
/*
 * Request a flush of the write channel.
 *
 * The request is handed to bufio_prep() as a BUFIO_CTYPE_FLUSH call, which
 * repeatedly asks the sink to flush until it makes no further progress.
 * The call carries no data, so its memory buffer is initialized empty. `cb'
 * is invoked with `arg' on completion. `timeout', when non-null, is a
 * relative timeout. The write channel must not have a call outstanding.
 */
void bufio_flush(struct bufio *b, bufio_flush_cb cb, void *arg, struct timeval *timeout) {
	struct bufio_call request;

	request.arg = arg;
	request.cb.flush = cb;

	(void)bufio_membuf_init(&request.mbuf, BUFIO_MEMBUF_DEFAULTS);

	bufio_prep(b, &b->write, BUFIO_CTYPE_FLUSH, &request, timeout);
} /* bufio_flush() */
const char *bufio_strerror(struct bufio *b) {
switch (b->last_errno) {
case BUFIO_ESYSTEM:
return strerror(b->sys_errno);
default:
return bufio_errlist[b->last_errno];
}
/* NOT REACHED */
} /* bufio_strerror() */
/* syntax highlighted by Code2HTML, v. 0.9.1 */