/* TODO: we should really be using gsk_stream_read_buffer()
   and gsk_stream_write_buffer()!!!!!! */

#include "gskstreamconnection.h"
#include "gskerror.h"
#include "gskghelpers.h"
#include "gskmacros.h"

/* Parent class pointer.  It is filled in by
   gsk_stream_connection_class_init() and used by
   gsk_stream_connection_finalize() to chain up. */
static GObjectClass *parent_class = NULL;

/* Class structure typedef and the usual class cast / class check macros
   for GSK_TYPE_STREAM_CONNECTION. */
typedef struct _GskStreamConnectionClass GskStreamConnectionClass;
#define GSK_STREAM_CONNECTION_CLASS(klass)      (G_TYPE_CHECK_CLASS_CAST ((klass), GSK_TYPE_STREAM_CONNECTION, GskStreamConnectionClass))
#define GSK_STREAM_CONNECTION_GET_CLASS(obj)    (G_TYPE_INSTANCE_GET_CLASS ((obj), GSK_TYPE_STREAM_CONNECTION, GskStreamConnectionClass))
#define GSK_IS_STREAM_CONNECTION_CLASS(klass)   (G_TYPE_CHECK_CLASS_TYPE ((klass), GSK_TYPE_STREAM_CONNECTION))

/* The class adds no members of its own on top of GObjectClass. */
struct _GskStreamConnectionClass 
{
  GObjectClass object_class;
};
/*
 * strategy:
 * - When attaching, trap the relevant read and write hooks.
 * - If a buffer underflow occurs (the number of buffered bytes is 0),
 *   block the write hook of the output stream: there is nothing to write.
 * - If a buffer overflow occurs (the number of buffered bytes is greater
 *   than max_buffered), block the read hook of the input stream until
 *   the buffer has been drained back to the limit.
 */

/* Initial value of max_buffered, in bytes, for a new connection
   (assigned in gsk_stream_connection_init()). */
#define DEFAULT_MAX_BUFFERED		4096
/* Initial value of atomic_read_size, in bytes, for a new connection
   (assigned in gsk_stream_connection_init()). */
#define DEFAULT_MAX_ATOMIC_READ		4096
/* Largest atomic read that handle_input_is_readable() services from a
   g_alloca() buffer; larger reads use g_malloc() / g_free() instead. */
#define MAX_READ_ON_STACK		8192

/* runtime-able? */
/* Debug tracing is compiled out by the "#if 0" below; with the first
   branch enabled, DEBUG() forwards its (parenthesized) argument list to
   g_message().  Call sites therefore use double parentheses. */
#if 0
#define DEBUG(args) g_message args
#else
#define DEBUG(args) 
#endif

/* Trace helper for the hook callbacks: reports the type name and address
   of OBJECT together with the name of the function, FCTNAME. */
#define D(object, fctname) DEBUG(("stream-attach: %s[%p]: %s", G_OBJECT_TYPE_NAME (object), object, fctname))

/*
 * Block or unblock the write hook of the output stream on behalf of the
 * connection itself.
 *
 * The blocking_write_side flag remembers whether the connection currently
 * holds a block, so that gsk_io_block_write() and gsk_io_unblock_write()
 * are only ever called on a state change.  Does nothing if the output
 * stream is no longer attached.
 */
static inline void
stream_connection_set_internal_write_block (GskStreamConnection *stream_connection,
                                            gboolean             block)
{
  GskStream *sink = stream_connection->write_side;

  if (sink == NULL)
    return;

  if (block)
    {
      /* Already blocked by us: nothing to do. */
      if (stream_connection->blocking_write_side)
        return;
      stream_connection->blocking_write_side = 1;
      gsk_io_block_write (GSK_IO (sink));
    }
  else
    {
      /* We hold no block: nothing to release. */
      if (!stream_connection->blocking_write_side)
        return;
      stream_connection->blocking_write_side = 0;
      gsk_io_unblock_write (GSK_IO (sink));
    }
}

/*
 * Block or unblock the read hook of the input stream on behalf of the
 * connection itself.
 *
 * The blocking_read_side flag remembers whether the connection currently
 * holds a block, so that gsk_io_block_read() and gsk_io_unblock_read()
 * are only ever called on a state change.  Does nothing if the input
 * stream is no longer attached.
 */
static inline void
stream_connection_set_internal_read_block (GskStreamConnection *stream_connection,
                                           gboolean             block)
{
  GskStream *source = stream_connection->read_side;

  if (source == NULL)
    return;

  if (block)
    {
      /* Already blocked by us: nothing to do. */
      if (stream_connection->blocking_read_side)
        return;
      stream_connection->blocking_read_side = 1;
      gsk_io_block_read (GSK_IO (source));
    }
  else
    {
      /* We hold no block: nothing to release. */
      if (!stream_connection->blocking_read_side)
        return;
      stream_connection->blocking_read_side = 0;
      gsk_io_unblock_read (GSK_IO (source));
    }
}
/*
 * Re-evaluate flow control from the current fill level of the buffer:
 * reading from the input is blocked while more than max_buffered bytes
 * are pending, and writing to the output is blocked while nothing is
 * pending.
 */
static inline void
check_internal_blocks (GskStreamConnection *stream_connection)
{
  const guint pending = stream_connection->buffer.size;
  const gboolean over_limit = (pending > stream_connection->max_buffered);
  const gboolean drained = (pending == 0);

  stream_connection_set_internal_read_block (stream_connection, over_limit);
  stream_connection_set_internal_write_block (stream_connection, drained);
}

/*
 * Common failure path for the hook callbacks: shut both ends of the
 * connection down, report the error with g_warning(), and release it.
 *
 * Takes ownership of @error, which must not be NULL.
 */
static void
handle_error (GskStreamConnection *stream_connection,
              GError              *error)
{
  /* Stop the transfer first; the message is still valid afterwards
     because the error is only freed at the very end. */
  gsk_stream_connection_shutdown (stream_connection);

  g_warning ("got error: %s", error->message);

  g_error_free (error);
}

/**
 * gsk_stream_connection_set_max_buffered:
 * @connection: the connection to affect.
 * @max_buffered: maximum amount of data to hold from the input stream
 * for the output stream.  Once this much data has built up,
 * we stop reading from the input stream.
 *
 * Adjust the maximum amount of memory used to buffer data
 * between the two streams.
 *
 * The buffer may temporarily hold more than this, either because
 * this function was used to lower the limit below the amount
 * currently buffered, or because there was a buffer-to-buffer
 * transfer (which is allowed to be large).
 */
void
gsk_stream_connection_set_max_buffered (GskStreamConnection *connection,
                                        guint                max_buffered)
{
  connection->max_buffered = max_buffered;

  /* The new limit may change whether the input has to be throttled. */
  check_internal_blocks (connection);
}

/**
 * gsk_stream_connection_get_max_buffered:
 * @connection: the connection to query.
 *
 * Get the maximum number of bytes of data to buffer between the
 * input and output ends of the connection.
 *
 * The number of bytes actually buffered is available from
 * gsk_stream_connection_get_cur_buffered().
 *
 * returns: the maximum number of bytes.
 */
guint
gsk_stream_connection_get_max_buffered (GskStreamConnection *connection)
{
  const guint limit = connection->max_buffered;

  return limit;
}

/**
 * gsk_stream_connection_get_cur_buffered:
 * @connection: the connection to query.
 *
 * Get the number of bytes of data currently buffered;
 * that is, bytes that have been read from the input but
 * not yet written to the output.
 *
 * returns: the current number of bytes.
 */
guint
gsk_stream_connection_get_cur_buffered (GskStreamConnection *connection)
{
  const guint pending = connection->buffer.size;

  return pending;
}


/**
 * gsk_stream_connection_set_atomic_read_size:
 * @connection: the connection to affect.
 * @atomic_read_size: the size to read at a time.
 *
 * Set the number of bytes to read atomically from
 * an underlying source.  This is only
 * used if the input stream has no read_buffer method.
 *
 * The new size is used from the next read onwards.
 */
void
gsk_stream_connection_set_atomic_read_size (GskStreamConnection *connection,
                                            guint                atomic_read_size)
{
  /* Picked up by handle_input_is_readable() each time it runs. */
  connection->atomic_read_size = atomic_read_size;
}

/**
 * gsk_stream_connection_get_atomic_read_size:
 * @connection: the connection to query.
 *
 * Get the number of bytes that are read atomically from
 * the underlying source, as configured with
 * gsk_stream_connection_set_atomic_read_size().
 *
 * returns: the size to read at a time.
 */
guint
gsk_stream_connection_get_atomic_read_size (GskStreamConnection *connection)
{
  return connection->atomic_read_size;
}

/*
 * Read-hook callback for the input stream.
 *
 * Reads up to atomic_read_size bytes from the input.  If nothing is
 * buffered yet, the data is offered straight to the output; whatever
 * the output did not accept (or everything, if data was already
 * pending) is appended to the connection's buffer.  Flow control is
 * re-evaluated afterwards.
 *
 * A read or write error shuts the connection down via handle_error().
 *
 * Returns FALSE only if a precondition fails, TRUE otherwise.
 */
static gboolean
handle_input_is_readable (GskIO         *io,
                          gpointer       data)
{
  GskStreamConnection *stream_connection = data;
  GskStream *read_side = stream_connection->read_side;
  GskStream *write_side = stream_connection->write_side;
  guint chunk_size = stream_connection->atomic_read_size;
  /* Large chunks go on the heap rather than the stack. */
  gboolean chunk_on_heap = (chunk_size > MAX_READ_ON_STACK);
  GError *error = NULL;
  guint n_read;
  guint n_written = 0;
  char *chunk;

  g_return_val_if_fail (read_side == GSK_STREAM (io), FALSE);
  g_return_val_if_fail (write_side != NULL, FALSE);

  D (io, "handle_input_is_readable");

  /* TODO: too harsh a penalty for big atomic reads...
   * maybe we should cache a big one.
   */
  if (chunk_on_heap)
    chunk = g_malloc (chunk_size);
  else
    chunk = g_alloca (chunk_size);

  n_read = gsk_stream_read (read_side, chunk, chunk_size, &error);
  if (error != NULL)
    {
      handle_error (stream_connection, error);
      goto done;
    }
  if (n_read == 0)
    goto done;

  /* Only bypass the buffer when it is empty; otherwise the new data
     would overtake bytes that are still waiting to be written. */
  if (stream_connection->buffer.size == 0)
    {
      n_written = gsk_stream_write (write_side, chunk, n_read, &error);
      if (error != NULL)
        {
          handle_error (stream_connection, error);
          goto done;
        }
    }

  /* Keep whatever the output did not take. */
  if (n_written < n_read)
    gsk_buffer_append (&stream_connection->buffer,
                       chunk + n_written,
                       n_read - n_written);

  check_internal_blocks (stream_connection);

done:
  if (chunk_on_heap)
    g_free (chunk);
  return TRUE;
}

/*
 * Shutdown callback of the read hook on the input stream.
 *
 * If the output is still attached and nothing is left in the buffer,
 * the write end of the output is shut down right away.  If data is
 * still pending, nothing is done here; handle_output_is_writable()
 * performs the shutdown after flushing.
 *
 * A failed shutdown is only reported through gsk_g_debug().
 * Always returns FALSE.
 */
static gboolean
handle_input_shutdown_read (GskIO     *io,
                            gpointer   data)
{
  GskStreamConnection *stream_connection = data;
  GError *error = NULL;

  D (io, "handle_input_shutdown_read");

  if (stream_connection->write_side == NULL)
    return FALSE;
  if (stream_connection->buffer.size != 0)
    return FALSE;

  if (gsk_io_write_shutdown (GSK_IO (stream_connection->write_side), &error))
    return FALSE;

  if (error != NULL)
    {
      gsk_g_debug ("stream-attach: handle-read-shutdown: doing write-shutdown: %s", error->message);
      g_error_free (error);
    }
  return FALSE;
}

/*
 * Destroy-notify of the read hook on the input stream.
 *
 * Forgets the input stream, then drops the reference the hook held on
 * the connection and the reference the connection held on the stream.
 */
static void
handle_input_is_readable_destroy (gpointer data)
{
  GskStreamConnection *stream_connection = data;
  /* Remember the stream: the connection may be gone after the unref. */
  GskStream *released = stream_connection->read_side;

  D (released, "handle_input_is_readable_destroy");

  stream_connection->read_side = NULL;
  g_object_unref (stream_connection);

  if (released == NULL)
    return;
  g_object_unref (released);
}

/*
 * Write-hook callback for the output stream.
 *
 * Flushes as much of the pending buffer as the output accepts.  If the
 * buffer is then empty and the input stream was already detached when
 * this callback started, the write end of the output is shut down.
 * Flow control is re-evaluated afterwards.
 *
 * A write error shuts the connection down via handle_error().
 *
 * Returns FALSE only if the precondition fails, TRUE otherwise.
 */
static gboolean
handle_output_is_writable (GskIO         *io,
                           gpointer       data)
{
  GskStreamConnection *stream_connection = data;
  GskStream *write_side = stream_connection->write_side;
  /* Sampled up front, before any of the calls below can change it. */
  gboolean input_detached = (stream_connection->read_side == NULL);
  GError *error = NULL;

  g_return_val_if_fail (write_side == GSK_STREAM (io), FALSE);

  /* The read-side of the connection in fact may be shut-down,
     but we still flush the outgoing data before shutting down
     the write-side.  So, this assertion is wrong.
     Nonetheless, it's so old that it's unclear how it can be wrong,
     so we'll leave it here for further reflection.
     (daveb, Mar 8, 2005) */
  /*g_return_val_if_fail (read_side != NULL, FALSE);*/

  D (write_side, "handle_output_is_writable");

  if (stream_connection->buffer.size > 0)
    {
      gsk_stream_write_buffer (write_side, &stream_connection->buffer, &error);
      if (error != NULL)
        {
          handle_error (stream_connection, error);
          return TRUE;
        }
    }

  /* Everything flushed and no more input can arrive: finish the output. */
  if (input_detached && stream_connection->buffer.size == 0)
    {
      gboolean shut_down;

      shut_down = gsk_io_write_shutdown (GSK_IO (stream_connection->write_side), &error);
      if (!shut_down && error != NULL)
        {
          gsk_g_debug ("stream-attach: handle-output-is-writable, shutting down write end: %s", error->message);
          g_error_free (error);
        }
    }

  check_internal_blocks (stream_connection);

  return TRUE;
}

/*
 * Shutdown callback of the write hook on the output stream.
 *
 * If the input stream is still attached, its read end is shut down as
 * well.  A failed shutdown is only reported through gsk_g_debug(), in
 * the same way handle_input_shutdown_read() reports its failures.
 *
 * Always returns FALSE.
 */
static gboolean
handle_output_shutdown_write (GskIO     *io,
                              gpointer   data)
{
  GskStreamConnection *stream_connection = data;
  D (stream_connection->write_side, "handle_output_shutdown_write");
  if (stream_connection->read_side != NULL)
    {
      GError *error = NULL;
      if (!gsk_io_read_shutdown (GSK_IO (stream_connection->read_side), &error)
          && error != NULL)
        {
          /* Deliberately not g_error(): that is fatal, which would abort
             the process over a failed shutdown and leave the
             g_error_free() below unreachable. */
          gsk_g_debug ("stream-attach: handle-write-shutdown: doing read-shutdown: %s", error->message);
          g_error_free (error);
        }
    }
  return FALSE;
}

/*
 * Destroy-notify of the write hook on the output stream.
 *
 * Forgets the output stream and, since there is nowhere left to send
 * data, untraps the read hook of the input stream if it is still
 * attached.  Then drops the reference the hook held on the connection
 * and the reference the connection held on the output stream.
 */
static void
handle_output_is_writable_destroy (gpointer data)
{
  GskStreamConnection *stream_connection = data;
  /* Remember the stream: the connection may be gone after the unref. */
  GskStream *released = stream_connection->write_side;

  D (released, "handle_output_is_writable_destroy");

  stream_connection->write_side = NULL;

  if (stream_connection->read_side != NULL)
    {
      gsk_io_untrap_readable (GSK_IO (stream_connection->read_side));
    }

  g_object_unref (stream_connection);

  if (released == NULL)
    return;
  g_object_unref (released);
}

/*
 * GObject finalize handler: releases the data buffer, then chains up
 * to the parent class.
 */
static void
gsk_stream_connection_finalize (GObject *object)
{
  GskStreamConnection *self = GSK_STREAM_CONNECTION (object);

  gsk_buffer_destruct (&self->buffer);

  (*parent_class->finalize) (object);
}

/*
 * Instance initializer: sets up an empty buffer and the default
 * buffering limit and atomic read size.
 */
static void
gsk_stream_connection_init (GskStreamConnection *stream_connection)
{
  gsk_buffer_construct (&stream_connection->buffer);

  stream_connection->atomic_read_size = DEFAULT_MAX_ATOMIC_READ;
  stream_connection->max_buffered = DEFAULT_MAX_BUFFERED;
}

/*
 * Class initializer: records the parent class for chaining up and
 * installs the finalize handler.
 */
static void
gsk_stream_connection_class_init (GskStreamConnectionClass *class)
{
  GObjectClass *object_class = G_OBJECT_CLASS (class);

  parent_class = g_type_class_peek_parent (class);
  object_class->finalize = gsk_stream_connection_finalize;
}

/*
 * Return the GType of GskStreamConnection, registering it as a
 * subclass of G_TYPE_OBJECT the first time this is called.
 */
GType
gsk_stream_connection_get_type()
{
  static const GTypeInfo type_info =
  {
    sizeof(GskStreamConnectionClass),
    (GBaseInitFunc) NULL,
    (GBaseFinalizeFunc) NULL,
    (GClassInitFunc) gsk_stream_connection_class_init,
    NULL,               /* class_finalize */
    NULL,               /* class_data */
    sizeof (GskStreamConnection),
    16,                 /* n_preallocs */
    (GInstanceInitFunc) gsk_stream_connection_init,
    NULL                /* value_table */
  };
  static GType registered_type = 0;

  /* Already registered by an earlier call. */
  if (registered_type != 0)
    return registered_type;

  registered_type = g_type_register_static (G_TYPE_OBJECT,
                                            "GskStreamConnection",
                                            &type_info, 0);
  return registered_type;
}

/**
 * gsk_stream_connection_new:
 * @input_stream: the input stream whose read-end will be trapped.
 * @output_stream: the output stream whose write-end will be trapped.
 * @error: optional error return location.
 *
 * Attach the read end of @input_stream
 * to the write end of @output_stream,
 * returning an error if anything goes wrong.
 *
 * The connection takes its own reference on each stream.  NULL is
 * returned without attaching anything if a precondition fails or if
 * @error is already set on entry.
 *
 * returns: a reference to the connection.
 * You should eventually call g_object_unref() on it.
 */
GskStreamConnection *
gsk_stream_connection_new   (GskStream        *input_stream,
                             GskStream        *output_stream,
                             GError          **error)
{
  GskStreamConnection *conn;

  DEBUG (("stream-attach: attach: input=%s[%p], output=%s[%p]",
          G_OBJECT_TYPE_NAME (input_stream), input_stream,
          G_OBJECT_TYPE_NAME (output_stream), output_stream));

  g_return_val_if_fail (input_stream != NULL, NULL);
  g_return_val_if_fail (output_stream != NULL, NULL);
  g_return_val_if_fail (gsk_stream_get_is_readable (input_stream), NULL);
  g_return_val_if_fail (gsk_stream_get_is_writable (output_stream), NULL);
  g_return_val_if_fail (!gsk_io_has_read_hook (input_stream), NULL);
  g_return_val_if_fail (!gsk_io_has_write_hook (output_stream), NULL);

  /* Refuse to proceed on top of an error that is already pending. */
  if (error != NULL && *error != NULL)
    return NULL;

  /* The reference from g_object_new() belongs to the caller. */
  conn = g_object_new (GSK_TYPE_STREAM_CONNECTION, NULL);

  /* Input side: one reference on the stream, plus one on the connection
     that handle_input_is_readable_destroy() gives back. */
  g_object_ref (input_stream);
  g_object_ref (conn);
  conn->read_side = input_stream;

  /* Output side: one reference on the stream, plus one on the connection
     that handle_output_is_writable_destroy() gives back. */
  g_object_ref (output_stream);
  g_object_ref (conn);
  conn->write_side = output_stream;

  gsk_io_trap_readable (GSK_IO (input_stream),
                        handle_input_is_readable,
                        handle_input_shutdown_read,
                        conn,
                        handle_input_is_readable_destroy);
  gsk_io_trap_writable (GSK_IO (output_stream),
                        handle_output_is_writable,
                        handle_output_shutdown_write,
                        conn,
                        handle_output_is_writable_destroy);

  return conn;
}

/**
 * gsk_stream_connection_detach:
 * @connection: the connection to detach.
 *
 * Disconnects the input/output pair of a connection by
 * untrapping whichever of the two hooks are still trapped.
 * Data held in the buffer will be lost.
 */
void
gsk_stream_connection_detach (GskStreamConnection *connection)
{
  /* The hooks' destroy callbacks each drop a reference on the
     connection; hold one here until this function is done with it. */
  g_object_ref (connection);

  if (connection->read_side != NULL)
    {
      gsk_stream_untrap_readable (connection->read_side);
    }

  if (connection->write_side != NULL)
    {
      gsk_stream_untrap_writable (connection->write_side);
    }

  /* Discard whatever was still waiting to be written. */
  gsk_buffer_destruct (&connection->buffer);

  g_object_unref (connection);
}

/**
 * gsk_stream_connection_shutdown:
 * @connection: the connection to shut down.
 *
 * Shut down both ends of a connection: the read end of the
 * input stream and the write end of the output stream,
 * whichever of them are still attached.
 * Errors reported by either shutdown are ignored.
 */
void
gsk_stream_connection_shutdown (GskStreamConnection *connection)
{
  GskStream *read_side = connection->read_side;
  GskStream *write_side = connection->write_side;

  /* Keep the output stream alive until it has been shut down below.
     NOTE(review): presumably shutting down the input can make the
     connection drop its reference on the output -- confirm. */
  if (write_side != NULL)
    g_object_ref (write_side);

  /* The shutdown calls are given a GskIO, as everywhere else in this file. */
  if (read_side != NULL)
    gsk_io_read_shutdown (GSK_IO (read_side), NULL);

  if (write_side != NULL)
    {
      gsk_io_write_shutdown (GSK_IO (write_side), NULL);
      g_object_unref (write_side);
    }
}


/* syntax highlighted by Code2HTML, v. 0.9.1 */