/* TODO: we should really be using gsk_stream_read_buffer()
and gsk_stream_write_buffer()!!!!!! */
#include "gskstreamconnection.h"
#include "gskerror.h"
#include "gskghelpers.h"
#include "gskmacros.h"
/* Parent class pointer; assigned in gsk_stream_connection_class_init()
   and used by the finalizer to chain up. */
static GObjectClass *parent_class = NULL;
typedef struct _GskStreamConnectionClass GskStreamConnectionClass;
/* Class-level cast and type-check helpers for GskStreamConnection. */
#define GSK_STREAM_CONNECTION_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GSK_TYPE_STREAM_CONNECTION, GskStreamConnectionClass))
#define GSK_STREAM_CONNECTION_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), GSK_TYPE_STREAM_CONNECTION, GskStreamConnectionClass))
#define GSK_IS_STREAM_CONNECTION_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), GSK_TYPE_STREAM_CONNECTION))
/* Class structure: it embeds GObjectClass and adds no members of its own. */
struct _GskStreamConnectionClass
{
GObjectClass object_class;
};
/*
 * strategy:
 * - When attaching, trap the read hook of the input stream and the
 *   write hook of the output stream.
 * - If a buffer underflow occurs (the number of buffered bytes is 0),
 *   block the write hook of the output stream: there is nothing to flush.
 * - If a buffer overflow occurs (the number of buffered bytes is greater
 *   than max_buffered), block the read hook of the input stream until
 *   the buffer has drained.
 */
/* Initial value of max_buffered, installed by the instance initializer. */
#define DEFAULT_MAX_BUFFERED 4096
/* Initial value of atomic_read_size, installed by the instance initializer. */
#define DEFAULT_MAX_ATOMIC_READ 4096
/* Read buffers larger than this many bytes are allocated with g_malloc()
   instead of g_alloca() in handle_input_is_readable(). */
#define MAX_READ_ON_STACK 8192
/* runtime-able? */
/* Debug tracing is compiled out; change the "#if 0" to "#if 1" to route
   DEBUG(( ... )) calls to g_message().  Note the doubled parentheses. */
#if 0
#define DEBUG(args) g_message args
#else
#define DEBUG(args)
#endif
/* Trace helper: logs the GType name and address of `object` together with
   the name of the calling function. */
#define D(object, fctname) DEBUG(("stream-attach: %s[%p]: %s", G_OBJECT_TYPE_NAME (object), object, fctname))
/* Apply or release this connection's own block on the write hook of the
 * output stream.
 *
 * The blocking_write_side flag remembers whether the block is currently
 * held, so gsk_io_block_write() / gsk_io_unblock_write() are only called
 * on an actual state change.  Does nothing once the write side is gone.
 */
static inline void
stream_connection_set_internal_write_block (GskStreamConnection *stream_connection,
                                            gboolean             block)
{
  if (stream_connection->write_side == NULL)
    return;

  if (block)
    {
      if (!stream_connection->blocking_write_side)
        {
          stream_connection->blocking_write_side = 1;
          gsk_io_block_write (GSK_IO (stream_connection->write_side));
        }
      return;
    }

  if (stream_connection->blocking_write_side)
    {
      stream_connection->blocking_write_side = 0;
      gsk_io_unblock_write (GSK_IO (stream_connection->write_side));
    }
}
/* Apply or release this connection's own block on the read hook of the
 * input stream.
 *
 * The blocking_read_side flag remembers whether the block is currently
 * held, so gsk_io_block_read() / gsk_io_unblock_read() are only called
 * on an actual state change.  Does nothing once the read side is gone.
 */
static inline void
stream_connection_set_internal_read_block (GskStreamConnection *stream_connection,
                                           gboolean             block)
{
  if (stream_connection->read_side == NULL)
    return;

  if (block)
    {
      if (!stream_connection->blocking_read_side)
        {
          stream_connection->blocking_read_side = 1;
          gsk_io_block_read (GSK_IO (stream_connection->read_side));
        }
      return;
    }

  if (stream_connection->blocking_read_side)
    {
      stream_connection->blocking_read_side = 0;
      gsk_io_unblock_read (GSK_IO (stream_connection->read_side));
    }
}
/* Re-evaluate both internal blocks from the current buffer fill level:
 *  - the input's read hook is blocked while more than max_buffered bytes
 *    are pending;
 *  - the output's write hook is blocked while the buffer is empty.
 */
static inline void
check_internal_blocks (GskStreamConnection *stream_connection)
{
  guint pending = stream_connection->buffer.size;
  gboolean over_limit = (pending > stream_connection->max_buffered);
  gboolean drained = (pending == 0);

  stream_connection_set_internal_read_block (stream_connection, over_limit);
  stream_connection_set_internal_write_block (stream_connection, drained);
}
/* Common failure path for the I/O handlers: shut down both ends of the
 * connection, report the error with g_warning(), and free it.
 * Takes ownership of `error`.
 */
static void
handle_error (GskStreamConnection *stream_connection,
              GError              *error)
{
  /* Shut down first, then report; the error is released last because
     its message is still needed for the warning. */
  gsk_stream_connection_shutdown (stream_connection);

  g_warning ("got error: %s", error->message);

  g_error_free (error);
}
/**
 * gsk_stream_connection_set_max_buffered:
 * @connection: the connection to affect.
 * @max_buffered: the number of buffered bytes above which the
 * connection stops reading from the input stream.
 *
 * Change the limit on how much data may sit in the buffer between
 * the input stream and the output stream.  The read/write blocks
 * are re-evaluated immediately against the new limit.
 *
 * The buffer may still hold more than this much data, either because
 * the limit was lowered below the amount already buffered,
 * or because there was a buffer-to-buffer transfer (which are allowed
 * to be large).
 */
void
gsk_stream_connection_set_max_buffered (GskStreamConnection *connection,
                                        guint                max_buffered)
{
  connection->max_buffered = max_buffered;

  /* The new limit may flip the read block in either direction. */
  check_internal_blocks (connection);
}
/**
 * gsk_stream_connection_get_max_buffered:
 * @connection: the connection to query.
 *
 * Get the limit on the number of bytes buffered between the input and
 * output ends of the connection.
 *
 * The number of bytes actually buffered right now is reported by
 * gsk_stream_connection_get_cur_buffered().
 *
 * returns: the maximum number of bytes.
 */
guint
gsk_stream_connection_get_max_buffered (GskStreamConnection *connection)
{
  guint limit = connection->max_buffered;

  return limit;
}
/**
 * gsk_stream_connection_get_cur_buffered:
 * @connection: the connection to query.
 *
 * Get the number of bytes currently held in the connection's buffer:
 * data that has been read from the input but not yet written to the
 * output.
 *
 * returns: the current number of bytes.
 */
guint
gsk_stream_connection_get_cur_buffered (GskStreamConnection *connection)
{
  guint pending = connection->buffer.size;

  return pending;
}
/**
 * gsk_stream_connection_set_atomic_read_size:
 * @connection: the connection to affect.
 * @atomic_read_size: the size to read at a time.
 *
 * Set the number of bytes requested from the input stream in a single
 * gsk_stream_read() call each time the input becomes readable.
 * This is also the size of the temporary buffer used for that read.
 */
void
gsk_stream_connection_set_atomic_read_size (GskStreamConnection *connection,
                                            guint                atomic_read_size)
{
  /* Takes effect on the next read notification; nothing is re-checked here. */
  connection->atomic_read_size = atomic_read_size;
}
/**
 * gsk_stream_connection_get_atomic_read_size:
 * @connection: the connection to query.
 *
 * Get the number of bytes that are read atomically from
 * the underlying input stream, as configured with
 * gsk_stream_connection_set_atomic_read_size().
 *
 * returns: the size to read at a time.
 */
guint
gsk_stream_connection_get_atomic_read_size (GskStreamConnection *connection)
{
  return connection->atomic_read_size;
}
/* Read-hook callback for the input stream.
 *
 * Reads up to atomic_read_size bytes from the input.  If nothing is
 * already buffered, the data is offered straight to the output stream;
 * whatever could not be written is appended to the connection's buffer.
 * The internal read/write blocks are then re-evaluated.
 *
 * Read or write errors are passed to handle_error(), which shuts the
 * connection down.  Returns TRUE on every path past the initial sanity
 * checks, which return FALSE.
 */
static gboolean
handle_input_is_readable (GskIO    *io,
                          gpointer  data)
{
  GskStreamConnection *stream_connection = data;
  GskStream *source = stream_connection->read_side;
  GskStream *sink = stream_connection->write_side;
  guint chunk_size = stream_connection->atomic_read_size;
  /* Chunks too large for the stack come from the heap and must be freed. */
  gboolean on_heap = (chunk_size > MAX_READ_ON_STACK);
  GError *error = NULL;
  guint n_read;
  guint n_written = 0;
  char *chunk;

  g_return_val_if_fail (source == GSK_STREAM (io), FALSE);
  g_return_val_if_fail (sink != NULL, FALSE);
  D (io, "handle_input_is_readable");

  /* TODO: too harsh a penalty for big atomic reads...
   * maybe we should cache a big one.
   */
  if (on_heap)
    chunk = g_malloc (chunk_size);
  else
    chunk = g_alloca (chunk_size);

  n_read = gsk_stream_read (source, chunk, chunk_size, &error);
  if (error != NULL)
    {
      handle_error (stream_connection, error);
      goto done;
    }
  if (n_read == 0)
    goto done;

  /* Only bypass the buffer when it is empty; otherwise the new data
     would overtake bytes that are already queued. */
  if (stream_connection->buffer.size == 0)
    {
      n_written = gsk_stream_write (sink, chunk, n_read, &error);
      if (error != NULL)
        {
          handle_error (stream_connection, error);
          goto done;
        }
    }

  /* Queue whatever the output did not accept. */
  if (n_written < n_read)
    gsk_buffer_append (&stream_connection->buffer,
                       chunk + n_written,
                       n_read - n_written);

  check_internal_blocks (stream_connection);

done:
  if (on_heap)
    g_free (chunk);
  return TRUE;
}
/* Shutdown callback for the read hook trapped on the input stream.
 *
 * If the output stream is still attached and nothing is buffered, the
 * output's write end is shut down right away.  When data is still
 * buffered, nothing happens here; handle_output_is_writable() shuts the
 * write end down once the buffer is empty and the read side is gone.
 * A failed write-shutdown is only logged.  Always returns FALSE.
 */
static gboolean
handle_input_shutdown_read (GskIO    *io,
                            gpointer  data)
{
  GskStreamConnection *stream_connection = data;
  GError *error = NULL;

  D (io, "handle_input_shutdown_read");

  if (stream_connection->write_side == NULL)
    return FALSE;
  if (stream_connection->buffer.size != 0)
    return FALSE;

  if (gsk_io_write_shutdown (GSK_IO (stream_connection->write_side), &error))
    return FALSE;

  if (error != NULL)
    {
      const char *msg = error->message;
      gsk_g_debug ("stream-attach: handle-read-shutdown: doing write-shutdown: %s", msg);
      g_error_free (error);
    }
  return FALSE;
}
/* Destroy notification for the read trap on the input stream.
 *
 * Clears the connection's read_side pointer, then drops the reference
 * the trap held on the connection and the reference the connection
 * held on the input stream.
 */
static void
handle_input_is_readable_destroy (gpointer data)
{
  GskStreamConnection *stream_connection = data;
  GskStream *old_input = stream_connection->read_side;

  D (old_input, "handle_input_is_readable_destroy");

  /* Clear the field before unreffing; the connection may not survive
     the unref below. */
  stream_connection->read_side = NULL;
  g_object_unref (stream_connection);

  if (old_input != NULL)
    g_object_unref (old_input);
}
/* Write-hook callback for the output stream.
 *
 * Flushes as much buffered data as the output accepts.  Once the buffer
 * is empty and the input side was already gone when this callback
 * started, the output's write end is shut down.  Finally the internal
 * read/write blocks are re-evaluated.
 *
 * A write error is passed to handle_error(), which shuts the connection
 * down.  Returns TRUE, except FALSE when the sanity check fails.
 */
static gboolean
handle_output_is_writable (GskIO    *io,
                           gpointer  data)
{
  GskStreamConnection *stream_connection = data;
  GskStream *sink = stream_connection->write_side;
  /* Sampled up front: the decision below uses the state at entry. */
  GskStream *source = stream_connection->read_side;
  GError *error = NULL;

  g_return_val_if_fail (sink == GSK_STREAM (io), FALSE);

  /* The read-side of the connection in fact may be shut-down,
     but we still flush the outgoing data before shutting down
     the write-side.  So an assertion that the read side is non-NULL
     (formerly: g_return_val_if_fail (read_side != NULL, FALSE);)
     is wrong.
     Nonetheless, it's so old that it's unclear how it can be wrong,
     so the note is kept here for further reflection.
     (daveb, Mar 8, 2005) */

  D (sink, "handle_output_is_writable");

  if (stream_connection->buffer.size > 0)
    {
      gsk_stream_write_buffer (sink, &stream_connection->buffer, &error);
      if (error)
        {
          handle_error (stream_connection, error);
          return TRUE;
        }
    }

  if (source == NULL && stream_connection->buffer.size == 0)
    {
      gboolean ok = gsk_io_write_shutdown (GSK_IO (stream_connection->write_side), &error);

      if (!ok && error != NULL)
        {
          const char *msg = error->message;
          gsk_g_debug ("stream-attach: handle-output-is-writable, shutting down write end: %s", msg);
          g_error_free (error);
        }
    }

  check_internal_blocks (stream_connection);
  return TRUE;
}
/* Shutdown callback for the write hook trapped on the output stream.
 *
 * Once the output can no longer be written, there is no point reading
 * more input, so the input stream's read end is shut down as well (if
 * it is still attached).
 *
 * A failed read-shutdown is logged with gsk_g_debug(), matching the
 * other shutdown paths in this file.  It used to be reported with
 * g_error(), which aborts the whole process and left the error
 * unfreed.  Always returns FALSE.
 */
static gboolean
handle_output_shutdown_write (GskIO    *io,
                              gpointer  data)
{
  GskStreamConnection *stream_connection = data;
  GError *error = NULL;

  D (stream_connection->write_side, "handle_output_shutdown_write");

  if (stream_connection->read_side == NULL)
    return FALSE;

  if (!gsk_io_read_shutdown (GSK_IO (stream_connection->read_side), &error)
      && error != NULL)
    {
      const char *msg = error->message;
      gsk_g_debug ("stream-attach: handle-write-shutdown: doing read-shutdown: %s", msg);
      g_error_free (error);
    }
  return FALSE;
}
/* Destroy notification for the write trap on the output stream.
 *
 * Clears the connection's write_side pointer and, if the input stream
 * is still attached, untraps its read hook too.  Then drops the
 * reference the trap held on the connection and the reference the
 * connection held on the output stream.
 */
static void
handle_output_is_writable_destroy (gpointer data)
{
  GskStreamConnection *stream_connection = data;
  GskStream *old_output = stream_connection->write_side;

  D (old_output, "handle_output_is_writable_destroy");

  stream_connection->write_side = NULL;

  /* Without an output there is nowhere to send input, so detach it. */
  if (stream_connection->read_side != NULL)
    gsk_io_untrap_readable (GSK_IO (stream_connection->read_side));

  g_object_unref (stream_connection);

  if (old_output != NULL)
    g_object_unref (old_output);
}
/* GObject finalizer: release the connection's buffer, then chain up
 * to the parent class.
 */
static void
gsk_stream_connection_finalize (GObject *object)
{
  GskStreamConnection *connection;

  connection = GSK_STREAM_CONNECTION (object);
  gsk_buffer_destruct (&connection->buffer);

  parent_class->finalize (object);
}
/* Instance initializer: set up an empty buffer and install the default
 * buffering limit and atomic read size.
 */
static void
gsk_stream_connection_init (GskStreamConnection *stream_connection)
{
  gsk_buffer_construct (&stream_connection->buffer);

  stream_connection->max_buffered = DEFAULT_MAX_BUFFERED;
  stream_connection->atomic_read_size = DEFAULT_MAX_ATOMIC_READ;
}
/* Class initializer: remember the parent class for chaining up and
 * install the finalizer.
 */
static void
gsk_stream_connection_class_init (GskStreamConnectionClass *class)
{
  GObjectClass *object_class = G_OBJECT_CLASS (class);

  parent_class = g_type_class_peek_parent (class);
  object_class->finalize = gsk_stream_connection_finalize;
}
/* Return the GType for GskStreamConnection, registering it as a static
 * subtype of G_TYPE_OBJECT on the first call and caching the result.
 */
GType gsk_stream_connection_get_type()
{
  static const GTypeInfo stream_connection_info =
    {
      sizeof(GskStreamConnectionClass),
      (GBaseInitFunc) NULL,
      (GBaseFinalizeFunc) NULL,
      (GClassInitFunc) gsk_stream_connection_class_init,
      NULL,		/* class_finalize */
      NULL,		/* class_data */
      sizeof (GskStreamConnection),
      16,		/* n_preallocs */
      (GInstanceInitFunc) gsk_stream_connection_init,
      NULL		/* value_table */
    };
  static GType stream_connection_type = 0;

  if (stream_connection_type == 0)
    stream_connection_type = g_type_register_static (G_TYPE_OBJECT,
                                                     "GskStreamConnection",
                                                     &stream_connection_info, 0);
  return stream_connection_type;
}
/**
 * gsk_stream_connection_new:
 * @input_stream: the input stream whose read-end will be trapped.
 * @output_stream: the output stream whose write-end will be trapped.
 * @error: optional error return location.
 *
 * Attach the read end of @input_stream
 * to the write end of @output_stream.
 *
 * Both streams must be non-NULL, @input_stream must be readable and
 * not already have a read hook trapped, and @output_stream must be
 * writable and not already have a write hook trapped; otherwise
 * NULL is returned.  NULL is also returned, without attaching anything,
 * if @error already holds an error on entry.
 *
 * The connection takes its own reference on each stream, and each of
 * the two traps holds a reference on the connection; those are released
 * when the corresponding trap is destroyed.
 *
 * returns: a reference to the connection.
 * You should eventually call g_object_unref() on the connection.
 */
GskStreamConnection *
gsk_stream_connection_new (GskStream *input_stream,
                           GskStream *output_stream,
                           GError   **error)
{
  GskStreamConnection *conn;

  DEBUG (("stream-attach: attach: input=%s[%p], output=%s[%p]",
          G_OBJECT_TYPE_NAME (input_stream), input_stream,
          G_OBJECT_TYPE_NAME (output_stream), output_stream));

  g_return_val_if_fail (input_stream != NULL, NULL);
  g_return_val_if_fail (output_stream != NULL, NULL);
  g_return_val_if_fail (gsk_stream_get_is_readable (input_stream), NULL);
  g_return_val_if_fail (gsk_stream_get_is_writable (output_stream), NULL);
  g_return_val_if_fail (!gsk_io_has_read_hook (input_stream), NULL);
  g_return_val_if_fail (!gsk_io_has_write_hook (output_stream), NULL);

  /* Refuse to proceed on top of an error the caller has not handled. */
  if (error != NULL && *error != NULL)
    return NULL;

  /* The reference from g_object_new() is the one returned to the caller. */
  conn = g_object_new (GSK_TYPE_STREAM_CONNECTION, NULL);

  /* Input side: the connection owns a ref on the stream, and the read
     trap owns a ref on the connection. */
  g_object_ref (input_stream);
  conn->read_side = input_stream;
  g_object_ref (conn);

  /* Output side: same arrangement for the write trap. */
  g_object_ref (output_stream);
  conn->write_side = output_stream;
  g_object_ref (conn);

  gsk_io_trap_readable (GSK_IO (input_stream),
                        handle_input_is_readable,
                        handle_input_shutdown_read,
                        conn,
                        handle_input_is_readable_destroy);
  gsk_io_trap_writable (GSK_IO (output_stream),
                        handle_output_is_writable,
                        handle_output_shutdown_write,
                        conn,
                        handle_output_is_writable_destroy);
  return conn;
}
/**
 * gsk_stream_connection_detach:
 * @connection: the connection to detach.
 *
 * Disconnects the input/output pair of a connection by untrapping
 * the read hook of the input stream and the write hook of the output
 * stream, whichever are still attached.
 * Data held in the buffer will be lost.
 */
void
gsk_stream_connection_detach (GskStreamConnection *connection)
{
  /* Hold a temporary reference for the duration of the call: the trap
     destroy handlers in this file each unref the connection
     (presumably they run as part of untrapping). */
  g_object_ref (connection);

  if (connection->read_side != NULL)
    gsk_stream_untrap_readable (connection->read_side);

  if (connection->write_side != NULL)
    gsk_stream_untrap_writable (connection->write_side);

  gsk_buffer_destruct (&connection->buffer);

  g_object_unref (connection);
}
/**
 * gsk_stream_connection_shutdown:
 * @connection: the connection to shut down.
 *
 * Shut down both ends of a connection: the read end of the input
 * stream first, then the write end of the output stream.
 * Errors from either shutdown are ignored.
 */
void
gsk_stream_connection_shutdown (GskStreamConnection *connection)
{
  GskStream *input = connection->read_side;
  GskStream *output = connection->write_side;

  /* Pin the output stream across the read shutdown: the read-shutdown
     handler in this file may itself shut down the write side. */
  if (output != NULL)
    g_object_ref (output);

  if (input != NULL)
    gsk_io_read_shutdown (input, NULL);

  if (output == NULL)
    return;

  gsk_io_write_shutdown (output, NULL);
  g_object_unref (output);
}
/* syntax highlighted by Code2HTML, v. 0.9.1 */