/*-
* $Id: rr-channel.c,v 1.43 2002/11/19 21:48:01 jonas Exp $
*
* See the file LICENSE for redistribution information.
* If you have not received a copy of the license, please contact CodeFactory
* by email at info@codefactory.se, or on the web at http://www.codefactory.se/
* You may also write to: CodeFactory AB, SE-903 47, Umeå, Sweden.
*
* Copyright (c) 2002 Jonas Borgström <jonas@codefactory.se>
* Copyright (c) 2002 Daniel Lundin <daniel@codefactory.se>
* Copyright (c) 2002 CodeFactory AB. All rights reserved.
*/
#include <librr/rr.h>
/* Initial per-channel window size in octets.  The BEEP TCP mapping
 * (RFC 3081) fixes the window of a newly created channel at 4096 octets;
 * starting with a larger send window could overrun the peer. */
#define RR_TCP_DEFAULT_WINDOW_SIZE 4096 /* FIXME: Shouldn't be here */
/* Parent class pointer; assigned in rr_channel_class_init() and used by
 * finalize() to chain up to the parent's finalize handler. */
static GObjectClass *parent_class = NULL;
/* GObject finalize handler, installed in rr_channel_class_init(). */
static void finalize (GObject *object);
/* One entry of a channel's outgoing queue (the GSList stored in
 * channel->out_queue).  Each entry groups the objects queued for one
 * message number. */
typedef struct _QueueItem QueueItem;
struct _QueueItem {
/* Message number this entry belongs to. */
gint32 msgno;
/* Matched together with msgno by find_queue_item(). */
gboolean reply;
/* Queued GObjects: pushed at the head, peeked/popped from the tail, and
 * unreferenced by queue_item_free(). */
GQueue *queue;
};
/* Forward declarations for the outgoing-queue helpers defined at the end
 * of this file.  They operate on the GSList of QueueItem entries held in
 * channel->out_queue; the callers in this file take channel->out_lock
 * around them. */
static void out_queue_insert_rpy_slot (GSList **queue, gint32 msgno);
static void out_queue_free (GSList *queue);
static void out_queue_push (GSList **queue,
GObject *obj,
gint32 msgno,gint32 channel_id,
RRFrameType type);
static GObject *out_queue_peek_item (GSList *queue);
static GObject *out_queue_pop (GSList **queue);
static gboolean out_queue_ready (GSList *queue);
static void out_queue_optimize (GSList **queue);
static void queue_item_free (QueueItem *item);
/* Instance initializer: creates the channel's mutexes and condition
 * variable, starts with an empty outgoing queue, sets all three window
 * fields to the default size and enables frame aggregation. */
static void
rr_channel_init (GObject *object)
{
	RRChannel *self = (RRChannel *)object;

	/* Synchronization primitives. */
	self->mutex    = g_mutex_new ();
	self->out_lock = g_mutex_new ();
	self->out_cond = g_cond_new ();

	/* Outgoing queue and flow-control windows. */
	self->out_queue   = NULL;
	self->window_size = RR_TCP_DEFAULT_WINDOW_SIZE;
	self->window_in   = RR_TCP_DEFAULT_WINDOW_SIZE;
	self->window_out  = RR_TCP_DEFAULT_WINDOW_SIZE;

	self->aggregate_frames = TRUE;
}
/* Class initializer: remembers the parent class for chaining up and
 * installs this file's finalize handler. */
static void
rr_channel_class_init (GObjectClass *klass)
{
	parent_class = g_type_class_peek_parent (klass);
	klass->finalize = finalize;
}
/**
 * rr_channel_get_type:
 *
 * Registers the abstract "RRChannel" type (derived from G_TYPE_OBJECT)
 * the first time it is called and returns the cached type id afterwards.
 *
 * Return value: the #GType of #RRChannel.
 **/
GType
rr_channel_get_type (void)
{
	static GType rr_type = 0;
	static GTypeInfo type_info = {
		sizeof (RRChannelClass),		/* class_size */
		NULL,					/* base_init */
		NULL,					/* base_finalize */
		(GClassInitFunc) rr_channel_class_init,	/* class_init */
		NULL,					/* class_finalize */
		NULL,					/* class_data */
		sizeof (RRChannel),			/* instance_size */
		16,					/* n_preallocs */
		(GInstanceInitFunc) rr_channel_init	/* instance_init */
	};

	if (rr_type == 0)
		rr_type = g_type_register_static (G_TYPE_OBJECT, "RRChannel",
						  &type_info,
						  G_TYPE_FLAG_ABSTRACT);
	return rr_type;
}
/* GObject finalize handler for RRChannel.
 *
 * Joins the channel's work-pool group, frees everything still waiting in
 * the outgoing queue, destroys the synchronization primitives and finally
 * chains up to the parent class.
 *
 * NOTE(review): channel->piggyback is not released here -- confirm whether
 * it is freed elsewhere. */
static void
finalize (GObject *object)
{
	RRChannel *self = (RRChannel *)object;

	/* Presumably waits for work queued for this channel -- see
	 * rr_main_work_pool_join(). */
	rr_main_work_pool_join (RRWPGROUP (object));

	/* Drop whatever is still queued for transmission. */
	g_mutex_lock (self->out_lock);
	out_queue_free (self->out_queue);
	g_mutex_unlock (self->out_lock);

	g_mutex_free (self->out_lock);
	g_cond_free (self->out_cond);
	g_mutex_free (self->mutex);

	parent_class->finalize (object);
}
/**
 * rr_channel_set_connection:
 * @channel: a #RRChannel
 * @connection: the #RRConnection to associate with @channel (may be %NULL)
 *
 * Stores @connection in @channel.  The pointer is stored as-is; no
 * reference is taken here.
 **/
void
rr_channel_set_connection (RRChannel *channel, RRConnection *connection)
{
	g_return_if_fail (channel != NULL);
	g_return_if_fail (RR_IS_CHANNEL (channel));

	channel->connection = connection;
}
/**
 * rr_channel_get_connection:
 * @channel: a #RRChannel
 *
 * Return value: the connection stored in @channel, or %NULL if @channel
 * is invalid or has no connection.
 **/
RRConnection *
rr_channel_get_connection (RRChannel *channel)
{
	g_return_val_if_fail (channel != NULL, NULL);
	g_return_val_if_fail (RR_IS_CHANNEL (channel), NULL);

	return channel->connection;
}
/**
 * rr_channel_out_queue_empty:
 * @channel: A #RRChannel
 *
 * Checks whether the channel has anything (frame or message) ready to be
 * sent.  The outgoing queue is inspected while holding the channel's
 * out_lock.
 *
 * Return value: %TRUE if nothing is ready to send, else %FALSE.
 **/
gboolean
rr_channel_out_queue_empty (RRChannel *channel)
{
	gboolean ready;

	g_mutex_lock (channel->out_lock);
	ready = out_queue_ready (channel->out_queue);
	g_mutex_unlock (channel->out_lock);

	return !ready;
}
/**
 * rr_channel_get_active_item:
 * @channel: a #RRChannel
 *
 * Peeks, under out_lock, at the object that is next in line in the
 * outgoing queue.  The object stays in the queue and no reference is
 * added here.
 *
 * Return value: the next queued object, as returned by the queue peek.
 **/
GObject *
rr_channel_get_active_item (RRChannel *channel)
{
	GObject *next;

	g_mutex_lock (channel->out_lock);
	next = out_queue_peek_item (channel->out_queue);
	g_mutex_unlock (channel->out_lock);

	return next;
}
/**
 * rr_channel_remove_active_item:
 * @channel: a #RRChannel
 *
 * Removes the object at the front of the outgoing queue and wakes every
 * thread waiting on the channel's out_cond (see rr_channel_flush()).
 * The removed object is not unreferenced here.
 * NOTE(review): presumably the caller obtained it through
 * rr_channel_get_active_item() and releases it -- confirm.
 *
 * Return value: %TRUE if nothing more is ready to send, else %FALSE.
 **/
gboolean
rr_channel_remove_active_item (RRChannel *channel)
{
	gboolean empty;

	g_mutex_lock (channel->out_lock);
	/* The popped object itself is not needed here. */
	out_queue_pop (&channel->out_queue);
	empty = !out_queue_ready (channel->out_queue);
	g_cond_broadcast (channel->out_cond);
	g_mutex_unlock (channel->out_lock);

	return empty;
}
/* Common implementation of rr_channel_send_message() and
 * rr_channel_send_frame().
 *
 * @channel: channel to send on; must have a connection.
 * @object:  a RRMessage or a RRFrame (anything that is not a RRMessage is
 *           treated as a RRFrame).
 * @error:   set to RR_ERROR / RR_ERROR_DISCONNECTED when the channel has
 *           no connection.
 *
 * Tags the object with the channel, assigns a message number when none is
 * set, appends the object to the outgoing queue under out_lock and then
 * registers the channel as a sender with its connection.
 *
 * Returns TRUE when the object was queued, FALSE on error. */
static gboolean
send_helper (RRChannel *channel, GObject *object, GError **error)
{
	if (channel->connection == NULL) {
		g_set_error (error,
			     RR_ERROR,               /* error domain */
			     RR_ERROR_DISCONNECTED,  /* error code */
			     "Channel not associated with any connection.");
		return FALSE;
	}

	if (RR_IS_MESSAGE (object))
		rr_message_set_channel (RR_MESSAGE (object), channel);
	else
		RR_FRAME (object)->channel_id = channel->id;

	g_mutex_lock (channel->out_lock);

	if (RR_IS_MESSAGE (object)) {
		RRMessage *message = RR_MESSAGE (object);

		/* Only MSG messages get a fresh number; a reply with an unset
		 * msgno is queued unchanged. */
		if (message->msgno < 0 &&
		    message->type == RR_FRAME_TYPE_MSG) {
			message->msgno = channel->msgno++;
			/* Wrap the counter back to 0 once it turns negative. */
			if (channel->msgno < 0)
				channel->msgno = 0;
		}
		out_queue_push (&channel->out_queue, object,
				message->msgno, channel->id,
				message->type);
	}
	else {
		RRFrame *frame = RR_FRAME (object);

		if (frame->msgno < 0) {
			/* Take the number from the channel counter.  The old
			 * code assigned frame->msgno++ to itself, which is
			 * undefined behaviour and never produced a valid
			 * message number. */
			frame->msgno = channel->msgno++;
			if (channel->msgno < 0)
				channel->msgno = 0;
		}
		out_queue_push (&channel->out_queue, object,
				frame->msgno, channel->id,
				frame->type);
	}

	g_mutex_unlock (channel->out_lock);

	rr_connection_register_sender (channel->connection, channel);
	return TRUE;
}
/**
 * rr_channel_send_message:
 * @channel: a #RRChannel
 * @message: a #RRMessage
 * @error: location to return an error of type RR_ERROR or RR_BEEP_ERROR.
 *
 * Queues @message for transmission on @channel.
 * Note: this function does not block until the message has been sent, and
 * @message will be unreferenced after transmission.
 *
 * Return value: %TRUE on success, %FALSE on failure.
 **/
gboolean
rr_channel_send_message (RRChannel *channel, RRMessage *message,
			 GError **error)
{
	GObject *object;

	g_return_val_if_fail (RR_IS_CHANNEL (channel), FALSE);
	g_return_val_if_fail (RR_IS_MESSAGE (message), FALSE);

	object = G_OBJECT (message);
	return send_helper (channel, object, error);
}
/**
 * rr_channel_send_frame:
 * @channel: a #RRChannel
 * @frame: a #RRFrame
 * @error: location to return an error of type RR_ERROR or RR_BEEP_ERROR.
 *
 * Queues @frame for transmission on @channel.
 * Note: this function does not block until the frame has been sent, and
 * @frame will be unreferenced after transmission.
 *
 * Return value: %TRUE on success, %FALSE on failure.
 **/
gboolean
rr_channel_send_frame (RRChannel *channel, RRFrame *frame,
		       GError **error)
{
	GObject *object;

	g_return_val_if_fail (RR_IS_CHANNEL (channel), FALSE);
	g_return_val_if_fail (RR_IS_FRAME (frame), FALSE);

	object = G_OBJECT (frame);
	return send_helper (channel, object, error);
}
/**
 * rr_channel_close_confirmation:
 * @channel: a #RRChannel
 * @code: close code, passed through to the handler
 * @xml_lang: language tag, passed through to the handler
 * @diagnostic: diagnostic text (may be %NULL), passed through to the handler
 *
 * Logs the event and invokes the class's close_confirmation handler if
 * one is installed.
 **/
void
rr_channel_close_confirmation (RRChannel *channel, gint code,
			       const gchar *xml_lang, const gchar *diagnostic)
{
	RRChannelClass *klass;

	rr_debug2 ("channel::close_confirmation "
		   "%s id=%d code=%d diag='%s'",
		   G_OBJECT_TYPE_NAME (G_OBJECT (channel)),
		   channel->id, code,
		   diagnostic ? diagnostic : "");

	klass = RR_CHANNEL_GET_CLASS (channel);
	if (klass->close_confirmation == NULL)
		return;

	klass->close_confirmation (channel, code, xml_lang, diagnostic);
}
/**
 * rr_channel_close_indication:
 * @channel: a #RRChannel
 * @code: close code, passed through to the handler
 * @xml_lang: language tag, passed through to the handler
 * @diagnostic: diagnostic text (may be %NULL), passed through to the handler
 * @error: location to return an error, passed through to the handler
 *
 * Logs the event and asks the class's close_indication handler, if one
 * is installed, for its verdict.
 *
 * Return value: the handler's result, or %TRUE when no handler is
 * installed.
 **/
gboolean
rr_channel_close_indication (RRChannel *channel, gint code,
			     const gchar *xml_lang, const gchar *diagnostic,
			     GError **error)
{
	RRChannelClass *klass;

	rr_debug2 ("channel::close_indication "
		   "%s id=%d code=%d diag='%s'",
		   G_OBJECT_TYPE_NAME (G_OBJECT (channel)),
		   channel->id, code,
		   diagnostic ? diagnostic : "");

	klass = RR_CHANNEL_GET_CLASS (channel);
	if (klass->close_indication == NULL)
		return TRUE;

	return klass->close_indication (channel, code, xml_lang,
					diagnostic, error);
}
/* Delivers one incoming frame to the channel's frame_available handler.
 *
 * @data:      the RRChannel the frame arrived on.
 * @user_data: the RRFrame; one reference on it is dropped on every path.
 *
 * When aggregation is enabled the frame is first handed to
 * rr_frame_aggregate(); if that returns NULL nothing is delivered
 * (presumably more frames are still expected).  For MSG frames a reply
 * slot is reserved in the outgoing queue before the handler runs.  If the
 * handler fails with an error on a MSG frame, an error message carrying
 * the same msgno is queued on the channel. */
static void
do_frame_available (gpointer data, gpointer user_data)
{
	RRChannel *channel = data;
	RRFrame *frame = user_data;
	RRChannelClass *klass;
	GError *error = NULL;

	rr_debug2 ("channel::frame_available %s %s %d %d %s %d %d",
		   G_OBJECT_TYPE_NAME (G_OBJECT (channel)),
		   (frame->type == RR_FRAME_TYPE_MSG ? "MSG" :
		    (frame->type == RR_FRAME_TYPE_RPY ? "RPY" :
		     (frame->type == RR_FRAME_TYPE_ERR ? "ERR" :
		      (frame->type == RR_FRAME_TYPE_ANS ? "ANS" :
		       (frame->type == RR_FRAME_TYPE_NUL ? "NUL" :
			"???"))))),
		   channel->id, frame->msgno, (frame->more?"*":"."),
		   frame->seqno, frame->size);

	klass = RR_CHANNEL_GET_CLASS (channel);
	if (klass->frame_available == NULL) {
		g_warning ("missing frame_available handler");
		g_object_unref (G_OBJECT (frame));
		return;
	}

	if (channel->aggregate_frames) {
		RRFrame *aggregated;

		aggregated = rr_frame_aggregate (&channel->frame_list, frame);
		/* The incoming frame's reference is dropped either way. */
		g_object_unref (G_OBJECT (frame));
		if (aggregated == NULL)
			return;
		frame = aggregated;
	}

	/* Reserve a place in the queue, so we are sure the replies get sent
	 * in the correct order. */
	if (frame->type == RR_FRAME_TYPE_MSG) {
		g_mutex_lock (channel->out_lock);
		out_queue_insert_rpy_slot (&channel->out_queue, frame->msgno);
		g_mutex_unlock (channel->out_lock);
	}

	if (!klass->frame_available (channel, frame, &error)) {
		rr_debug1 ("channel::frame_available failed: %s\n",
			   error ? error->message : "");

		if (error != NULL) {
			if (frame->type == RR_FRAME_TYPE_MSG) {
				RRMessageError *err;

				/* FIXME: an error frame should only be sent if
				 * no reply for the given frame has been sent
				 * yet. */
				err = rr_message_error_new_from_gerror (error,
									NULL);
				RR_MESSAGE (err)->msgno = frame->msgno;
				rr_channel_send_message (channel,
							 RR_MESSAGE (err),
							 NULL);
			}
			g_error_free (error);
		}
	}

	g_object_unref (G_OBJECT (frame));
}
/**
 * rr_channel_frame_available:
 * @channel: a #RRChannel
 * @frame: the received #RRFrame
 *
 * Takes a reference on @frame and hands it to the frame handler: directly
 * in the calling thread when the channel's dont_queue_frames flag is set,
 * otherwise through the channel's work-pool group.
 **/
void
rr_channel_frame_available (RRChannel *channel, RRFrame *frame)
{
	/* Released again by do_frame_available(). */
	g_object_ref (G_OBJECT (frame));

	if (!channel->dont_queue_frames) {
		rr_main_work_pool_push (RRWPGROUP (channel),
					do_frame_available, channel, frame);
		return;
	}

	do_frame_available (channel, frame);
}
/**
 * rr_channel_client_init:
 * @channel: a #RRChannel
 * @error: location to return an error, passed through to the handler
 *
 * Logs the event and runs the class's client_init handler if one is
 * installed.
 *
 * Return value: the handler's result, or %TRUE when no handler is
 * installed.
 **/
gboolean
rr_channel_client_init (RRChannel *channel, GError **error)
{
	RRChannelClass *klass;

	rr_debug1 ("channel::client_init "
		   "%s id=%d", G_OBJECT_TYPE_NAME (G_OBJECT (channel)),
		   channel->id);

	klass = RR_CHANNEL_GET_CLASS (channel);
	if (klass->client_init == NULL)
		return TRUE;

	return klass->client_init (channel, error);
}
/**
 * rr_channel_server_init:
 * @channel: a #RRChannel
 * @piggyback: piggybacked data (may be %NULL), passed through to the handler
 * @error: location to return an error, passed through to the handler
 *
 * Logs the event and runs the class's server_init handler if one is
 * installed.
 *
 * Return value: the handler's result, or %TRUE when no handler is
 * installed.
 **/
gboolean
rr_channel_server_init (RRChannel *channel, const gchar *piggyback,
			GError **error)
{
	RRChannelClass *klass;

	rr_debug1 ("channel::server_init "
		   "%s id=%d piggyback=%s",
		   G_OBJECT_TYPE_NAME (G_OBJECT (channel)),
		   channel->id,
		   piggyback ? piggyback : "None");

	klass = RR_CHANNEL_GET_CLASS (channel);
	if (klass->server_init == NULL)
		return TRUE;

	return klass->server_init (channel, piggyback, error);
}
/**
 * rr_channel_client_confirmation:
 * @channel: a #RRChannel
 * @piggyback: piggybacked data (may be %NULL), passed through to the handler
 *
 * Logs the event and runs the class's client_confirmation handler if one
 * is installed.
 **/
void
rr_channel_client_confirmation (RRChannel *channel, const gchar *piggyback)
{
	RRChannelClass *klass;

	rr_debug1 ("channel::client_confirmation "
		   "%s id=%d piggyback=%s", G_OBJECT_TYPE_NAME (G_OBJECT (channel)),
		   channel->id,
		   piggyback ? piggyback : "None");

	klass = RR_CHANNEL_GET_CLASS (channel);
	if (klass->client_confirmation == NULL)
		return;

	klass->client_confirmation (channel, piggyback);
}
/**
 * rr_channel_server_confirmation:
 * @channel: a #RRChannel
 *
 * Logs the event and runs the class's server_confirmation handler if one
 * is installed.
 **/
void
rr_channel_server_confirmation (RRChannel *channel)
{
	RRChannelClass *klass;

	rr_debug1 ("channel::server_confirmation "
		   "%s id=%d", G_OBJECT_TYPE_NAME (G_OBJECT (channel)),
		   channel->id);

	klass = RR_CHANNEL_GET_CLASS (channel);
	if (klass->server_confirmation == NULL)
		return;

	klass->server_confirmation (channel);
}
/**
 * rr_channel_register_frame:
 * @channel: a #RRChannel
 * @frame: the outgoing #RRFrame
 *
 * Stamps @frame with the channel's current outgoing sequence number, then
 * advances that sequence number and shrinks the outgoing window by the
 * frame's size.  A critical warning is logged if the outgoing window
 * becomes negative.
 **/
void
rr_channel_register_frame (RRChannel *channel, RRFrame *frame)
{
	g_return_if_fail (RR_IS_CHANNEL (channel));
	g_return_if_fail (RR_IS_FRAME (frame));

	frame->seqno = channel->seq_out;
	channel->seq_out += frame->size;
	channel->window_out -= frame->size;

	/* Check the window that was just reduced; the previous code tested
	 * window_in, which this function never modifies. */
	g_return_if_fail (channel->window_out >= 0);
}
/**
 * rr_channel_flush:
 * @channel: A #RRChannel
 * @error: location to return an error of type RR_ERROR or RR_BEEP_ERROR
 *         (currently never set).
 *
 * Blocks on the channel's out_cond until the outgoing queue has nothing
 * ready to send.
 *
 * Return value: always %TRUE at present.
 **/
gboolean
rr_channel_flush (RRChannel *channel, GError **error)
{
	g_mutex_lock (channel->out_lock);

	/* FIXME: Add disconnect () detection */
	while (out_queue_ready (channel->out_queue)) {
		/* Woken by rr_channel_remove_active_item(). */
		g_cond_wait (channel->out_cond, channel->out_lock);
	}

	g_mutex_unlock (channel->out_lock);

	return TRUE;
}
/**
 * rr_channel_set_window_size:
 * @channel: A #RRChannel.
 * @size: The new receive window size for @channel; must not be negative.
 *
 * Stores @size as the channel's receive window size.
 **/
void
rr_channel_set_window_size (RRChannel *channel, gint size)
{
	g_return_if_fail (RR_IS_CHANNEL (channel));
	g_return_if_fail (size >= 0);

	channel->window_size = size;
}
/**
 * rr_channel_get_window_size:
 * @channel: A #RRChannel.
 *
 * Reads the channel's receive window size.
 *
 * Return value: the window size, or -1 if @channel is not a #RRChannel.
 **/
gint
rr_channel_get_window_size (RRChannel *channel)
{
	g_return_val_if_fail (RR_IS_CHANNEL (channel), -1);

	return channel->window_size;
}
/**
 * rr_channel_set_aggregate:
 * @channel: a #RRChannel
 * @aggregate: %TRUE to aggregate incoming frames, %FALSE to deliver them
 *             one by one.
 *
 * Selects whether incoming frames are aggregated before delivery.
 **/
void
rr_channel_set_aggregate (RRChannel *channel, gboolean aggregate)
{
	g_return_if_fail (RR_IS_CHANNEL (channel));

	channel->aggregate_frames = aggregate;
}
/**
 * rr_channel_get_aggregate:
 * @channel: a #RRChannel
 *
 * Tells whether incoming frames are aggregated or handed to the
 * frame_available handler directly.
 *
 * Return value: the aggregation flag; %FALSE if @channel is not a
 * #RRChannel.
 **/
gboolean
rr_channel_get_aggregate (RRChannel *channel)
{
	g_return_val_if_fail (RR_IS_CHANNEL (channel), FALSE);

	return channel->aggregate_frames;
}
/**
 * rr_channel_get_uri:
 * @type: A GType
 *
 * Looks up the URI stored on @type under the RR_CHANNEL_GTYPE_KEY_URI
 * key.
 *
 * Return value: the URI, or %NULL if none has been set.
 **/
const gchar *
rr_channel_get_uri (GType type)
{
	GQuark key = g_quark_from_string (RR_CHANNEL_GTYPE_KEY_URI);

	return g_type_get_qdata (type, key);
}
/**
 * rr_channel_set_uri:
 * @type: A GType derived from RR_TYPE_CHANNEL
 * @uri: A URI string.
 *
 * Associates a copy of @uri with the profile type, releasing any URI
 * stored previously.
 **/
void
rr_channel_set_uri (GType type, const gchar *uri)
{
	GQuark key;
	gchar *previous;

	g_assert (g_type_is_a (type, RR_TYPE_CHANNEL));

	key = g_quark_from_string (RR_CHANNEL_GTYPE_KEY_URI);

	/* g_free() accepts NULL, so no guard is needed. */
	previous = g_type_get_qdata (type, key);
	g_free (previous);

	g_type_set_qdata (type, key, g_strdup (uri));
}
/**
 * rr_channel_get_piggyback:
 * @channel: a #RRChannel
 *
 * Return value: the piggyback string stored in @channel (owned by the
 * channel), or %NULL if none is set or @channel is not a #RRChannel.
 **/
const gchar *
rr_channel_get_piggyback (RRChannel *channel)
{
	g_return_val_if_fail (RR_IS_CHANNEL (channel), NULL);

	return channel->piggyback;
}
/**
 * rr_channel_set_piggyback:
 * @channel: a #RRChannel
 * @piggyback: the new piggyback string, or %NULL to clear it
 *
 * Replaces the channel's piggyback string with a private copy of
 * @piggyback.  It is safe to pass the string currently stored in the
 * channel (for example the result of rr_channel_get_piggyback()).
 **/
void
rr_channel_set_piggyback (RRChannel *channel, const gchar *piggyback)
{
	gchar *copy;

	g_return_if_fail (RR_IS_CHANNEL (channel));

	/* Duplicate before freeing: @piggyback may alias the stored string.
	 * g_strdup (NULL) returns NULL, which clears the field. */
	copy = g_strdup (piggyback);
	g_free (channel->piggyback);
	channel->piggyback = copy;
}
/**
 * rr_channel_close:
 * @channel: the #RRChannel to close
 * @code: close code, passed to the manager
 * @xml_lang: language tag, passed to the manager
 * @diagnostic: diagnostic text, passed to the manager
 * @error: location to return an error
 *
 * Asks the manager of the channel's connection to close @channel.
 *
 * Return value: the result of rr_manager_close_channel(), or %FALSE if
 * the channel has no connection or the connection has no manager.
 **/
gboolean
rr_channel_close (RRChannel *channel,
		  gint code, const gchar *xml_lang,
		  const gchar *diagnostic,
		  GError **error)
{
	RRConnection *conn;
	RRManager *manager;

	conn = rr_channel_get_connection (channel);
	g_return_val_if_fail (conn, FALSE);

	manager = rr_connection_get_manager (conn);
	g_return_val_if_fail (manager, FALSE);

	return rr_manager_close_channel (manager, channel,
					 code, xml_lang, diagnostic,
					 error);
}
/**
 * rr_channel_lock:
 * @channel: a #RRChannel
 *
 * Acquires the channel's general-purpose mutex (not the out-queue lock).
 **/
void
rr_channel_lock (RRChannel *channel)
{
	g_return_if_fail (RR_IS_CHANNEL (channel));

	g_mutex_lock (channel->mutex);
}
/**
 * rr_channel_unlock:
 * @channel: a #RRChannel
 *
 * Releases the mutex acquired with rr_channel_lock().
 **/
void
rr_channel_unlock (RRChannel *channel)
{
	g_return_if_fail (RR_IS_CHANNEL (channel));

	g_mutex_unlock (channel->mutex);
}
/* Frees every QueueItem in @queue (unreferencing the objects they still
 * hold) and then the list itself. */
static void
out_queue_free (GSList *queue)
{
	GSList *node;

	for (node = queue; node != NULL; node = node->next)
		queue_item_free (node->data);

	g_slist_free (queue);
}
/* Searches @queue for the first entry whose msgno and reply flag both
 * match.
 *
 * Returns the matching QueueItem, or NULL when there is none. */
static QueueItem *
find_queue_item (GSList *queue, gint32 msgno, gboolean reply)
{
	GSList *iter;

	for (iter = queue; iter != NULL; iter = iter->next) {
		QueueItem *item = iter->data;

		if (item->reply == reply && item->msgno == msgno)
			return item;
	}

	/* Not found: this returns a pointer, so use NULL rather than FALSE. */
	return NULL;
}
/* Releases a QueueItem: drops one reference on every object still queued
 * in it, then frees the inner queue and the item itself. */
static void
queue_item_free (QueueItem *item)
{
	GList *node;

	for (node = item->queue->head; node != NULL; node = node->next)
		g_object_unref (node->data);

	g_queue_free (item->queue);
	g_free (item);
}
/* Allocates a queue entry for @msgno with an empty object queue.
 *
 * @reply marks the entry as a reply slot (TRUE) or as an outgoing MSG
 * entry (FALSE); find_queue_item() matches on this flag, so it must be
 * stored as given.  The caller owns the returned item and releases it
 * with queue_item_free(). */
static QueueItem *
queue_item_new (gint32 msgno, gboolean reply)
{
	QueueItem *item;

	item = g_new (QueueItem, 1);
	item->reply = reply;
	item->msgno = msgno;
	item->queue = g_queue_new ();

	return item;
}

/* Appends an empty reply slot for @msgno to *@queue so that the reply to
 * an incoming MSG keeps its position relative to other queued entries.
 * Does nothing (apart from a debug message) if a reply slot for @msgno
 * already exists. */
static void
out_queue_insert_rpy_slot (GSList **queue, gint32 msgno)
{
	QueueItem *item;

	g_return_if_fail (queue != NULL);

	/* See if we already have an entry in the queue */
	if (find_queue_item (*queue, msgno, TRUE)) {
		rr_debug1 ("weird, already an reply slot in the queue");
		return;
	}

	item = queue_item_new (msgno, TRUE);
	*queue = g_slist_append (*queue, item);
}

/* Queues @obj for transmission.
 *
 * @queue:      address of the channel's outgoing list.
 * @obj:        message or frame to queue; the queue takes over the
 *              caller's reference.
 * @msgno:      message number of @obj.
 * @channel_id: id of the owning channel (only used to silence the debug
 *              message for msgno 0 on channel 0).
 * @type:       frame type of @obj.
 *
 * A MSG always gets a new entry at the end of the list.  Any other known
 * type is added to the reply slot reserved for @msgno; if no slot exists a
 * new reply entry is appended.  An object of unknown type cannot be placed
 * anywhere, so it is dropped and its reference released. */
static void
out_queue_push (GSList **queue, GObject *obj,
		gint32 msgno, gint32 channel_id, RRFrameType type)
{
	QueueItem *item;

	g_return_if_fail (queue != NULL);
	g_return_if_fail (obj != NULL);

	if (type == RR_FRAME_TYPE_UNKNOWN) {
		rr_debug1 ("channel::out_queue_push type == unknown, "
			   "this can't be right");
		/* There is no entry to attach the object to.  Release the
		 * reference the queue was handed, as queue_item_free() would
		 * have done, instead of using an uninitialized item. */
		g_object_unref (obj);
		return;
	}

	if (type == RR_FRAME_TYPE_MSG) {
		item = queue_item_new (msgno, FALSE);
		*queue = g_slist_append (*queue, item);
	}
	else {
		/* Replies go into the slot reserved by
		 * out_queue_insert_rpy_slot(), which is flagged reply == TRUE.
		 * Looking for reply == FALSE could pick an outgoing MSG entry
		 * that happens to use the same msgno. */
		item = find_queue_item (*queue, msgno, TRUE);
		if (item == NULL) {
			if (!(channel_id == 0 && msgno == 0)) {
				rr_debug1 ("channel::out_queue_push_message "
					   "enqueueing an unknown RPY, "
					   "this can't be right");
			}
			item = queue_item_new (msgno, TRUE);
			*queue = g_slist_append (*queue, item);
		}
	}

	g_queue_push_head (item->queue, obj);
	out_queue_optimize (queue);
}
/* Returns, without removing it, the oldest object of the first entry in
 * @queue.  Returns NULL (after a critical warning) when @queue is empty;
 * the result is also NULL when the first entry holds no objects. */
static GObject *
out_queue_peek_item (GSList *queue)
{
	QueueItem *item;

	g_return_val_if_fail (queue != NULL, NULL);

	item = queue->data;
	g_assert (item != NULL);

	/* Objects are pushed at the head, so the tail is the oldest one. */
	return g_queue_peek_tail (item->queue);
}
/* Removes and returns the oldest object of the first entry in *@queue.
 *
 * The entry itself is kept when it still holds objects, or when the popped
 * object is a frame that has its `more' flag set or is of type ANS
 * (presumably because further frames for the same msgno will follow).
 * Otherwise the entry is unlinked and freed and the list is re-optimized.
 *
 * The returned object is not unreferenced here. */
static GObject *
out_queue_pop (GSList **queue)
{
	QueueItem *item;
	GObject *object;
	GSList *head;

	g_return_val_if_fail (queue != NULL, NULL);
	g_return_val_if_fail (*queue != NULL, NULL);

	item = (*queue)->data;
	g_assert (item != NULL);

	object = g_queue_pop_tail (item->queue);
	g_assert (object != NULL);

	/* More objects are waiting in this entry: keep it. */
	if (item->queue->length > 0)
		return object;

	/* An unfinished frame sequence keeps the entry alive as well. */
	if (RR_IS_FRAME (object)) {
		RRFrame *frame = (RRFrame *)object;

		if (frame->more || frame->type == RR_FRAME_TYPE_ANS)
			return object;
	}

	/* Remove and free the first entry. */
	head = *queue;
	*queue = g_slist_remove_link (head, head);
	queue_item_free (item);
	g_slist_free_1 (head);

	out_queue_optimize (queue);

	return object;
}
/* Tells whether the first entry of @queue holds at least one object.
 * An empty list is never ready. */
static gboolean
out_queue_ready (GSList *queue)
{
	QueueItem *item;

	if (!queue)
		return FALSE;

	item = queue->data;
	g_assert (item != NULL);

	return item->queue->length > 0;
}
/* Makes sure the first entry of *@queue has something to send, if any
 * entry does: when the first entry is empty, the first later entry that
 * holds objects is unlinked and moved to the front.  Nothing changes when
 * the list is empty, the first entry already holds objects, or every
 * entry is empty. */
static void
out_queue_optimize (GSList **queue)
{
	QueueItem *item;
	GSList *iter;

	if (*queue == NULL)
		return;

	item = (*queue)->data;
	g_assert (item != NULL);
	if (item->queue->length > 0)
		return;

	for (iter = (*queue)->next; iter != NULL; iter = iter->next) {
		item = iter->data;
		if (item->queue->length == 0)
			continue;

		/* iter is never the first node here, so unlinking it leaves
		 * the list head unchanged and the result can be ignored. */
		g_slist_remove_link (*queue, iter);
		iter->next = *queue;
		*queue = iter;
		return;
	}
}
/* syntax highlighted by Code2HTML, v. 0.9.1 */