/* ========================================================================== * libevnet/src/bufio.c - Network server library for libevent. * -------------------------------------------------------------------------- * Copyright (c) 2006 Barracuda Networks, Inc. * Copyright (c) 2006 William Ahern * * Permission is hereby granted, free of charge, to any person obtaining a * copy of this software and associated documentation files (the * "Software"), to deal in the Software without restriction, including * without limitation the rights to use, copy, modify, merge, publish, * distribute, sublicense, and/or sell copies of the Software, and to permit * persons to whom the Software is furnished to do so, subject to the * following conditions: * * The above copyright notice and this permission notice shall be included * in all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE * USE OR OTHER DEALINGS IN THE SOFTWARE. * ========================================================================== */ #include /* ENOTSUP */ #include #include /* memset(3) */ #include /* GetLastError SetLastError */ #include /* struct timeval timerclear gettimeofday(3) */ #include /* MIN */ #include /* SLIST */ #include #include "bufio.h" #include "bufio/pagebuf.h" #include "bufio/membuf.h" #if 0 #include #define MARK (fprintf(stderr, "%s:%d\n", __FUNCTION__, __LINE__)) #endif const char *bufio_errlist[BUFIO_NERR] = { [0 ... (BUFIO_NERR - 1)] = "Unknown", [BUFIO_ESUCCESS] = "Success", [BUFIO_ESYSTEM] = "System error", [BUFIO_ECANCELLED] = "Operation cancelled", [BUFIO_EEOF] = "End of file", [BUFIO_ETIMEDOUT] = "Operation timed out", [BUFIO_ENOTPOLLING] = "Nothing to cancel", [BUFIO_EAGAIN] = "Try operation again", [BUFIO_EPIPE] = "Sink disconnected", }; /* bufio_errlist[] */ const size_t bufio_nerr = sizeof bufio_errlist / sizeof *bufio_errlist; static size_t bufio_source_copyto_noop(struct bufio_source *source, struct bufio_sink *sink, int flags, enum bufio_errno *e) { *e = BUFIO_ESYSTEM; SetLastError(ENOTSUP); return 0; } /* bufio_source_copyto_noop() */ static size_t bufio_source_copyout_noop(struct bufio_source *source, void *buf, size_t bufsiz, int flags, enum bufio_errno *e) { *e = BUFIO_ESYSTEM; SetLastError(ENOTSUP); return 0; } /* bufio_source_copyout_noop() */ static size_t bufio_source_buffered(struct bufio_source *src) { return 0; } /* bufio_source_buffered() */ static enum bufio_errno bufio_source_poll_noop(struct bufio_source *source, void (*cb)(struct bufio_source *, enum bufio_errno, void *), void *arg, struct timeval *timeout) { SetLastError(ENOTSUP); return BUFIO_ESYSTEM; } /* bufio_source_poll_noop() */ static enum bufio_errno bufio_source_cancel_noop(struct bufio_source *source, int notify) { return 0; } /* bufio_source_cancel_noop() */ static enum bufio_errno bufio_source_lasterr_noop(struct bufio_source *source) { return 0; } /* bufio_source_lasterr_noop() */ static void bufio_source_clearerr_noop(struct bufio_source *source) { return /* void */; } /* bufio_source_clearerr_noop() */ struct bufio_source bufio_source_initializer = { #ifndef WIN32 .copyto = &bufio_source_copyto_noop, .copyout = &bufio_source_copyout_noop, .buffered = &bufio_source_buffered, .poll = &bufio_source_poll_noop, .cancel = &bufio_source_cancel_noop, .lasterr = &bufio_source_lasterr_noop, .clearerr = &bufio_source_clearerr_noop, #else &bufio_source_copyto_noop, &bufio_source_copyout_noop, &bufio_source_buffered, &bufio_source_poll_noop, &bufio_source_cancel_noop, &bufio_source_lasterr_noop, &bufio_source_clearerr_noop, #endif }; /* bufio_source_initializer */ static size_t bufio_sink_copyfrom_noop(struct bufio_sink *snk, struct bufio_source *src, int flags, enum bufio_errno *e) { *e = BUFIO_ESYSTEM; SetLastError(ENOTSUP); return 0; } /* bufio_sink_copyfrom_noop() */ static size_t bufio_sink_copyin_noop(struct bufio_sink *snk, void *buf, size_t bufsiz, int flags, enum bufio_errno *e) { *e = BUFIO_ESYSTEM; SetLastError(ENOTSUP); return 0; } /* bufio_sink_copyin_noop() */ static size_t bufio_sink_buffered(struct bufio_sink *snk) { return 0; } /* bufio_sink_buffered() */ static enum bufio_errno bufio_sink_poll_noop(struct bufio_sink *sink, void (*cb)(struct bufio_sink *, enum bufio_errno, void *), void *arg, struct timeval *timeout) { SetLastError(ENOTSUP); return BUFIO_ESYSTEM; } /* bufio_sink_poll_noop() */ static enum bufio_errno bufio_sink_cancel_noop(struct bufio_sink *sink, int notify) { return 0; } /* bufio_sink_cancel_noop() */ static enum bufio_errno bufio_sink_lasterr_noop(struct bufio_sink *sink) { return 0; } /* bufio_sink_lasterr_noop() */ static void bufio_sink_clearerr_noop(struct bufio_sink *sink) { return /* void */; } /* bufio_sink_clearerr_noop() */ struct bufio_sink bufio_sink_initializer = { #ifndef WIN32 .copyfrom = &bufio_sink_copyfrom_noop, .copyin = &bufio_sink_copyin_noop, .buffered = &bufio_sink_buffered, .poll = &bufio_sink_poll_noop, .cancel = &bufio_sink_cancel_noop, .lasterr = &bufio_sink_lasterr_noop, .clearerr = &bufio_sink_clearerr_noop, #else &bufio_sink_copyfrom_noop, &bufio_sink_copyin_noop, &bufio_sink_buffered, &bufio_sink_poll_noop, &bufio_sink_cancel_noop, &bufio_sink_lasterr_noop, &bufio_sink_clearerr_noop, #endif }; /* bufio_sink_initializer */ #define BUFIO_FRAME_PUSH(b, c, f) do { \ (f)->bp = &(b); \ SLIST_INSERT_HEAD(&(c)->call.frames, (f), sle); \ } while(0) #define BUFIO_FRAME_POP(b, c, f) do { \ if ((b) != 0) { \ assert((f) == SLIST_FIRST(&(c)->call.frames)); \ SLIST_REMOVE_HEAD(&(c)->call.frames, sle); \ } \ } while(0) #define BUFIO_FRAME_OKAY(f) (*(f)->bp != 0) const struct bufio_options bufio_defaults = { #ifndef WIN32 .max_bufsiz = 4096, .max_recurse = 7, #else 4096, 7, #endif }; /* bufio_defaults */ struct bufio_frame { struct bufio **bp; SLIST_ENTRY(bufio_frame) sle; }; /* struct bufio_frame */ union bufio_callback { bufio_read_cb read; bufio_write_cb write; bufio_gets_cb gets; bufio_flush_cb flush; }; /* union bufio_callback */ enum bufio_call_state { BUFIO_CALL_VACANT, BUFIO_CALL_PENDING, BUFIO_CALL_RUNNING, BUFIO_CALL_POLLING, }; /* enum bufio_call_state */ enum bufio_call_type { BUFIO_CTYPE_READ = 1 << 0, BUFIO_CTYPE_WRITE = 1 << 1, BUFIO_CTYPE_GETS = 1 << 2, BUFIO_CTYPE_FLUSH = 1 << 3, BUFIO_CTYPE_READN = 1 << 4, BUFIO_CTYPE_WRITEN = 1 << 5, }; /* enum bufio_call_type */ struct bufio_call { struct bufio_membuf mbuf; union bufio_callback cb; void *arg; }; /* struct bufio_call */ struct bufio_channel { struct bufio_pagebuf buffer; union { struct bufio_source *source; struct bufio_sink *sink; } next; struct { enum bufio_call_state state; enum bufio_call_type type; struct bufio_call info; struct timeval expire; SLIST_HEAD(, bufio_frame) frames; } call; }; /* struct bufio_channel */ struct bufio { struct bufio_options opts; const struct arena_prototype *ap; struct bufio_channel read, write; enum bufio_errno last_errno; int sys_errno; }; /* struct bufio */ static void bufio_destroy(struct bufio *b) { struct bufio_frame *f; if (!b) return /* void */; bufio_pagebuf_destroy(&b->read.buffer); SLIST_FOREACH(f, &b->read.call.frames, sle) *f->bp = 0; SLIST_INIT(&b->read.call.frames); bufio_pagebuf_destroy(&b->write.buffer); SLIST_FOREACH(f, &b->write.call.frames, sle) *f->bp = 0; SLIST_INIT(&b->write.call.frames); return /* void */; } /* bufio_destroy() */ struct bufio *bufio_open(const struct bufio_options *opts, const struct arena_prototype *ap, enum bufio_errno *our_errno) { static const struct bufio bufio_initializer; struct bufio *b = 0; struct bufio_pagebuf_options pagebuf_opts = bufio_pagebuf_defaults; int sys_errno; if (!opts) opts = &bufio_defaults; if (!ap) ap = ARENA_STDLIB; if (!(b = ap->malloc(ap, sizeof *b, 0))) goto sysfail; *b = bufio_initializer; b->opts = *opts; b->ap = ap; pagebuf_opts.page_size = MIN(1024, b->opts.max_bufsiz); pagebuf_opts.keep_lowat = 0; pagebuf_opts.keep_hiwat = b->opts.max_bufsiz; (void)bufio_pagebuf_init(&b->read.buffer, &pagebuf_opts, b->ap); (void)bufio_pagebuf_init(&b->write.buffer, &pagebuf_opts, b->ap); SLIST_INIT(&b->read.call.frames); SLIST_INIT(&b->write.call.frames); return b; sysfail: *our_errno = BUFIO_ESYSTEM; /* FALL THROUGH */ anyfail: sys_errno = GetLastError(); bufio_destroy(b); SetLastError(sys_errno); return 0; } /* bufio_open() */ void bufio_close(struct bufio *b) { const struct arena_prototype *ap; if (b == 0) return /* void */; ap = b->ap; bufio_destroy(b); ap->free(ap, memset(b, 0xdeadbeef, sizeof *b)); return /* void */; } /* bufio_close() */ enum bufio_errno bufio_set_sink(struct bufio *b, struct bufio_sink *snk) { b->write.next.sink = snk; return 0; } /* bufio_set_sink() */ enum bufio_errno bufio_set_source(struct bufio *b, struct bufio_source *src) { b->read.next.source = src; bufio_pagebuf_set_source(&b->read.buffer, src); return 0; } /* bufio_set_source() */ struct bufio_sink *bufio_get_sink(struct bufio *b) { return b->write.next.sink; } /* bufio_get_sink() */ struct bufio_source *bufio_get_source(struct bufio *b) { return b->read.next.source; } /* bufio_get_source() */ struct bufio_sink *bufio_to_sink(struct bufio *b) { #if 1 return b->write.next.sink; #else return bufio_pagebuf_to_source(&b->write.buffer); #endif } /* bufio_to_sink() */ struct bufio_source *bufio_to_source(struct bufio *b) { return bufio_pagebuf_to_source(&b->read.buffer); } /* bufio_to_source() */ static void bufio_exec(struct bufio *, struct bufio_channel *, enum bufio_errno, unsigned); static void bufio_read_wakeup(struct bufio_source *src, enum bufio_errno e, void *arg) { struct bufio *b = arg; /* * XXX: If we've been cancelled it's not necessarily true that 0 is * our recursion depth (as-if called directly from libevent's main * loop). */ bufio_exec(b, &b->read, e, 0); return /* void */; } /* bufio_read_wakeup() */ static void bufio_write_wakeup(struct bufio_sink *snk, enum bufio_errno e, void *arg) { struct bufio *b = arg; bufio_exec(b, &b->write, e, 0); return /* void */; } /* bufio_write_wakeup() */ static void bufio_exec(struct bufio *b, struct bufio_channel *c, enum bufio_errno e, unsigned i) { struct bufio_source *src = 0; struct bufio_sink *snk = 0; struct timeval now, timeout; size_t n; c->call.state = BUFIO_CALL_RUNNING; if (e == BUFIO_ECANCELLED) goto callback; assert(0 == gettimeofday(&now, 0)); if (timerisset(&c->call.expire) && timercmp(&now, &c->call.expire, >=)) { e = BUFIO_ETIMEDOUT; goto callback; } if (c->call.type & (BUFIO_CTYPE_READ | BUFIO_CTYPE_GETS | BUFIO_CTYPE_READN)) { src = bufio_pagebuf_to_source(&c->buffer); #if 0 /* * FIXME: If data is buffered and we poll, we could stall * the stream if the application protocol relies on that * buffered data to progress things. Disable for now. We * need a way to drain all the buffered data and complete * more I/O operations until it's completely consumed, * *then* we can poll. */ if (i > ((c->call.type == BUFIO_CTYPE_GETS)? b->opts.max_recurse * 3 : b->opts.max_recurse)) goto poll; #endif if (c->call.type == BUFIO_CTYPE_GETS) { n = bufio_pagebuf_gets(&c->buffer, c->call.info.mbuf.page.cursor.put, bufio_page_nfree(&c->call.info.mbuf.page), 0, &e); c->call.info.mbuf.page.cursor.put += n; } else if (c->call.type == BUFIO_CTYPE_READN) { size_t t = 0; do { n = src->copyto(src, bufio_membuf_to_sink(&c->call.info.mbuf), 0, &e); t += n; } while (n > 0); n = t; } else n = src->copyto(src, bufio_membuf_to_sink(&c->call.info.mbuf), 0, &e); if (n > 0 && (c->call.type != BUFIO_CTYPE_READN || 0 == bufio_page_nfree(&c->call.info.mbuf.page))) { e = 0; goto callback; } } else { snk = c->next.sink; #if 0 /* See above for why this is broken. */ if (i > b->opts.max_recurse) goto poll; #endif if (c->call.type == BUFIO_CTYPE_FLUSH) { while (0 < (n = snk->copyin(snk, 0, 0, BUFIO_FLUSH, &e))) ;; if (e == 0) goto callback; } else { if (c->call.type == BUFIO_CTYPE_WRITEN) { size_t t = 0; do { n = snk->copyfrom(snk, bufio_membuf_to_source(&c->call.info.mbuf), 0, &e); t += n; } while (n > 0); n = t; } else n = snk->copyfrom(snk, bufio_membuf_to_source(&c->call.info.mbuf), 0, &e); if (0 < n && (c->call.type != BUFIO_CTYPE_WRITEN || 0 == bufio_page_len(&c->call.info.mbuf.page))) { e = 0; goto callback; } } } if (!BUFIO_WOULDBLOCK(e)) goto callback; poll: c->call.state = BUFIO_CALL_POLLING; if (timerisset(&c->call.expire)) timersub(&c->call.expire, &now, &timeout); else timerclear(&timeout); if (src != 0) { if (0 != (e = src->poll(src, &bufio_read_wakeup, b, &timeout))) goto callback; } else { if (0 != (e = snk->poll(snk, &bufio_write_wakeup, b, &timeout))) goto callback; } return /* void */; callback: c->call.state = BUFIO_CALL_VACANT; b->last_errno = e; b->sys_errno = GetLastError(); if (c->call.type & (BUFIO_CTYPE_READ | BUFIO_CTYPE_GETS | BUFIO_CTYPE_READN)) { c->call.info.cb.read(b, c->call.info.mbuf.page.cursor.get, bufio_page_len(&c->call.info.mbuf.page), e, c->call.info.arg); } else if (c->call.type == BUFIO_CTYPE_FLUSH) { c->call.info.cb.flush(b, e, c->call.info.arg); } else { c->call.info.cb.write(b, c->call.info.mbuf.page.base, bufio_page_size(&c->call.info.mbuf.page) - bufio_page_len(&c->call.info.mbuf.page), e, c->call.info.arg); } return /* void */; } /* bufio_exec() */ static void bufio_prep(struct bufio *b, struct bufio_channel *c, enum bufio_call_type ctype, struct bufio_call *call, struct timeval *timeout) { unsigned i = 0; struct bufio_frame f; struct timeval now; assert(c->call.state == BUFIO_CALL_VACANT); c->call.type = ctype; c->call.info = *call; if (timeout) { assert(0 == gettimeofday(&now, 0)); timeradd(&now, timeout, &c->call.expire); } else timerclear(&c->call.expire); c->call.state = BUFIO_CALL_PENDING; /* Fall back into the original write frame, if any. */ if (!SLIST_EMPTY(&c->call.frames)) return /* void */; BUFIO_FRAME_PUSH(b, c, &f); while (BUFIO_FRAME_OKAY(&f) && c->call.state == BUFIO_CALL_PENDING) bufio_exec(b, c, 0, i++); BUFIO_FRAME_POP(b, c, &f); return /* void */; } /* bufio_prep() */ void bufio_read(struct bufio *b, void *dst, size_t dstlen, bufio_read_cb cb, void *arg, struct timeval *timeout) { struct bufio_call call; (void)bufio_membuf_init_sink(&call.mbuf, BUFIO_MEMBUF_DEFAULTS, dst, dstlen); call.cb.read = cb; call.arg = arg; bufio_prep(b, &b->read, BUFIO_CTYPE_READ, &call, timeout); return /* void */; } /* bufio_read() */ void bufio_readn(struct bufio *b, void *dst, size_t dstlen, bufio_read_cb cb, void *arg, struct timeval *timeout) { struct bufio_call call; (void)bufio_membuf_init_sink(&call.mbuf, BUFIO_MEMBUF_DEFAULTS, dst, dstlen); call.cb.read = cb; call.arg = arg; bufio_prep(b, &b->read, BUFIO_CTYPE_READN, &call, timeout); return /* void */; } /* bufio_readn() */ void bufio_gets(struct bufio *b, char *dst, size_t dstlen, bufio_gets_cb cb, void *arg, struct timeval *timeout) { struct bufio_call call; (void)bufio_membuf_init_sink(&call.mbuf, BUFIO_MEMBUF_DEFAULTS, dst, dstlen); call.cb.read = cb; call.arg = arg; bufio_prep(b, &b->read, BUFIO_CTYPE_GETS, &call, timeout); return /* void */; } /* bufio_gets() */ void bufio_write(struct bufio *b, const void *src, size_t srclen, bufio_write_cb cb, void *arg, struct timeval *timeout) { struct bufio_call call; (void)bufio_membuf_init_source(&call.mbuf, BUFIO_MEMBUF_DEFAULTS, src, srclen); call.cb.write = cb; call.arg = arg; bufio_prep(b, &b->write, BUFIO_CTYPE_WRITE, &call, timeout); return /* void */; } /* bufio_write() */ void bufio_writen(struct bufio *b, const void *src, size_t srclen, bufio_write_cb cb, void *arg, struct timeval *timeout) { struct bufio_call call; (void)bufio_membuf_init_source(&call.mbuf, BUFIO_MEMBUF_DEFAULTS, src, srclen); call.cb.write = cb; call.arg = arg; bufio_prep(b, &b->write, BUFIO_CTYPE_WRITEN, &call, timeout); return /* void */; } /* bufio_writen() */ void bufio_flush(struct bufio *b, bufio_flush_cb cb, void *arg, struct timeval *timeout) { struct bufio_call call; (void)bufio_membuf_init(&call.mbuf, BUFIO_MEMBUF_DEFAULTS); call.cb.flush = cb; call.arg = arg; bufio_prep(b, &b->write, BUFIO_CTYPE_FLUSH, &call, timeout); return /* void */; } /* bufio_flush() */ const char *bufio_strerror(struct bufio *b) { switch (b->last_errno) { case BUFIO_ESYSTEM: return strerror(b->sys_errno); default: return bufio_errlist[b->last_errno]; } /* NOT REACHED */ } /* bufio_strerror() */