/***************************************************************************
*
* Copyright (c) 1998-1999 Niels Möller
* Copyright (c) 1999 BalaBit Computing
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*
* $Id: stream_buffer.c,v 1.6 2004/08/05 10:34:49 bazsi Exp $
*
***************************************************************************/
#include "stream_buffer.h"
#include "xalloc.h"
#include "werror.h"
#include "queue.h"
#include "io.h"
#include "format.h"
#include <assert.h>
#include <string.h>
/* For the packet queue: each node carries one queued string that is
 * waiting to be copied into the stream buffer.
 *
 * NOTE: No object header.  Nodes are allocated with NEW_SPACE and
 * released with ol_space_free. */
struct buffer_node
{
/* Queue linkage.  Must remain the first member: the pointer returned by
 * ol_queue_remove_head is cast directly to struct buffer_node *. */
struct ol_queue_node header;
/* The queued string.  It is released with ol_string_free, either after
 * it has been fully copied into the buffer or when the queue is freed. */
struct ol_string *packet;
};
/* CLASS:
(class
(name stream_buffer)
(super abstract_buffer)
(vars
(block_size simple UINT32)
(buffer space UINT8) ; Size is twice the blocksize
(empty simple int)
; Total amount of data currently in the buffer)
(length . UINT32)
;; (try_write simple int)
(q special-struct "struct ol_queue"
#f do_free_buffer)
(pos simple UINT32) ; Partial packet
(partial string)
(start simple UINT32)
(end simple UINT32)))
*/
/* Prototype.
 *
 * The class description above names do_free_buffer as the handler for
 * the `q' member, so it is declared before the generated class code is
 * included -- presumably because that generated code refers to it
 * (TODO confirm against stream_buffer.c.x). */
static void do_free_buffer(struct ol_queue *q);
#include "stream_buffer.c.x"
/* Release everything still sitting in the packet queue Q: for every node,
 * free the string it carries and then the node itself.
 *
 * This is the function the class description lists for the `q' member. */
static void do_free_buffer(struct ol_queue *q)
{
    FOR_QUEUE(q, struct buffer_node *, node)
    {
        ol_string_free(node->packet);
        ol_space_free(node);
    }
}
/* Raw-data write method: wrap LENGTH bytes of DATA in a string built by
 * c_format and hand that string to W's string-write method, returning
 * whatever that method returns.
 *
 * FIXME: this could very well be optimized. */
static int do_write(struct abstract_write *w,
                    UINT32 length, UINT8 *data)
{
    struct ol_string *packet = c_format("%s", length, data);

    return A_WRITE_STRING(w, packet);
}
/* String write method: queue STRING for later transmission.
 *
 * Ownership of STRING passes to this function on every path.  Queued
 * strings are freed once they have been copied into the buffer (or when
 * the queue is torn down), and do_write hands us a freshly formatted
 * string that nobody else releases.  The early-return paths therefore
 * have to free STRING themselves, otherwise it leaks.
 *
 * Returns ST_OK | ST_GOON when the string was accepted (an empty string
 * is accepted and discarded), or ST_FAIL | ST_CLOSE when the buffer has
 * already been closed. */
static int do_write_str(struct abstract_write *c, struct ol_string *string)
{
    CAST(stream_buffer, closure, c);
    struct buffer_node *new;

    if (!string->length) {
        /* Nothing to send; drop the empty string instead of queueing it. */
        ol_string_free(string);
        return ST_OK | ST_GOON;
    }

    if (closure->super.closed) {
        /* The buffer no longer accepts data; do not leak the string. */
        ol_string_free(string);
        return ST_FAIL | ST_CLOSE;
    }

    /* Enqueue packet */
    NEW_SPACE(new);
    new->packet = string;
    ol_queue_add_tail(&closure->q, &new->header);

    closure->empty = 0;
    closure->length += string->length;

    return ST_OK | ST_GOON;
}
/* Copy data as necessary, before writing.
*
* FIXME: Writing of large packets could probably be optimized by
* avoiding copying it into the buffer.
*
* Returns 1 if the buffer is non-empty. */
int do_prepare_write(struct abstract_buffer *c)
{
CAST(stream_buffer, buffer, c);
UINT32 length = buffer->end - buffer->start;
if (buffer->empty)
return 0;
if (buffer->start > buffer->block_size) {
/* Copy contents to the start of the buffer */
memcpy(buffer->buffer, buffer->buffer + buffer->start, length);
buffer->start = 0;
buffer->end = length;
}
while (length < buffer->block_size) {
/* Copy more data into buffer */
if (buffer->partial) {
UINT32 partial_left = buffer->partial->length - buffer->pos;
UINT32 buffer_left = 2*buffer->block_size - buffer->end;
if (partial_left <= buffer_left) {
/* The rest of the partial packet fits in the buffer */
memcpy(buffer->buffer + buffer->end,
buffer->partial->data + buffer->pos,
partial_left);
buffer->end += partial_left;
length += partial_left;
ol_string_free(buffer->partial);
buffer->partial = NULL;
}
else {
memcpy(buffer->buffer + buffer->end,
buffer->partial->data + buffer->pos,
buffer_left);
buffer->end += buffer_left;
length += buffer_left;
buffer->pos += buffer_left;
assert(length >= buffer->block_size);
}
}
else {
/* Dequeue a packet, if possible */
if (!ol_queue_is_empty(&buffer->q)) {
struct buffer_node *n =
(struct buffer_node *) ol_queue_remove_head(&buffer->q);
buffer->partial = n->packet;
buffer->pos = 0;
ol_space_free(n);
}
else
break;
}
}
buffer->empty = !length;
return !buffer->empty;
}
/* Flush method: offer at most block_size staged bytes to W.
 *
 * A non-negative result from A_WRITE is treated as the number of bytes
 * consumed; the read offset advances and the pending byte count shrinks
 * by that amount.  Always returns ST_OK | ST_GOON.
 *
 * NOTE(review): a negative result from A_WRITE is silently ignored here;
 * confirm that the caller deals with write errors elsewhere. */
static int do_flush(struct abstract_buffer *c, struct abstract_write *w)
{
    CAST(stream_buffer, self, c);
    UINT32 chunk;
    int written;

    /* Never hand out more than one block at a time. */
    chunk = self->end - self->start;
    if (chunk > self->block_size) {
        chunk = self->block_size;
    }

    written = A_WRITE(w, chunk, self->buffer + self->start);
    if (written < 0) {
        return ST_OK | ST_GOON;
    }

    self->start += written;
    assert(self->start <= self->end);
    self->length -= written;

    return ST_OK | ST_GOON;
}
/* Close method: flag the buffer as closed.  Data that is already queued
 * is left alone; do_write_str rejects new strings once the flag is set. */
void do_close(struct abstract_buffer *c)
{
    c->closed = 1;
}
/* Create a stream buffer that writes in blocks of at most BUFSIZE bytes.
 *
 * The staging area is allocated at twice BUFSIZE.  The new buffer starts
 * out empty, with an empty packet queue and no string in progress.
 * Returns a pointer to the embedded abstract_buffer. */
struct abstract_buffer *make_stream_buffer(UINT32 bufsize)
{
    NEW(stream_buffer, self);

    /* Method table */
    self->super.super.write = do_write;
    self->super.super.writestr = do_write_str;
    self->super.flush = do_flush;
    self->super.prepare = do_prepare_write;
    self->super.close = do_close;

    /* Staging area */
    self->block_size = bufsize;
    self->buffer = ol_space_alloc(2 * bufsize);
    self->start = 0;
    self->end = 0;

    /* Pending data */
    ol_queue_init(&self->q);
    self->partial = NULL;
    self->pos = 0;
    self->length = 0;
    self->empty = 1;

    return &self->super;
}
/* syntax highlighted by Code2HTML, v. 0.9.1 */