/*-2 2002/05/12 09:53:14 2002/07/15 20:07:379 2002/09/28 18:11:28 * $Id: rr-channel.c,v 1.43 2002/11/19 21:48:01 jonas Exp $ * * See the file LICENSE for redistribution information. * If you have not received a copy of the license, please contact CodeFactory * by email at info@codefactory.se, or on the web at http://www.codefactory.se/ * You may also write to: CodeFactory AB, SE-903 47, Umeå, Sweden. * * Copyright (c) 2002 Jonas Borgström * Copyright (c) 2002 Daniel Lundin * Copyright (c) 2002 CodeFactory AB. All rights reserved. */ #include #define RR_TCP_DEFAULT_WINDOW_SIZE 4192 /* FIXME: Shouldn't be here */ static GObjectClass *parent_class = NULL; static void finalize (GObject *object); typedef struct _QueueItem QueueItem; struct _QueueItem { gint32 msgno; gboolean reply; GQueue *queue; }; static void out_queue_insert_rpy_slot (GSList **queue, gint32 msgno); static void out_queue_free (GSList *queue); static void out_queue_push (GSList **queue, GObject *obj, gint32 msgno,gint32 channel_id, RRFrameType type); static GObject *out_queue_peek_item (GSList *queue); static GObject *out_queue_pop (GSList **queue); static gboolean out_queue_ready (GSList *queue); static void out_queue_optimize (GSList **queue); static void queue_item_free (QueueItem *item); static void rr_channel_init (GObject *object) { RRChannel *channel = (RRChannel *)object; channel->out_lock = g_mutex_new (); channel->out_cond = g_cond_new (); channel->out_queue = NULL; channel->window_size = RR_TCP_DEFAULT_WINDOW_SIZE; channel->window_in = RR_TCP_DEFAULT_WINDOW_SIZE; channel->window_out = RR_TCP_DEFAULT_WINDOW_SIZE; channel->mutex = g_mutex_new (); channel->aggregate_frames = TRUE; } static void rr_channel_class_init (GObjectClass *klass) { klass->finalize = finalize; parent_class = g_type_class_peek_parent (klass); } GType rr_channel_get_type (void) { static GType rr_type = 0; if (!rr_type) { static GTypeInfo type_info = { sizeof (RRChannelClass), NULL, NULL, (GClassInitFunc) rr_channel_class_init, NULL, NULL, sizeof (RRChannel), 16, (GInstanceInitFunc) rr_channel_init }; rr_type = g_type_register_static (G_TYPE_OBJECT, "RRChannel", &type_info, G_TYPE_FLAG_ABSTRACT); } return rr_type; } static void finalize (GObject *object) { RRChannel *channel = (RRChannel *)object; rr_main_work_pool_join (RRWPGROUP (object)); /* Outgoing msg queue */ g_mutex_lock (channel->out_lock); out_queue_free (channel->out_queue); g_mutex_unlock (channel->out_lock); g_mutex_free (channel->out_lock); g_cond_free (channel->out_cond); g_mutex_free (channel->mutex); parent_class->finalize (object); } void rr_channel_set_connection (RRChannel *channel, RRConnection *connection) { g_return_if_fail (channel != NULL); g_return_if_fail (RR_IS_CHANNEL (channel)); channel->connection = connection; } RRConnection * rr_channel_get_connection (RRChannel *channel) { g_return_val_if_fail (channel != NULL, NULL); g_return_val_if_fail (RR_IS_CHANNEL (channel), NULL); return channel->connection; } /** * rr_channel_out_queue_empty: * @channel: A #RRChannel * * Checks if the channels has anything (frame or message) to send. * * Return value: TRUE if the out queue is empty, else FALSE. **/ gboolean rr_channel_out_queue_empty (RRChannel *channel) { gboolean empty; g_mutex_lock (channel->out_lock); empty = !out_queue_ready (channel->out_queue); g_mutex_unlock (channel->out_lock); return empty; } GObject * rr_channel_get_active_item (RRChannel *channel) { GObject *item; g_mutex_lock (channel->out_lock); item = out_queue_peek_item (channel->out_queue); g_mutex_unlock (channel->out_lock); return item; } gboolean rr_channel_remove_active_item (RRChannel *channel) { GObject *item; gboolean empty; g_mutex_lock (channel->out_lock); item = out_queue_pop (&channel->out_queue); empty = !out_queue_ready (channel->out_queue); g_cond_broadcast (channel->out_cond); g_mutex_unlock (channel->out_lock); return empty; } static gboolean send_helper (RRChannel *channel, GObject *object, GError **error) { if (channel->connection == NULL) { g_set_error (error, RR_ERROR, /* error domain */ RR_ERROR_DISCONNECTED,/* error code */ "Channel not associated with any connection."); return FALSE; } if (RR_IS_MESSAGE (object)) rr_message_set_channel (RR_MESSAGE (object), channel); else RR_FRAME (object)->channel_id = channel->id; g_mutex_lock (channel->out_lock); if (RR_IS_MESSAGE (object)) { RRMessage *message = RR_MESSAGE (object); if (message->msgno < 0) { if (message->type == RR_FRAME_TYPE_MSG) { message->msgno = channel->msgno++; if (channel->msgno < 0) channel->msgno = 0; } /* else { */ /* g_set_error (error, */ /* RR_ERROR, RR_BEEP_CODE_NO_ACTION, */ /* "msg->msgno has to be set on " */ /* "message replies."); */ /* return FALSE; */ /* } */ } out_queue_push (&channel->out_queue, object, message->msgno, channel->id, message->type); } else { RRFrame *frame = RR_FRAME (object); if (frame->msgno < 0) { frame->msgno = frame->msgno++; if (channel->msgno < 0) channel->msgno = 0; } out_queue_push (&channel->out_queue, object, frame->msgno, channel->id, frame->type); } g_mutex_unlock (channel->out_lock); rr_connection_register_sender (channel->connection, channel); return TRUE; } /** * rr_channel_send_message: * @channel: a #RRChannel * @message: a #RRMessage * @error: location to return an error of type RR_ERROR or RR_BEEP_ERROR. * * Enqueue @message for transmission on @channel. * Note: This function don't block until the message is sent. And @message * will be unref:ed after transmission. * * Return value: %TRUE on success, %FALSE on failure. **/ gboolean rr_channel_send_message (RRChannel *channel, RRMessage *message, GError **error) { g_return_val_if_fail (RR_IS_CHANNEL (channel), FALSE); g_return_val_if_fail (RR_IS_MESSAGE (message), FALSE); return send_helper (channel, G_OBJECT (message), error); } /** * rr_channel_send_frame: * @channel: a #RRChannel * @frame: a #RRFrame * @error: location to return an error of type RR_ERROR or RR_BEEP_ERROR. * * Enqueue @frame for transmission on @channel. * Note: This function don't block until the frame is sent. And @frame * will be unref:ed after transmission. * * Return value: %TRUE on success, %FALSE on failure. **/ gboolean rr_channel_send_frame (RRChannel *channel, RRFrame *frame, GError **error) { g_return_val_if_fail (RR_IS_CHANNEL (channel), FALSE); g_return_val_if_fail (RR_IS_FRAME (frame), FALSE); return send_helper (channel, G_OBJECT (frame), error); } void rr_channel_close_confirmation (RRChannel *channel, gint code, const gchar *xml_lang, const gchar *diagnostic) { rr_debug2 ("channel::close_confirmation " "%s id=%d code=%d diag='%s'", G_OBJECT_TYPE_NAME (G_OBJECT (channel)), channel->id, code, diagnostic ? diagnostic : ""); if (RR_CHANNEL_GET_CLASS(channel)->close_confirmation) RR_CHANNEL_GET_CLASS(channel)->close_confirmation (channel, code, xml_lang, diagnostic); } gboolean rr_channel_close_indication (RRChannel *channel, gint code, const gchar *xml_lang, const gchar *diagnostic, GError **error) { rr_debug2 ("channel::close_indication " "%s id=%d code=%d diag='%s'", G_OBJECT_TYPE_NAME (G_OBJECT (channel)), channel->id, code, diagnostic ? diagnostic : ""); if (RR_CHANNEL_GET_CLASS(channel)->close_indication) return RR_CHANNEL_GET_CLASS(channel)->close_indication (channel, code, xml_lang, diagnostic, error); else return TRUE; } static void do_frame_available (gpointer data, gpointer user_data) { RRChannel *channel = data; RRFrame *frame = user_data; GError *error = NULL; gboolean ret; rr_debug2 ("channel::frame_available %s %s %d %d %s %d %d", G_OBJECT_TYPE_NAME (G_OBJECT (channel)), (frame->type == RR_FRAME_TYPE_MSG ? "MSG" : (frame->type == RR_FRAME_TYPE_RPY ? "RPY" : (frame->type == RR_FRAME_TYPE_ERR ? "ERR" : (frame->type == RR_FRAME_TYPE_ANS ? "ANS" : (frame->type == RR_FRAME_TYPE_NUL ? "NUL" : "???"))))), channel->id, frame->msgno, (frame->more?"*":"."), frame->seqno, frame->size); if (RR_CHANNEL_GET_CLASS(channel)->frame_available == NULL) { g_warning ("missing frame_available handler"); g_object_unref (G_OBJECT (frame)); return; } if (channel->aggregate_frames) { RRFrame *new_frame; new_frame = rr_frame_aggregate (&channel->frame_list, frame); if (new_frame) { g_object_unref (G_OBJECT (frame)); frame = new_frame; } else { g_object_unref (G_OBJECT (frame)); return; } } /* Reserv a place in the queue, so we are sure the replies get sent * in the correct order */ if (frame->type == RR_FRAME_TYPE_MSG) { g_mutex_lock (channel->out_lock); out_queue_insert_rpy_slot (&channel->out_queue, frame->msgno); g_mutex_unlock (channel->out_lock); } ret = RR_CHANNEL_GET_CLASS(channel)->frame_available (channel, frame, &error); if (ret == FALSE) { rr_debug1 ("channel::frame_available failed: %s\n", error ? error->message : ""); if (error && frame->type == RR_FRAME_TYPE_MSG) { RRMessageError *err; /* FIXME: we should only send an error frame if an reply for the given frame haven't been sent */ err = rr_message_error_new_from_gerror (error, NULL); RR_MESSAGE (err)->msgno = frame->msgno; rr_channel_send_message (channel, RR_MESSAGE (err), NULL); } if (error) g_error_free (error); } g_object_unref (G_OBJECT (frame)); } void rr_channel_frame_available (RRChannel *channel, RRFrame *frame) { g_object_ref (G_OBJECT (frame)); if (channel->dont_queue_frames) do_frame_available (channel, frame); else rr_main_work_pool_push (RRWPGROUP (channel), do_frame_available, channel, frame); } gboolean rr_channel_client_init (RRChannel *channel, GError **error) { rr_debug1 ("channel::client_init " "%s id=%d", G_OBJECT_TYPE_NAME (G_OBJECT (channel)), channel->id); if (RR_CHANNEL_GET_CLASS (channel)->client_init) return RR_CHANNEL_GET_CLASS (channel)->client_init (channel, error); else return TRUE; } gboolean rr_channel_server_init (RRChannel *channel, const gchar *piggyback, GError **error) { rr_debug1 ("channel::server_init " "%s id=%d piggyback=%s", G_OBJECT_TYPE_NAME (G_OBJECT (channel)), channel->id, piggyback ? piggyback : "None"); if (RR_CHANNEL_GET_CLASS (channel)->server_init) return RR_CHANNEL_GET_CLASS (channel)->server_init (channel, piggyback, error); else return TRUE; } void rr_channel_client_confirmation (RRChannel *channel, const gchar *piggyback) { rr_debug1 ("channel::client_confirmation " "%s id=%d piggyback=%s", G_OBJECT_TYPE_NAME (G_OBJECT (channel)), channel->id, piggyback ? piggyback : "None"); if (RR_CHANNEL_GET_CLASS (channel)->client_confirmation) RR_CHANNEL_GET_CLASS (channel)->client_confirmation (channel, piggyback); } void rr_channel_server_confirmation (RRChannel *channel) { rr_debug1 ("channel::server_confirmation " "%s id=%d", G_OBJECT_TYPE_NAME (G_OBJECT (channel)), channel->id); if (RR_CHANNEL_GET_CLASS (channel)->server_confirmation) RR_CHANNEL_GET_CLASS (channel)->server_confirmation (channel); } void rr_channel_register_frame (RRChannel *channel, RRFrame *frame) { g_return_if_fail (RR_IS_CHANNEL (channel)); g_return_if_fail (RR_IS_FRAME (frame)); frame->seqno = channel->seq_out; channel->seq_out += frame->size; channel->window_out -= frame->size; g_return_if_fail (channel->window_in >= 0); } /** * rr_channel_flush: * @channel: A #RRChannel * @error: location to return an error of type RR_ERROR or RR_BEEP_ERROR. * * Blocks until all outgoing frames/messages are sent. * * Return value: %TRUE on success, %FALSE on failure. **/ gboolean rr_channel_flush (RRChannel *channel, GError **error) { g_mutex_lock (channel->out_lock); /* FIXME: Add disconnect () detection */ /* while (channel->out_queue->length != 0) */ /* g_cond_wait (channel->out_cond, channel->out_lock); */ while (out_queue_ready (channel->out_queue)) g_cond_wait (channel->out_cond, channel->out_lock); g_mutex_unlock (channel->out_lock); return TRUE; } /** * rr_channel_set_window_size: * @channel: A #RRChannel. * @size: The new size of the receive window for @channel. * * Sets the size of the receive window. **/ void rr_channel_set_window_size (RRChannel *channel, gint size) { g_return_if_fail (RR_IS_CHANNEL (channel)); g_return_if_fail (size >= 0); channel->window_size = size; } /** * rr_channel_get_window_size: * @channel: A #RRChannel. * * Returns the receive window size. * * Return value: the window size. **/ gint rr_channel_get_window_size (RRChannel *channel) { g_return_val_if_fail (RR_IS_CHANNEL (channel), -1); return channel->window_size; } /** * rr_channel_set_aggregate: * @channel: a #RRChannel * @aggregate: %TRUE or %FALSE. * * Selects if the frames should be aggregated or not. **/ void rr_channel_set_aggregate (RRChannel *channel, gboolean aggregate) { g_return_if_fail (RR_IS_CHANNEL (channel)); channel->aggregate_frames = aggregate; } /** * rr_channel_get_aggregate: * @channel: a #RRChannel * * Returns %TRUE if the frames are aggregated or %FALSE if they are * passed to #rr_channel_frame_available directly. * * Return value: %TRUE or %FALSE. **/ gboolean rr_channel_get_aggregate (RRChannel *channel) { g_return_val_if_fail (RR_IS_CHANNEL (channel), FALSE); return channel->aggregate_frames; } /** * rr_channel_get_uri: * @type: A GType * * Get the URI the profile is associated with. * * Return value: A uri or %NULL **/ const gchar * rr_channel_get_uri (GType type) { return g_type_get_qdata (type, g_quark_from_string (RR_CHANNEL_GTYPE_KEY_URI)); } /** * rr_channel_set_uri: * @type: A GType * @uri: An URI string. * * Associate an URI with the profile. **/ void rr_channel_set_uri (GType type, const gchar *uri) { gchar *old; g_assert (g_type_is_a (type, RR_TYPE_CHANNEL)); old = g_type_get_qdata (type, g_quark_from_string (RR_CHANNEL_GTYPE_KEY_URI)); if (old) g_free (old); g_type_set_qdata (type, g_quark_from_string (RR_CHANNEL_GTYPE_KEY_URI), g_strdup (uri)); } const gchar * rr_channel_get_piggyback (RRChannel *channel) { g_return_val_if_fail (RR_IS_CHANNEL (channel), NULL); return channel->piggyback; } void rr_channel_set_piggyback (RRChannel *channel, const gchar *piggyback) { g_return_if_fail (RR_IS_CHANNEL (channel)); if (channel->piggyback) g_free (channel->piggyback); if (piggyback) channel->piggyback = g_strdup (piggyback); else channel->piggyback = NULL; } gboolean rr_channel_close (RRChannel *channel, gint code, const gchar *xml_lang, const gchar *diagnostic, GError **error) { RRConnection *conn; RRManager *manager; conn = rr_channel_get_connection (channel); g_return_val_if_fail (conn, FALSE); manager = rr_connection_get_manager (conn); g_return_val_if_fail (manager, FALSE); return rr_manager_close_channel (manager, channel, code, xml_lang, diagnostic, error); } void rr_channel_lock (RRChannel *channel) { g_return_if_fail (RR_IS_CHANNEL (channel)); g_mutex_lock (channel->mutex); } void rr_channel_unlock (RRChannel *channel) { g_return_if_fail (RR_IS_CHANNEL (channel)); g_mutex_unlock (channel->mutex); } static void out_queue_free (GSList *queue) { g_slist_foreach (queue, (GFunc)queue_item_free, NULL); g_slist_free (queue); } static QueueItem * find_queue_item (GSList *queue, gint32 msgno, gboolean reply) { GSList *iter; QueueItem *item; /* See if we already have an entry in the queue */ iter = queue; while (iter) { item = iter->data; if (item->reply == reply && item->msgno == msgno) { return item; } iter = iter->next; } return FALSE; } static void queue_item_free (QueueItem *item) { g_list_foreach (item->queue->head, (GFunc)g_object_unref, NULL); g_queue_free (item->queue); g_free (item); } static QueueItem * queue_item_new (gint32 msgno, gboolean reply) { QueueItem *item; /* Add a new entry */ item = g_new (QueueItem, 1); item->reply = FALSE; item->msgno = msgno; item->queue = g_queue_new (); return item; } static void out_queue_insert_rpy_slot (GSList **queue, gint32 msgno) { QueueItem *item; g_return_if_fail (queue != NULL); /* See if we already have an entry in the queue */ if (find_queue_item (*queue, msgno, TRUE)) { rr_debug1 ("weird, already an reply slot in the queue"); return; } item = queue_item_new (msgno, TRUE); *queue = g_slist_append (*queue, item); } static void out_queue_push (GSList **queue, GObject *obj, gint32 msgno, gint32 channel_id, RRFrameType type) { QueueItem *item; g_return_if_fail (queue != NULL); g_return_if_fail (obj != NULL); if (type == RR_FRAME_TYPE_UNKNOWN) { rr_debug1 ("channel::out_queue_push type == unknown, " "this can't be right"); } else if (type == RR_FRAME_TYPE_MSG) { item = queue_item_new (msgno, FALSE); *queue = g_slist_append (*queue, item); } else { item = find_queue_item (*queue, msgno, FALSE); if (item == NULL) { if (!(channel_id == 0 && msgno == 0)) { rr_debug1 ("channel::out_queue_push_message " "enqueueing an unknown RPY, " "this can't be right"); } item = queue_item_new (msgno, TRUE); *queue = g_slist_append (*queue, item); } } g_queue_push_head (item->queue, obj); out_queue_optimize (queue); } static GObject * out_queue_peek_item (GSList *queue) { QueueItem *item; GObject *object; g_return_val_if_fail (queue != NULL, NULL); item = queue->data; g_assert (item != NULL); object = g_queue_peek_tail (item->queue); return object; } static GObject * out_queue_pop (GSList **queue) { RRFrame *frame; QueueItem *item; GObject *object; GSList *tmp; g_return_val_if_fail (queue != NULL, NULL); g_return_val_if_fail (*queue != NULL, NULL); item = (*queue)->data; g_assert (item != NULL); object = g_queue_pop_tail (item->queue); g_assert (object != NULL); frame = (RRFrame *)object; if (item->queue->length > 0 || (RR_IS_FRAME (frame) && (frame->more || frame->type == RR_FRAME_TYPE_ANS))) return object; /* Remove and free the first item */ tmp = *queue; *queue = g_slist_remove_link (*queue, *queue); queue_item_free (item); g_slist_free_1 (tmp); out_queue_optimize (queue); return object; } static gboolean out_queue_ready (GSList *queue) { QueueItem *item; if (queue == NULL) return FALSE; item = queue->data; g_assert (item != NULL); return item->queue->length > 0; } static void out_queue_optimize (GSList **queue) { QueueItem *item; GSList *iter; if (*queue == NULL) { return; } item = (*queue)->data; g_assert (item != NULL); if (item->queue->length > 0) { return; } iter = (*queue)->next; while (iter) { item = iter->data; if (item->queue->length > 0) { g_slist_remove_link (*queue, iter); iter->next = *queue; *queue = iter; return; } iter = iter->next; } }