#include "gskerror.h"
#include "gskghelpers.h"
#include "gskmacros.h"
#include "gskstreamtransferrequest.h"
#define DEFAULT_MAX_BUFFERED 4096
#define DEFAULT_MAX_ATOMIC_READ 4096
#define MAX_READ_ON_STACK 8192
static GObjectClass *parent_class = NULL;
static inline void
check_internal_write_block (GskStreamTransferRequest *request, gboolean block)
{
if (block && !request->blocking_write_side)
{
request->blocking_write_side = 1;
gsk_io_block_write (GSK_IO (request->write_side));
}
else if (!block && request->blocking_write_side)
{
request->blocking_write_side = 0;
gsk_io_unblock_write (GSK_IO (request->write_side));
}
}
static inline void
check_internal_read_block (GskStreamTransferRequest *request, gboolean block)
{
if (block && !request->blocking_read_side)
{
request->blocking_read_side = 1;
gsk_io_block_read (GSK_IO (request->read_side));
}
else if (!block && request->blocking_read_side)
{
request->blocking_read_side = 0;
gsk_io_unblock_read (GSK_IO (request->read_side));
}
}
static inline void
check_internal_blocks (GskStreamTransferRequest *request)
{
guint size = request->buffer.size;
check_internal_read_block (request, size > request->max_buffered);
check_internal_write_block (request, size == 0);
}
static void
handle_error (GskStreamTransferRequest *self, GError *error)
{
g_return_if_fail (error);
g_warning ("GskStreamTransferRequest: %s", error->message);
if (gsk_request_had_error (self))
g_free (error);
else
{
g_return_if_fail (gsk_request_get_is_running (self));
g_return_if_fail (!gsk_request_get_is_done (self));
g_return_if_fail (!gsk_request_get_is_cancelled (self));
gsk_request_set_error (self, error);
gsk_request_done (self);
gsk_io_read_shutdown (self->read_side, NULL);
gsk_io_write_shutdown (self->write_side, NULL);
}
}
static gboolean
handle_input_is_readable (GskIO *io, gpointer user_data)
{
GskStreamTransferRequest *self = GSK_STREAM_TRANSFER_REQUEST (user_data);
GskStream *read_side = self->read_side;
GskStream *write_side = self->write_side;
guint atomic_read_size = self->atomic_read_size;
gboolean read_on_stack = (atomic_read_size > MAX_READ_ON_STACK);
GError *error = NULL;
char *buf;
guint num_read;
g_return_val_if_fail (read_side == GSK_STREAM (io), FALSE);
g_return_val_if_fail (write_side, FALSE);
if (read_on_stack)
buf = g_alloca (atomic_read_size);
else
buf = g_malloc (atomic_read_size);
num_read = gsk_stream_read (read_side, buf, atomic_read_size, &error);
if (error)
{
handle_error (self, error);
if (!read_on_stack)
g_free (buf);
return FALSE;
}
if (num_read == 0)
{
if (!read_on_stack)
g_free (buf);
/* Wait for shutdown notification, in case there's an error. */
return TRUE;
}
if (read_on_stack)
{
guint num_written = 0;
if (self->buffer.size == 0)
{
num_written = gsk_stream_write (write_side, buf, num_read, &error);
if (error)
{
handle_error (self, error);
return FALSE;
}
}
if (num_written < num_read)
gsk_buffer_append (&self->buffer,
buf + num_written,
num_read - num_written);
}
else
gsk_buffer_append_foreign (&self->buffer, buf, num_read, g_free, buf);
check_internal_blocks (self);
return TRUE;
}
static gboolean
handle_input_shutdown_read (GskIO *io, gpointer user_data)
{
GskStreamTransferRequest *self = GSK_STREAM_TRANSFER_REQUEST (user_data);
GskStream *write_side = self->write_side;
g_return_val_if_fail (self->read_side == GSK_STREAM (io), FALSE);
/* If the write_side has been shut down already, we don't need to do
* anything.
*/
if (write_side && gsk_stream_get_is_writable (write_side))
{
if (io->error)
{
/* If the stream had an error, it's an error. */
handle_error (self, g_error_copy (io->error));
}
else if (self->buffer.size == 0)
{
/* Otherwise, if there's nothing buffered, try to close write_side. */
GError *error = NULL;
if (!gsk_io_write_shutdown (GSK_IO (write_side), &error))
handle_error (self, error);
}
}
return FALSE;
}
static void
handle_input_is_readable_destroy (gpointer user_data)
{
GskStreamTransferRequest *self = GSK_STREAM_TRANSFER_REQUEST (user_data);
GskStream *read_side = self->read_side;
g_return_if_fail (read_side);
g_return_if_fail (!gsk_stream_get_is_readable (read_side));
g_object_unref (read_side);
self->read_side = NULL;
g_object_unref (self);
}
static gboolean
handle_output_is_writable (GskIO *io, gpointer user_data)
{
GskStreamTransferRequest *self = GSK_STREAM_TRANSFER_REQUEST (user_data);
GskStream *write_side = self->write_side;
GError *error = NULL;
g_return_val_if_fail (write_side == GSK_STREAM (io), FALSE);
if (self->buffer.size > 0)
{
gsk_stream_write_buffer (write_side, &self->buffer, &error);
if (error)
{
handle_error (self, error);
return FALSE;
}
}
if ((self->read_side == NULL ||
!gsk_stream_get_is_readable (self->read_side)) &&
self->buffer.size == 0)
{
if (!gsk_io_write_shutdown (GSK_IO (write_side), &error))
{
handle_error (self, error);
return FALSE;
}
else
{
/* Wait for shutdown notification. */
return TRUE;
}
}
check_internal_blocks (self);
return TRUE;
}
static gboolean
handle_output_shutdown_write (GskIO *io, gpointer user_data)
{
GskStreamTransferRequest *self = GSK_STREAM_TRANSFER_REQUEST (user_data);
GskStream *read_side = self->read_side;
g_return_val_if_fail (self->write_side == GSK_STREAM (io), FALSE);
if (read_side && gsk_stream_get_is_readable (read_side))
{
/* If the read side is still readable, it means the write side
* shut down before we could write everything out, so we don't
* care about the write side's error status.
*/
GError *error;
error = g_error_new (GSK_G_ERROR_DOMAIN,
0, /* TODO */
"premature shutdown of write stream");
handle_error (self, error);
gsk_io_read_shutdown (GSK_IO (self->read_side), NULL);
}
else if (io->error)
{
/* Otherwise, if the write side had an error, it's an error. */
handle_error (self, g_error_copy (io->error));
}
else
{
/* Otherwise, everything worked. */
gsk_request_done (self);
}
/* TODO: why would you ever want to return true from the shutdown handler? */
return FALSE;
}
static void
handle_output_is_writable_destroy (gpointer user_data)
{
GskStreamTransferRequest *self = GSK_STREAM_TRANSFER_REQUEST (user_data);
GskStream *write_side = self->write_side;
g_return_if_fail (write_side);
g_return_if_fail (!gsk_io_get_is_writable (write_side));
self->write_side = NULL;
g_object_unref (write_side);
g_object_unref (self);
}
/*
* GskRequest methods.
*/
static void
gsk_stream_transfer_request_start (GskRequest *request)
{
GskStreamTransferRequest *self = GSK_STREAM_TRANSFER_REQUEST (request);
GskStream *read_side = self->read_side;
GskStream *write_side = self->write_side;
g_return_if_fail (read_side);
g_return_if_fail (gsk_stream_get_is_readable (read_side));
g_return_if_fail (!gsk_io_has_read_hook (read_side));
g_return_if_fail (write_side);
g_return_if_fail (gsk_stream_get_is_writable (write_side));
g_return_if_fail (!gsk_io_has_write_hook (write_side));
g_object_ref (self);
g_object_ref (self);
gsk_request_mark_is_running (self);
gsk_io_trap_readable (GSK_IO (read_side),
handle_input_is_readable,
handle_input_shutdown_read,
self,
handle_input_is_readable_destroy);
gsk_io_trap_writable (GSK_IO (write_side),
handle_output_is_writable,
handle_output_shutdown_write,
self,
handle_output_is_writable_destroy);
}
void
gsk_stream_transfer_request_cancelled (GskRequest *request)
{
GskStreamTransferRequest *self = GSK_STREAM_TRANSFER_REQUEST (request);
if (gsk_request_get_is_running (self))
{
GskStream *read_side = self->read_side;
GskStream *write_side = self->write_side;
/* XXX: should a cancellation shut down the streams, or just untrap them?
* (Clearly not, if there's an "only transfer n bytes" mode...)
*/
g_return_if_fail (read_side);
g_return_if_fail (write_side);
gsk_io_read_shutdown (read_side, NULL);
gsk_io_write_shutdown (write_side, NULL);
}
gsk_request_mark_is_cancelled (self);
}
/*
* GObject methods.
*/
static void
gsk_stream_transfer_request_finalize (GObject *object)
{
GskStreamTransferRequest *self = GSK_STREAM_TRANSFER_REQUEST (object);
if (self->read_side)
g_object_unref (self->read_side);
if (self->write_side)
g_object_unref (self->write_side);
gsk_buffer_destruct (&self->buffer);
(*parent_class->finalize) (object);
}
static void
gsk_stream_transfer_request_init (GskStreamTransferRequest *request)
{
request->max_buffered = DEFAULT_MAX_BUFFERED;
request->atomic_read_size = DEFAULT_MAX_ATOMIC_READ;
gsk_buffer_construct (&request->buffer);
}
static void
gsk_stream_transfer_request_class_init (GskRequestClass *request_class)
{
parent_class = g_type_class_peek_parent (request_class);
G_OBJECT_CLASS (request_class)->finalize =
gsk_stream_transfer_request_finalize;
request_class->start = gsk_stream_transfer_request_start;
request_class->cancelled = gsk_stream_transfer_request_cancelled;
}
GType
gsk_stream_transfer_request_get_type (void)
{
static GType type = 0;
if (G_UNLIKELY (type == 0))
{
static const GTypeInfo type_info =
{
sizeof(GskStreamTransferRequestClass),
(GBaseInitFunc) NULL,
(GBaseFinalizeFunc) NULL,
(GClassInitFunc) gsk_stream_transfer_request_class_init,
NULL, /* class_finalize */
NULL, /* class_data */
sizeof (GskStreamTransferRequest),
16, /* n_preallocs */
(GInstanceInitFunc) gsk_stream_transfer_request_init,
NULL /* value_table */
};
type = g_type_register_static (GSK_TYPE_REQUEST,
"GskStreamTransferRequest",
&type_info,
0);
}
return type;
}
/*
*
* Public interface.
*
*/
void
gsk_stream_transfer_request_set_max_buffered
(GskStreamTransferRequest *request, guint max_buffered)
{
request->max_buffered = max_buffered;
check_internal_blocks (request);
}
guint
gsk_stream_transfer_request_get_max_buffered
(GskStreamTransferRequest *request)
{
return request->max_buffered;
}
void
gsk_stream_transfer_request_set_atomic_read_size
(GskStreamTransferRequest *request, guint atomic_read_size)
{
request->atomic_read_size = atomic_read_size;
}
guint
gsk_stream_transfer_request_get_atomic_read_size
(GskStreamTransferRequest *request)
{
return request->atomic_read_size;
}
GskStreamTransferRequest *
gsk_stream_transfer_request_new (GskStream *input_stream,
GskStream *output_stream)
{
GskStreamTransferRequest *request;
g_return_val_if_fail (input_stream, NULL);
g_return_val_if_fail (output_stream, NULL);
g_return_val_if_fail (gsk_stream_get_is_readable (input_stream), NULL);
g_return_val_if_fail (gsk_stream_get_is_writable (output_stream), NULL);
g_return_val_if_fail (!gsk_io_has_read_hook (input_stream), NULL);
g_return_val_if_fail (!gsk_io_has_write_hook (output_stream), NULL);
request = g_object_new (GSK_TYPE_STREAM_TRANSFER_REQUEST, NULL);
request->read_side = input_stream;
g_object_ref (input_stream);
request->write_side = output_stream;
g_object_ref (output_stream);
return request;
}
syntax highlighted by Code2HTML, v. 0.9.1