#include "gskbufferstream.h"

static GObjectClass *parent_class = NULL;


enum
{
  /* WARNING: this flag is hard-coded
     in the macros gsk_buffer_stream_*_strict_max_write(). */
  STRICT_MAX_WRITE = (1<<0),

  /* Whether to shutdown the hook when the buffer empties. */
  DEFERRED_WRITE_SHUTDOWN = (1<<1)
};

enum
{
  /* Whether to shutdown the hook when the buffer empties. */
  DEFERRED_READ_SHUTDOWN = (1<<0)
};
void gsk_buffer_stream_mark_deferred_write_shutdown (GskBufferStream *stream);

#define gsk_buffer_stream_has_deferred_write_shutdown(stream)	\
  GSK_HOOK_TEST_USER_FLAG (gsk_buffer_stream_write_hook(stream), DEFERRED_WRITE_SHUTDOWN)
#define gsk_buffer_stream_mark_deferred_write_shutdown(stream)	\
  GSK_HOOK_MARK_USER_FLAG (gsk_buffer_stream_write_hook(stream), DEFERRED_WRITE_SHUTDOWN)
#define gsk_buffer_stream_clear_deferred_write_shutdown(stream)	\
  GSK_HOOK_CLEAR_USER_FLAG (gsk_buffer_stream_write_hook(stream), DEFERRED_WRITE_SHUTDOWN)

#define gsk_buffer_stream_has_deferred_read_shutdown(stream)	\
  GSK_HOOK_TEST_USER_FLAG (gsk_buffer_stream_read_hook(stream), DEFERRED_READ_SHUTDOWN)
#define gsk_buffer_stream_mark_deferred_read_shutdown(stream)	\
  GSK_HOOK_MARK_USER_FLAG (gsk_buffer_stream_read_hook(stream), DEFERRED_READ_SHUTDOWN)
#define gsk_buffer_stream_clear_deferred_read_shutdown(stream)	\
  GSK_HOOK_CLEAR_USER_FLAG (gsk_buffer_stream_read_hook(stream), DEFERRED_READ_SHUTDOWN)

/**
 * gsk_buffer_stream_read_shutdown:
 * @stream: the stream to gracefully shut-down.
 *
 * Shutdown the read-end of the buffer-stream,
 * waiting for the buffer to be drained first.
 */
void gsk_buffer_stream_read_shutdown (GskBufferStream *stream)
{
  if (stream->read_buffer.size == 0)
    gsk_io_notify_read_shutdown (GSK_IO (stream));
  else
    gsk_buffer_stream_mark_deferred_read_shutdown (stream);
}

/* --- GskStream methods --- */
static guint
gsk_buffer_stream_raw_read (GskStream     *stream,
                            gpointer       data,
                            guint          length,
                            GError       **error)
{
  GskBufferStream *bs = GSK_BUFFER_STREAM (stream);
  GskBuffer *buffer = &bs->read_buffer;
  guint rv = gsk_buffer_read (buffer, data, length);
  if (rv > 0)
    gsk_buffer_stream_read_buffer_changed (bs);
  return rv;
}

static guint
gsk_buffer_stream_raw_write (GskStream     *stream,
                             gconstpointer  data,
                             guint          length,
                             GError       **error)
{
  GskBufferStream *bs = GSK_BUFFER_STREAM (stream);
  GskBuffer *buffer = &bs->write_buffer;
  if (gsk_buffer_stream_has_strict_max_write (bs))
    {
      if (buffer->size >= bs->max_write_buffer)
	return 0;
      if (buffer->size + length > bs->max_write_buffer)
	length = bs->max_write_buffer - buffer->size;
    }
  gsk_buffer_append (buffer, data, length);
  if (length > 0)
    gsk_buffer_stream_write_buffer_changed (bs);
  return length;
}

static guint
gsk_buffer_stream_raw_read_buffer (GskStream     *stream,
                                   GskBuffer     *buffer,
                                   GError       **error)
{
  GskBufferStream *bs = GSK_BUFFER_STREAM (stream);
  GskBuffer *read_buffer = &bs->read_buffer;
  guint rv = gsk_buffer_drain (buffer, read_buffer);
  if (rv > 0)
    gsk_buffer_stream_read_buffer_changed (bs);
  return rv;
}

static guint
gsk_buffer_stream_raw_write_buffer (GskStream    *stream,
                                    GskBuffer     *buffer,
                                    GError       **error)
{
  GskBufferStream *bs = GSK_BUFFER_STREAM (stream);
  GskBuffer *write_buffer = &bs->write_buffer;
  guint length = buffer->size;
  guint rv;
  if (gsk_buffer_stream_has_strict_max_write (bs))
    {
      if (buffer->size >= bs->max_write_buffer)
	return 0;
      if (buffer->size + length > bs->max_write_buffer)
	length = bs->max_write_buffer - buffer->size;
      rv = gsk_buffer_transfer (write_buffer, buffer, length);
    }
  else
    {
      rv = gsk_buffer_drain (write_buffer, buffer);
    }
  if (rv > 0)
    gsk_buffer_stream_write_buffer_changed (bs);
  return rv;
}

/* --- GskIO methods --- */
/**
 * gsk_buffer_stream_read_buffer_changed:
 * @stream: stream whose read buffer has been modified.
 *
 * Called to notify the buffer stream that its
 * read-size has been changed, usually because
 * an implementor has appended data into it
 * for the attached stream to read.
 */
void gsk_buffer_stream_read_buffer_changed  (GskBufferStream *stream)
{
  if (stream->read_buffer.size == 0)
    {
      if (gsk_buffer_stream_has_deferred_read_shutdown (stream))
	gsk_io_notify_read_shutdown (stream);
      else
	gsk_io_clear_idle_notify_read (stream);
      gsk_hook_set_idle_notify (gsk_buffer_stream_read_hook (stream),
                                gsk_io_is_polling_for_read (stream));
    }
  else if (gsk_io_get_is_readable (stream))
    {
      gsk_io_mark_idle_notify_read (stream);
    }
}

/**
 * gsk_buffer_stream_write_buffer_changed:
 * @stream: stream whose write buffer has been modified.
 *
 * Called to notify the buffer stream that its
 * write-buffer has been changed, usually because
 * an implementor has read data from it.
 */
void
gsk_buffer_stream_write_buffer_changed (GskBufferStream *stream)
{
  if (stream->write_buffer.size < stream->max_write_buffer)
    gsk_io_mark_idle_notify_write (stream);
  else
    gsk_io_clear_idle_notify_write (stream);

  if (stream->write_buffer.size > 0)
    gsk_hook_mark_idle_notify (gsk_buffer_stream_write_hook (stream));
  else
    {
      gsk_hook_clear_idle_notify (gsk_buffer_stream_write_hook (stream));
      if (gsk_buffer_stream_has_deferred_write_shutdown (stream))
	{
	  gsk_buffer_stream_clear_deferred_write_shutdown (stream);
	  gsk_hook_notify_shutdown (gsk_buffer_stream_write_hook (stream));
	}
    }
}

/**
 * gsk_buffer_stream_changed:
 * @stream: the stream whose internals have been modified.
 *
 * Do all updates needed to compensate for
 * any user changes to: read_buffer, write_buffer,
 * max_write_buffer.
 */
void
gsk_buffer_stream_changed              (GskBufferStream *stream)
{
  gsk_buffer_stream_read_buffer_changed (stream);
  gsk_buffer_stream_write_buffer_changed (stream);
}

static void
gsk_buffer_stream_set_poll_read (GskIO      *io,
                                 gboolean    do_poll)
{
  GskBufferStream *bs = GSK_BUFFER_STREAM (io);
  if (bs->read_buffer.size == 0)
    {
      gsk_hook_set_idle_notify (gsk_buffer_stream_read_hook (bs), do_poll);
    }
  else if (gsk_io_get_is_readable (bs))
    {
      g_return_if_fail (gsk_io_get_idle_notify_read (bs));
    }
}

static void
gsk_buffer_stream_set_poll_write (GskIO      *io,
                                  gboolean    do_poll)
{
  /* Nothing to do.

     All the work is done by idle-notify hooks
     in gsk_buffer_stream_write_buffer_changed() */
}

static gboolean
gsk_buffer_stream_shutdown_read (GskIO      *io,
                                 GError    **error)
{
  gsk_hook_notify_shutdown (gsk_buffer_stream_read_hook (GSK_BUFFER_STREAM (io)));
  return TRUE;
}

static gboolean
gsk_buffer_stream_shutdown_write (GskIO      *io,
                                  GError    **error)
{
  GskBufferStream *bs = GSK_BUFFER_STREAM (io);
  if (bs->write_buffer.size == 0)
    gsk_hook_notify_shutdown (gsk_buffer_stream_write_hook (bs));
  else
    {
      gsk_buffer_stream_mark_deferred_write_shutdown (bs);

      /* postpone shutdown until it happens for real. */
      return FALSE;
    }

  return TRUE;
}

/* --- GObject methods --- */
static void
gsk_buffer_stream_finalize (GObject        *object)
{
  GskBufferStream *bs = GSK_BUFFER_STREAM (object);
  gsk_buffer_destruct (&bs->read_buffer);
  gsk_buffer_destruct (&bs->write_buffer);
  gsk_hook_destruct (&bs->buffered_read_hook);
  gsk_hook_destruct (&bs->buffered_write_hook);
  (*parent_class->finalize) (object);
}

/* --- functions --- */
static void
gsk_buffer_stream_init (GskBufferStream *buffer_stream)
{
  GSK_HOOK_INIT (buffer_stream,
		 GskBufferStream,
		 buffered_read_hook,
		 GSK_HOOK_IS_AVAILABLE,
		 buffered_read_set_poll, buffered_read_shutdown);
  GSK_HOOK_INIT (buffer_stream,
		 GskBufferStream,
		 buffered_write_hook,
		 GSK_HOOK_IS_AVAILABLE,
		 buffered_write_set_poll, buffered_write_shutdown);

  gsk_stream_mark_is_writable (buffer_stream);
  gsk_stream_mark_is_readable (buffer_stream);
}
static void
gsk_buffer_stream_class_init (GskBufferStreamClass *class)
{
  GObjectClass *object_class = G_OBJECT_CLASS (class);
  GskIOClass *io_class = GSK_IO_CLASS (class);
  GskStreamClass *stream_class = GSK_STREAM_CLASS (class);
  parent_class = g_type_class_peek_parent (class);
  object_class->finalize = gsk_buffer_stream_finalize;
  io_class->set_poll_read = gsk_buffer_stream_set_poll_read;
  io_class->set_poll_write = gsk_buffer_stream_set_poll_write;
  io_class->shutdown_read = gsk_buffer_stream_shutdown_read;
  io_class->shutdown_write = gsk_buffer_stream_shutdown_write;
  stream_class->raw_read = gsk_buffer_stream_raw_read;
  stream_class->raw_write = gsk_buffer_stream_raw_write;
  stream_class->raw_read_buffer = gsk_buffer_stream_raw_read_buffer;
  stream_class->raw_write_buffer = gsk_buffer_stream_raw_write_buffer;
  GSK_HOOK_CLASS_INIT (object_class, "buffered-read-hook", GskBufferStream, buffered_read_hook);
  GSK_HOOK_CLASS_INIT (object_class, "buffered-write-hook", GskBufferStream, buffered_write_hook);
}

GType gsk_buffer_stream_get_type()
{
  static GType buffer_stream_type = 0;
  if (!buffer_stream_type)
    {
      static const GTypeInfo buffer_stream_info =
      {
	sizeof(GskBufferStreamClass),
	(GBaseInitFunc) NULL,
	(GBaseFinalizeFunc) NULL,
	(GClassInitFunc) gsk_buffer_stream_class_init,
	NULL,		/* class_finalize */
	NULL,		/* class_data */
	sizeof (GskBufferStream),
	0,		/* n_preallocs */
	(GInstanceInitFunc) gsk_buffer_stream_init,
	NULL		/* value_table */
      };
      buffer_stream_type = g_type_register_static (GSK_TYPE_STREAM,
                                                  "GskBufferStream",
						  &buffer_stream_info, 0);
    }
  return buffer_stream_type;
}

/**
 * gsk_buffer_stream_new:
 *
 * Create a new #GskBufferStream.
 *
 * returns: the newly allocated GskBufferStream.
 */
GskBufferStream *
gsk_buffer_stream_new (void)
{
  return g_object_new (GSK_TYPE_BUFFER_STREAM, NULL);
}


syntax highlighted by Code2HTML, v. 0.9.1