#include "gskerror.h"
#include "gskghelpers.h"
#include "gskmacros.h"
#include "gskstreamtransferrequest.h"

/* Initial value of a request's max_buffered field
 * (assigned in gsk_stream_transfer_request_init()). */
#define DEFAULT_MAX_BUFFERED     4096
/* Initial value of a request's atomic_read_size field
 * (assigned in gsk_stream_transfer_request_init()). */
#define DEFAULT_MAX_ATOMIC_READ  4096
/* Threshold compared against atomic_read_size in handle_input_is_readable()
 * to choose between a g_alloca() and a g_malloc() read buffer. */
#define MAX_READ_ON_STACK        8192

/* Parent class, filled in by class_init and used to chain up finalize. */
static GObjectClass *parent_class = NULL;

/* Bring the write side's internal block state in line with BLOCK.
 *
 * request->blocking_write_side remembers whether this request currently
 * holds a block on the write side, so gsk_io_block_write() and
 * gsk_io_unblock_write() are only called on an actual state change.
 */
static inline void
check_internal_write_block (GskStreamTransferRequest *request, gboolean block)
{
  gboolean want_blocked = block ? TRUE : FALSE;
  gboolean is_blocked = request->blocking_write_side ? TRUE : FALSE;

  /* Already in the requested state: nothing to do. */
  if (want_blocked == is_blocked)
    return;

  if (want_blocked)
    {
      request->blocking_write_side = 1;
      gsk_io_block_write (GSK_IO (request->write_side));
    }
  else
    {
      request->blocking_write_side = 0;
      gsk_io_unblock_write (GSK_IO (request->write_side));
    }
}

/* Bring the read side's internal block state in line with BLOCK.
 *
 * request->blocking_read_side remembers whether this request currently
 * holds a block on the read side, so gsk_io_block_read() and
 * gsk_io_unblock_read() are only called on an actual state change.
 */
static inline void
check_internal_read_block (GskStreamTransferRequest *request, gboolean block)
{
  gboolean want_blocked = block ? TRUE : FALSE;
  gboolean is_blocked = request->blocking_read_side ? TRUE : FALSE;

  /* Already in the requested state: nothing to do. */
  if (want_blocked == is_blocked)
    return;

  if (want_blocked)
    {
      request->blocking_read_side = 1;
      gsk_io_block_read (GSK_IO (request->read_side));
    }
  else
    {
      request->blocking_read_side = 0;
      gsk_io_unblock_read (GSK_IO (request->read_side));
    }
}

/* Re-evaluate both internal blocks from the current buffer fill level:
 * the read side is blocked while more than max_buffered is buffered,
 * and the write side is blocked while the buffer is empty.
 */
static inline void
check_internal_blocks (GskStreamTransferRequest *request)
{
  const guint buffered = request->buffer.size;
  const gboolean over_limit = (buffered > request->max_buffered);
  const gboolean nothing_pending = (buffered == 0);

  check_internal_read_block (request, over_limit);
  check_internal_write_block (request, nothing_pending);
}

/* Report ERROR for the transfer and tear both streams down.
 *
 * ERROR is consumed by this function: if the request already carries an
 * error it is freed here, otherwise it is handed to gsk_request_set_error()
 * (which presumably takes ownership -- confirm against GskRequest).
 *
 * After recording the error the request is marked done and whichever of
 * the two streams are still attached are shut down.
 */
static void
handle_error (GskStreamTransferRequest *self, GError *error)
{
  g_return_if_fail (error);
  g_warning ("GskStreamTransferRequest: %s", error->message);

  if (gsk_request_had_error (self))
    {
      /* Only the first error is kept.  A GError must be released with
       * g_error_free(); plain g_free() would leak error->message.
       */
      g_error_free (error);
      return;
    }

  g_return_if_fail (gsk_request_get_is_running (self));
  g_return_if_fail (!gsk_request_get_is_done (self));
  g_return_if_fail (!gsk_request_get_is_cancelled (self));

  gsk_request_set_error (self, error);
  gsk_request_done (self);

  /* The destroy handlers reset read_side/write_side to NULL once a side
   * has been released, so re-check each pointer right before using it.
   */
  if (self->read_side != NULL)
    {
      gsk_io_read_shutdown (GSK_IO (self->read_side), NULL);
    }
  if (self->write_side != NULL)
    {
      gsk_io_write_shutdown (GSK_IO (self->write_side), NULL);
    }
}

/* Read-hook callback: pull up to atomic_read_size bytes from the read side
 * and move them towards the write side.
 *
 * Small reads (at most MAX_READ_ON_STACK bytes) use a g_alloca() buffer;
 * if nothing is buffered yet the data is written straight to the write
 * side, and whatever could not be written is copied into self->buffer.
 * Larger reads use a g_malloc() buffer whose ownership is handed to
 * self->buffer (freed with g_free) without copying.
 *
 * Returns FALSE after an error has been reported through handle_error(),
 * TRUE otherwise.
 */
static gboolean
handle_input_is_readable (GskIO *io, gpointer user_data)
{
  GskStreamTransferRequest *self = GSK_STREAM_TRANSFER_REQUEST (user_data);
  GskStream *read_side = self->read_side;
  GskStream *write_side = self->write_side;
  guint atomic_read_size = self->atomic_read_size;
  /* Only reads that fit within MAX_READ_ON_STACK may live on the stack.
   * (The comparison used to be inverted, which put arbitrarily large
   * reads on the stack and small ones on the heap.)
   */
  gboolean read_on_stack = (atomic_read_size <= MAX_READ_ON_STACK);
  GError *error = NULL;
  char *buf;
  guint num_read;

  g_return_val_if_fail (read_side == GSK_STREAM (io), FALSE);
  g_return_val_if_fail (write_side, FALSE);

  if (read_on_stack)
    buf = g_alloca (atomic_read_size);
  else
    buf = g_malloc (atomic_read_size);

  num_read = gsk_stream_read (read_side, buf, atomic_read_size, &error);
  if (error)
    {
      handle_error (self, error);
      if (!read_on_stack)
        g_free (buf);
      return FALSE;
    }
  if (num_read == 0)
    {
      if (!read_on_stack)
        g_free (buf);
      /* Wait for shutdown notification, in case there's an error. */
      return TRUE;
    }

  if (read_on_stack)
    {
      guint num_written = 0;

      /* Bypass the buffer only when it is empty, so that bytes are
       * never written out of order.
       */
      if (self->buffer.size == 0)
        {
          num_written = gsk_stream_write (write_side, buf, num_read, &error);
          if (error)
            {
              handle_error (self, error);
              return FALSE;
            }
        }
      /* Keep whatever was not written; the stack buffer goes away on
       * return, so this has to be a copy.
       */
      if (num_written < num_read)
        gsk_buffer_append (&self->buffer,
                           buf + num_written,
                           num_read - num_written);
    }
  else
    {
      /* Hand the heap block to the buffer, which frees it via g_free. */
      gsk_buffer_append_foreign (&self->buffer, buf, num_read, g_free, buf);
    }

  check_internal_blocks (self);
  return TRUE;
}

/* Shutdown callback for the read side.
 *
 * If the write side is still writable, either propagate the read side's
 * error or, when nothing is left in the buffer, shut the write side down.
 * Always returns FALSE.
 */
static gboolean
handle_input_shutdown_read (GskIO *io, gpointer user_data)
{
  GskStreamTransferRequest *self = GSK_STREAM_TRANSFER_REQUEST (user_data);
  GskStream *sink = self->write_side;
  GError *shutdown_error = NULL;

  g_return_val_if_fail (self->read_side == GSK_STREAM (io), FALSE);

  /* Write side already gone or shut down: nothing left to do. */
  if (sink == NULL || !gsk_stream_get_is_writable (sink))
    return FALSE;

  /* If the stream had an error, it's an error. */
  if (io->error)
    {
      handle_error (self, g_error_copy (io->error));
      return FALSE;
    }

  /* Data is still buffered; do not close the write side from here. */
  if (self->buffer.size != 0)
    return FALSE;

  /* Nothing buffered: try to close the write side now. */
  if (!gsk_io_write_shutdown (GSK_IO (sink), &shutdown_error))
    handle_error (self, shutdown_error);

  return FALSE;
}

/* Destroy notification for the read-side trap: release the read stream
 * and the reference on the request that gsk_stream_transfer_request_start()
 * took before installing the trap.
 */
static void
handle_input_is_readable_destroy (gpointer user_data)
{
  GskStreamTransferRequest *self;
  GskStream *read_side;

  self = GSK_STREAM_TRANSFER_REQUEST (user_data);
  read_side = self->read_side;

  g_return_if_fail (read_side);
  g_return_if_fail (!gsk_stream_get_is_readable (read_side));

  /* Drop the stream reference, then forget the pointer. */
  g_object_unref (read_side);
  self->read_side = NULL;

  g_object_unref (self);
}

/* Write-hook callback: flush buffered data to the write side and, once the
 * read side is finished and the buffer is empty, shut the write side down.
 *
 * Returns FALSE after an error has been reported through handle_error(),
 * TRUE otherwise.
 */
static gboolean
handle_output_is_writable (GskIO *io, gpointer user_data)
{
  GskStreamTransferRequest *self = GSK_STREAM_TRANSFER_REQUEST (user_data);
  GskStream *write_side = self->write_side;
  GError *error = NULL;
  gboolean input_finished;

  g_return_val_if_fail (write_side == GSK_STREAM (io), FALSE);

  /* Push out whatever is pending. */
  if (self->buffer.size > 0)
    {
      gsk_stream_write_buffer (write_side, &self->buffer, &error);
      if (error)
        {
          handle_error (self, error);
          return FALSE;
        }
    }

  input_finished = (self->read_side == NULL
                    || !gsk_stream_get_is_readable (self->read_side));

  /* More data may still arrive, or some is still buffered: just update
   * the internal blocks and keep going.
   */
  if (!input_finished || self->buffer.size != 0)
    {
      check_internal_blocks (self);
      return TRUE;
    }

  /* Everything has been transferred: close the write side. */
  if (gsk_io_write_shutdown (GSK_IO (write_side), &error))
    {
      /* Wait for shutdown notification. */
      return TRUE;
    }

  handle_error (self, error);
  return FALSE;
}

/* Shutdown callback for the write side.
 *
 * Three outcomes: the write side closed while the read side was still
 * readable (reported as an error), the write side itself carries an error
 * (propagated), or the transfer finished cleanly (request marked done).
 * Always returns FALSE.
 */
static gboolean
handle_output_shutdown_write (GskIO *io, gpointer user_data)
{
  GskStreamTransferRequest *self = GSK_STREAM_TRANSFER_REQUEST (user_data);
  GskStream *source = self->read_side;

  g_return_val_if_fail (self->write_side == GSK_STREAM (io), FALSE);

  if (source != NULL && gsk_stream_get_is_readable (source))
    {
      /* If the read side is still readable, it means the write side
       * shut down before we could write everything out, so we don't
       * care about the write side's error status.
       */
      handle_error (self,
                    g_error_new (GSK_G_ERROR_DOMAIN,
                                 0, /* TODO */
                                 "premature shutdown of write stream"));
      gsk_io_read_shutdown (GSK_IO (self->read_side), NULL);
      return FALSE;
    }

  if (io->error)
    {
      /* Otherwise, if the write side had an error, it's an error. */
      handle_error (self, g_error_copy (io->error));
      return FALSE;
    }

  /* Otherwise, everything worked. */
  gsk_request_done (self);

/* TODO: why would you ever want to return true from the shutdown handler? */
  return FALSE;
}

/* Destroy notification for the write-side trap: release the write stream
 * and the reference on the request that gsk_stream_transfer_request_start()
 * took before installing the trap.
 */
static void
handle_output_is_writable_destroy (gpointer user_data)
{
  GskStreamTransferRequest *self;
  GskStream *write_side;

  self = GSK_STREAM_TRANSFER_REQUEST (user_data);
  write_side = self->write_side;

  g_return_if_fail (write_side);
  g_return_if_fail (!gsk_io_get_is_writable (write_side));

  /* Forget the pointer first, then drop the stream reference. */
  self->write_side = NULL;
  g_object_unref (write_side);

  g_object_unref (self);
}

/*
 * GskRequest methods.
 */

/* GskRequest::start implementation.
 *
 * Verifies that both streams are present, usable and not yet trapped,
 * marks the request as running and installs the read and write traps
 * that drive the transfer.
 */
static void
gsk_stream_transfer_request_start (GskRequest *request)
{
  GskStreamTransferRequest *self;
  GskStream *read_side;
  GskStream *write_side;

  self = GSK_STREAM_TRANSFER_REQUEST (request);
  read_side = self->read_side;
  write_side = self->write_side;

  g_return_if_fail (read_side);
  g_return_if_fail (gsk_stream_get_is_readable (read_side));
  g_return_if_fail (!gsk_io_has_read_hook (read_side));
  g_return_if_fail (write_side);
  g_return_if_fail (gsk_stream_get_is_writable (write_side));
  g_return_if_fail (!gsk_io_has_write_hook (write_side));

  /* One reference per trap; each trap's destroy handler drops one. */
  g_object_ref (self);
  g_object_ref (self);

  gsk_request_mark_is_running (self);

  gsk_io_trap_readable (GSK_IO (read_side),
                        handle_input_is_readable,
                        handle_input_shutdown_read,
                        self,
                        handle_input_is_readable_destroy);
  gsk_io_trap_writable (GSK_IO (write_side),
                        handle_output_is_writable,
                        handle_output_shutdown_write,
                        self,
                        handle_output_is_writable_destroy);
}

/* GskRequest::cancelled implementation.
 *
 * A running transfer has both of its streams shut down before the request
 * is marked cancelled; a request that is not running is only marked.
 */
void
gsk_stream_transfer_request_cancelled (GskRequest *request)
{
  GskStreamTransferRequest *self = GSK_STREAM_TRANSFER_REQUEST (request);
  GskStream *read_side;
  GskStream *write_side;

  if (!gsk_request_get_is_running (self))
    {
      gsk_request_mark_is_cancelled (self);
      return;
    }

  /* Capture both sides up front, before either shutdown runs. */
  read_side = self->read_side;
  write_side = self->write_side;

/* XXX: should a cancellation shut down the streams, or just untrap them?
 * (Clearly not, if there's an "only transfer n bytes" mode...)
 */
  g_return_if_fail (read_side);
  g_return_if_fail (write_side);
  gsk_io_read_shutdown (read_side, NULL);
  gsk_io_write_shutdown (write_side, NULL);

  gsk_request_mark_is_cancelled (self);
}

/*
 * GObject methods.
 */

/* GObject::finalize implementation: drop any stream references still
 * held, destroy the transfer buffer, then chain up to the parent class.
 */
static void
gsk_stream_transfer_request_finalize (GObject *object)
{
  GskStreamTransferRequest *self = GSK_STREAM_TRANSFER_REQUEST (object);

  if (self->read_side != NULL)
    {
      g_object_unref (self->read_side);
    }
  if (self->write_side != NULL)
    {
      g_object_unref (self->write_side);
    }

  gsk_buffer_destruct (&self->buffer);

  parent_class->finalize (object);
}

/* Instance initializer: set up an empty transfer buffer and apply the
 * default buffering limit and read size.
 */
static void
gsk_stream_transfer_request_init (GskStreamTransferRequest *request)
{
  gsk_buffer_construct (&request->buffer);

  request->atomic_read_size = DEFAULT_MAX_ATOMIC_READ;
  request->max_buffered = DEFAULT_MAX_BUFFERED;
}

/* Class initializer: remember the parent class for chaining up and hook
 * in the finalize, start and cancelled implementations.
 */
static void
gsk_stream_transfer_request_class_init (GskRequestClass *request_class)
{
  GObjectClass *object_class = G_OBJECT_CLASS (request_class);

  parent_class = g_type_class_peek_parent (request_class);

  object_class->finalize = gsk_stream_transfer_request_finalize;

  request_class->start = gsk_stream_transfer_request_start;
  request_class->cancelled = gsk_stream_transfer_request_cancelled;
}

/* Return the GType for GskStreamTransferRequest, registering it as a
 * static subtype of GSK_TYPE_REQUEST on the first call.
 */
GType
gsk_stream_transfer_request_get_type (void)
{
  static const GTypeInfo type_info =
    {
      sizeof(GskStreamTransferRequestClass),
      (GBaseInitFunc) NULL,
      (GBaseFinalizeFunc) NULL,
      (GClassInitFunc) gsk_stream_transfer_request_class_init,
      NULL,             /* class_finalize */
      NULL,             /* class_data */
      sizeof (GskStreamTransferRequest),
      16,               /* n_preallocs */
      (GInstanceInitFunc) gsk_stream_transfer_request_init,
      NULL              /* value_table */
    };
  static GType type = 0;

  /* Registration happens once; later calls return the cached id. */
  if (G_UNLIKELY (type == 0))
    type = g_type_register_static (GSK_TYPE_REQUEST,
                                   "GskStreamTransferRequest",
                                   &type_info,
                                   0);

  return type;
}

/*
 *
 * Public interface.
 *
 */

/* Set the buffered-data limit above which the read side is blocked, and
 * immediately re-evaluate the internal read/write blocks against it.
 */
void
gsk_stream_transfer_request_set_max_buffered (GskStreamTransferRequest *request,
                                              guint max_buffered)
{
  request->max_buffered = max_buffered;

  /* The new limit may change whether the read side should be blocked. */
  check_internal_blocks (request);
}

/* Return the request's current buffered-data limit (max_buffered). */
guint
gsk_stream_transfer_request_get_max_buffered (GskStreamTransferRequest *request)
{
  guint limit = request->max_buffered;

  return limit;
}

/* Set the maximum number of bytes requested from the read side in a
 * single read.  The value is picked up by the next read.
 */
void
gsk_stream_transfer_request_set_atomic_read_size (GskStreamTransferRequest *request,
                                                  guint atomic_read_size)
{
  request->atomic_read_size = atomic_read_size;
}

/* Return the request's current per-read size (atomic_read_size). */
guint
gsk_stream_transfer_request_get_atomic_read_size (GskStreamTransferRequest *request)
{
  guint read_size = request->atomic_read_size;

  return read_size;
}

/* Create a request that will transfer data from INPUT_STREAM to
 * OUTPUT_STREAM.
 *
 * INPUT_STREAM must be readable with no read hook installed, and
 * OUTPUT_STREAM must be writable with no write hook installed; otherwise
 * NULL is returned.  The new request holds its own reference on each
 * stream.
 */
GskStreamTransferRequest *
gsk_stream_transfer_request_new (GskStream *input_stream,
                                 GskStream *output_stream)
{
  GskStreamTransferRequest *transfer;

  g_return_val_if_fail (input_stream, NULL);
  g_return_val_if_fail (output_stream, NULL);
  g_return_val_if_fail (gsk_stream_get_is_readable (input_stream), NULL);
  g_return_val_if_fail (gsk_stream_get_is_writable (output_stream), NULL);
  g_return_val_if_fail (!gsk_io_has_read_hook (input_stream), NULL);
  g_return_val_if_fail (!gsk_io_has_write_hook (output_stream), NULL);

  transfer = g_object_new (GSK_TYPE_STREAM_TRANSFER_REQUEST, NULL);

  /* Take a reference on each side for the lifetime of the request. */
  g_object_ref (input_stream);
  transfer->read_side = input_stream;
  g_object_ref (output_stream);
  transfer->write_side = output_stream;

  return transfer;
}


/* syntax highlighted by Code2HTML, v. 0.9.1 */