/***************************************************************************
*
* Copyright (c) 1999 BalaBit Computing
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*
* Inspired by nsyslog, originally written by Darren Reed.
*
* $Id: pkt_buffer.c,v 1.20 2005/11/23 08:49:21 bazsi Exp $
*
***************************************************************************/
#include "pkt_buffer.h"
#include "list.h"
#include "io.h"
#include "format.h"
#include <string.h>
struct buffer_node {
struct ol_queue_node header;
struct ol_string *packet;
};
int pktbuf_dropped_pkts = 0;
/* CLASS:
(class
(name pkt_buffer)
(super abstract_buffer)
(vars
(queue special-struct "struct ol_queue" #f do_free_buffer)
(pkt_flush simple int)
(queue_min simple int)
(queue_size simple int)
(queue_max simple int)))
*/
static void do_free_buffer(struct ol_queue *q);
#include "pkt_buffer.c.x"
static void do_free_buffer(struct ol_queue *q)
{
FOR_QUEUE(q, struct buffer_node *, n) {
ol_string_free(n->packet);
ol_space_free(n);
}
}
static int do_write(struct abstract_write *c, UINT32 length, UINT8 *data)
{
return A_WRITE_STRING(c, c_format("%s", length, data));
}
static int do_write_str(struct abstract_write *c, struct ol_string *string)
{
CAST(pkt_buffer, self, c);
if (self->super.closed) {
ol_string_free(string);
return ST_FAIL | ST_CLOSE;
}
if (self->queue_size == self->queue_max) {
/* fifo full */
pktbuf_dropped_pkts++;
ol_string_free(string);
return ST_FAIL | ST_OK;
}
else {
struct buffer_node *item;
NEW_SPACE(item);
item->packet = string;
ol_queue_add_tail(&self->queue, &item->header);
if (++self->queue_size == self->queue_max && self->super.writable)
(*self->super.writable) = 0;
}
return ST_OK | ST_GOON;
}
static int do_flush_stream(struct abstract_buffer *c, struct abstract_write *w)
{
CAST(pkt_buffer, self, c);
char buffer[4096];
int res;
int end;
do {
end = 0;
{
FOR_QUEUE(&self->queue, struct buffer_node *, item) {
int avail = sizeof(buffer) - end;
int move = LIBOL_MIN(item->packet->length, avail);
if (avail <= 0 || move <= 0)
break;
memcpy(&buffer[end], item->packet->data, move);
end += move;
if (item->packet->length != move) {
struct ol_string *s;
s = item->packet;
item->packet = c_format("%s", s->length - move, s->data + move);
ol_string_free(s);
}
else {
/* successfully moved */
self->queue_size--;
ol_queue_remove((struct ol_queue_node *) item);
ol_string_free(item->packet);
ol_space_free(item);
}
}
}
if (end == 0)
break;
res = A_WRITE(w, end, (UINT8 *) buffer);
if (res >= 0) {
if (end == res) {
if (self->super.writable)
(*self->super.writable) = 1;
}
else {
/* this is slow, because of another memory move
* but this is run rarely anyway */
struct buffer_node *item;
NEW_SPACE(item);
item->packet = c_format("%s", end - res, buffer + res);
ol_queue_add_head(&self->queue, &item->header);
if (++self->queue_size == self->queue_max && self->super.writable)
(*self->super.writable) = 0;
break;
}
}
else {
struct buffer_node *item;
NEW_SPACE(item);
item->packet = c_format("%s", end, buffer);
ol_queue_add_head(&self->queue, &item->header);
if (++self->queue_size == self->queue_max && self->super.writable)
(*self->super.writable) = 0;
werror("pkt_buffer::do_flush(): Error flushing data\n");
return ST_DIE;
}
}
while (1);
return ST_OK | ST_GOON;
}
static int do_flush_pkt(struct abstract_buffer *c, struct abstract_write *w)
{
CAST(pkt_buffer, self, c);
int res;
FOR_QUEUE(&self->queue, struct buffer_node *, item) {
res = A_WRITE(w, item->packet->length, item->packet->data);
if (res >= 0) {
if (item->packet->length == res) {
self->queue_size--;
ol_queue_remove((struct ol_queue_node *) item);
ol_string_free(item->packet);
ol_space_free(item);
if (self->super.writable)
(*self->super.writable) = 1;
}
else if (res != 0) {
struct ol_string *s;
s = item->packet;
item->packet = c_format("%s", s->length - res, s->data + res);
ol_string_free(s);
break;
}
else {
break;
}
}
else {
verbose("pkt_buffer::do_flush(): Error flushing data\n");
return ST_DIE;
}
}
return ST_OK | ST_GOON;
}
static int do_flush(struct abstract_buffer *c, struct abstract_write *w)
{
CAST(pkt_buffer, self, c);
if (self->pkt_flush)
return do_flush_pkt(c, w);
else
return do_flush_stream(c, w);
}
static int do_prepare_write(struct abstract_buffer *c)
{
CAST(pkt_buffer, self, c);
return self->queue_size > self->queue_min;
}
static void do_close(struct abstract_buffer *c)
{
CAST(pkt_buffer, self, c);
if (c->writable)
*c->writable = 0;
c->closed = 1;
self->queue_min = 0;
}
struct abstract_buffer *make_pkt_buffer(int queue_max)
{
NEW(pkt_buffer, self);
self->super.super.write = do_write;
self->super.super.writestr = do_write_str;
self->super.flush = do_flush;
self->super.prepare = do_prepare_write;
self->super.close = do_close;
ol_queue_init(&self->queue);
self->queue_min = 0;
self->queue_size = 0;
self->queue_max = queue_max;
return &self->super;
}
struct abstract_buffer *make_pkt_buffer_ext(int queue_min, int queue_max, int pkt_flush)
{
struct abstract_buffer *b = make_pkt_buffer(queue_max);
((struct pkt_buffer *) b)->queue_min = queue_min;
((struct pkt_buffer *) b)->pkt_flush = pkt_flush;
return b;
}
syntax highlighted by Code2HTML, v. 0.9.1