/*-2 2002/05/12 09:53:14 2002/07/15 20:07:379 2002/09/28 18:11:28
 * $Id: rr-channel.c,v 1.43 2002/11/19 21:48:01 jonas Exp $
 *
 * See the file LICENSE for redistribution information. 
 * If you have not received a copy of the license, please contact CodeFactory
 * by email at info@codefactory.se, or on the web at http://www.codefactory.se/
 * You may also write to: CodeFactory AB, SE-903 47, Umeå, Sweden.
 *
 * Copyright (c) 2002 Jonas Borgström <jonas@codefactory.se>
 * Copyright (c) 2002 Daniel Lundin   <daniel@codefactory.se>
 * Copyright (c) 2002 CodeFactory AB.  All rights reserved.
 */

#include <librr/rr.h>

#define RR_TCP_DEFAULT_WINDOW_SIZE 4192 /* FIXME: Shouldn't be here */

static GObjectClass *parent_class = NULL;

static void finalize (GObject *object);

typedef struct _QueueItem QueueItem;
struct _QueueItem {
	gint32   msgno;
	gboolean reply;
	GQueue  *queue;
};

static void     out_queue_insert_rpy_slot (GSList **queue, gint32 msgno);
static void     out_queue_free            (GSList *queue);
static void     out_queue_push            (GSList **queue, 
					   GObject *obj, 
					   gint32 msgno,gint32 channel_id, 
					   RRFrameType type);
static GObject *out_queue_peek_item       (GSList *queue);
static GObject *out_queue_pop             (GSList **queue);
static gboolean out_queue_ready           (GSList *queue);
static void     out_queue_optimize        (GSList **queue);
static void     queue_item_free           (QueueItem *item);

static void
rr_channel_init (GObject *object)
{
	RRChannel *channel = (RRChannel *)object;

	channel->out_lock = g_mutex_new ();
	channel->out_cond = g_cond_new ();
	channel->out_queue = NULL;

	channel->window_size = RR_TCP_DEFAULT_WINDOW_SIZE;
	channel->window_in  = RR_TCP_DEFAULT_WINDOW_SIZE;
	channel->window_out = RR_TCP_DEFAULT_WINDOW_SIZE;

	channel->mutex = g_mutex_new ();

	channel->aggregate_frames = TRUE;
}

static void
rr_channel_class_init (GObjectClass *klass)
{
	klass->finalize = finalize;

	parent_class = g_type_class_peek_parent (klass);
}

GType 
rr_channel_get_type (void)
{
	static GType rr_type = 0;

	if (!rr_type) {
		static GTypeInfo type_info = {
			sizeof (RRChannelClass),
			NULL,
			NULL,
			(GClassInitFunc) rr_channel_class_init,
			NULL,
			NULL,
			sizeof (RRChannel),
			16,
			(GInstanceInitFunc) rr_channel_init
		};
		rr_type = g_type_register_static (G_TYPE_OBJECT, "RRChannel", 
						  &type_info, G_TYPE_FLAG_ABSTRACT);
	}
	return rr_type;
}

static void
finalize (GObject *object)
{
	RRChannel *channel = (RRChannel *)object;

	rr_main_work_pool_join (RRWPGROUP (object));

	/* Outgoing msg queue */
	g_mutex_lock (channel->out_lock);
	out_queue_free (channel->out_queue);
	g_mutex_unlock (channel->out_lock);
	g_mutex_free (channel->out_lock);
	g_cond_free (channel->out_cond);

	g_mutex_free (channel->mutex);

	parent_class->finalize (object);
}

void
rr_channel_set_connection (RRChannel *channel, RRConnection *connection)
{
	g_return_if_fail (channel != NULL);
	g_return_if_fail (RR_IS_CHANNEL (channel));

	channel->connection = connection;
}

RRConnection *
rr_channel_get_connection (RRChannel *channel)
{
	g_return_val_if_fail (channel != NULL, NULL);
	g_return_val_if_fail (RR_IS_CHANNEL (channel), NULL);

	return channel->connection;
}

/**
 * rr_channel_out_queue_empty:
 * @channel: A #RRChannel
 * 
 * Checks if the channels has anything (frame or message) to send.
 * 
 * Return value: TRUE if the out queue is empty, else FALSE.
 **/
gboolean
rr_channel_out_queue_empty (RRChannel *channel)
{
	gboolean empty;

	g_mutex_lock (channel->out_lock);

	empty = !out_queue_ready (channel->out_queue);

	g_mutex_unlock (channel->out_lock);

	return empty;
}

GObject *
rr_channel_get_active_item (RRChannel *channel)
{
	GObject *item;

	g_mutex_lock (channel->out_lock);
	item = out_queue_peek_item (channel->out_queue);
	g_mutex_unlock (channel->out_lock);

	return item;
}

gboolean
rr_channel_remove_active_item (RRChannel *channel)
{
	GObject *item;
	gboolean empty;

	g_mutex_lock (channel->out_lock);

	item = out_queue_pop (&channel->out_queue);
	empty = !out_queue_ready (channel->out_queue);

	g_cond_broadcast (channel->out_cond);
	g_mutex_unlock (channel->out_lock);

	return empty;
}

static gboolean
send_helper (RRChannel *channel, GObject *object, GError **error)
{
	if (channel->connection == NULL) {
		
		g_set_error (error, 
			     RR_ERROR, /* error domain */
			     RR_ERROR_DISCONNECTED,/* error code */
			     "Channel not associated with any connection.");
		
		return FALSE;
	}
	if (RR_IS_MESSAGE (object))
		rr_message_set_channel (RR_MESSAGE (object), channel);
	else
		RR_FRAME (object)->channel_id = channel->id;
	
	g_mutex_lock (channel->out_lock);
	if (RR_IS_MESSAGE (object)) {
		RRMessage *message = RR_MESSAGE (object);

		if (message->msgno < 0) {
			if (message->type == RR_FRAME_TYPE_MSG) {
				message->msgno = channel->msgno++;
				if (channel->msgno < 0)
					channel->msgno = 0;
			}
/* 			else { */
/* 				g_set_error (error,  */
/* 					     RR_ERROR, RR_BEEP_CODE_NO_ACTION, */
/* 					     "msg->msgno has to be set on " */
/* 					     "message replies."); */
/* 				return FALSE; */
/* 			} */
		}
		out_queue_push (&channel->out_queue, object, 
				message->msgno, channel->id,
				message->type);
	}
	else {
		RRFrame *frame = RR_FRAME (object);
		
		if (frame->msgno < 0) {
			frame->msgno = frame->msgno++;
			if (channel->msgno < 0)
				channel->msgno = 0;
		}
		out_queue_push (&channel->out_queue, object, 
				frame->msgno, channel->id,
				frame->type);
	}
	g_mutex_unlock (channel->out_lock);

	rr_connection_register_sender (channel->connection, channel);

	return TRUE;
}

/**
 * rr_channel_send_message:
 * @channel: a #RRChannel
 * @message: a #RRMessage
 * @error: location to return an error of type RR_ERROR or RR_BEEP_ERROR.
 * 
 * Enqueue @message for transmission on @channel. 
 * Note: This function don't block until the message is sent. And @message
 * will be unref:ed after transmission.
 * 
 * Return value: %TRUE on success, %FALSE on failure.
 **/
gboolean
rr_channel_send_message (RRChannel *channel, RRMessage *message, 
			 GError **error)
{
	g_return_val_if_fail (RR_IS_CHANNEL (channel), FALSE);
	g_return_val_if_fail (RR_IS_MESSAGE (message), FALSE);
	return send_helper (channel, G_OBJECT (message), error);
}

/**
 * rr_channel_send_frame:
 * @channel: a #RRChannel
 * @frame: a #RRFrame
 * @error: location to return an error of type RR_ERROR or RR_BEEP_ERROR.
 * 
 * Enqueue @frame for transmission on @channel. 
 * Note: This function don't block until the frame is sent. And @frame
 * will be unref:ed after transmission.
 * 
 * Return value: %TRUE on success, %FALSE on failure.
 **/
gboolean
rr_channel_send_frame (RRChannel *channel, RRFrame *frame, 
		       GError **error)
{
	g_return_val_if_fail (RR_IS_CHANNEL (channel), FALSE);
	g_return_val_if_fail (RR_IS_FRAME (frame), FALSE);
	return send_helper (channel, G_OBJECT (frame), error);
}

void
rr_channel_close_confirmation (RRChannel *channel, gint code,
			       const gchar *xml_lang, const gchar *diagnostic)
{
	rr_debug2 ("channel::close_confirmation "
		   "%s id=%d code=%d diag='%s'", 
		   G_OBJECT_TYPE_NAME (G_OBJECT (channel)),
		   channel->id, code,
		   diagnostic ? diagnostic : "");

	if (RR_CHANNEL_GET_CLASS(channel)->close_confirmation)
		RR_CHANNEL_GET_CLASS(channel)->close_confirmation (channel, 
								   code, 
								   xml_lang, 
								   diagnostic);
}

gboolean
rr_channel_close_indication (RRChannel *channel, gint code,
			     const gchar *xml_lang, const gchar *diagnostic,
			     GError **error)
{
	rr_debug2 ("channel::close_indication "
		   "%s id=%d code=%d diag='%s'", 
		   G_OBJECT_TYPE_NAME (G_OBJECT (channel)),
		   channel->id, code,
		   diagnostic ? diagnostic : "");
	
	if (RR_CHANNEL_GET_CLASS(channel)->close_indication)
		return RR_CHANNEL_GET_CLASS(channel)->close_indication (channel, 
									code, 
									xml_lang, 
									diagnostic, 
									error);
	else
		return TRUE;
}

static void
do_frame_available (gpointer data, gpointer user_data)
{
	RRChannel *channel = data;
	RRFrame *frame = user_data;
	GError *error = NULL;
	gboolean ret;
	
	rr_debug2 ("channel::frame_available %s %s %d %d %s %d %d",
		   G_OBJECT_TYPE_NAME (G_OBJECT (channel)), 
		   (frame->type == RR_FRAME_TYPE_MSG ? "MSG" :
		    (frame->type == RR_FRAME_TYPE_RPY ? "RPY" :
		     (frame->type == RR_FRAME_TYPE_ERR ? "ERR" :
		      (frame->type == RR_FRAME_TYPE_ANS ? "ANS" :
		       (frame->type == RR_FRAME_TYPE_NUL ? "NUL" :
			"???"))))),
		   channel->id, frame->msgno, (frame->more?"*":"."),
		   frame->seqno, frame->size);

	if (RR_CHANNEL_GET_CLASS(channel)->frame_available == NULL) {

		g_warning ("missing frame_available handler");
		g_object_unref (G_OBJECT (frame));
		return;
	}

	if (channel->aggregate_frames) {
		RRFrame *new_frame;

		new_frame = rr_frame_aggregate (&channel->frame_list, frame);

		if (new_frame) {

			g_object_unref (G_OBJECT (frame));
			frame = new_frame;
		}
		else {
			g_object_unref (G_OBJECT (frame));
			return;
		}
	}
	/* Reserv a place in the queue, so we are sure the replies get sent
	 * in the correct order */
	if (frame->type == RR_FRAME_TYPE_MSG) {

		g_mutex_lock (channel->out_lock);
		out_queue_insert_rpy_slot (&channel->out_queue, frame->msgno);
		g_mutex_unlock (channel->out_lock);

	}

	ret = RR_CHANNEL_GET_CLASS(channel)->frame_available (channel, frame,
							      &error);

	if (ret == FALSE) {

		rr_debug1 ("channel::frame_available failed: %s\n", 
			   error ? error->message : "");

		if (error && frame->type == RR_FRAME_TYPE_MSG) {
			RRMessageError *err;
			
			/* FIXME: we should only send an error frame if an reply
			   for the given frame haven't been sent */
			err = rr_message_error_new_from_gerror (error, NULL);
			RR_MESSAGE (err)->msgno = frame->msgno;
			rr_channel_send_message (channel, RR_MESSAGE (err), NULL);
		}
		if (error)
			g_error_free (error);
	}
	g_object_unref (G_OBJECT (frame));
}

void
rr_channel_frame_available (RRChannel *channel, RRFrame *frame)
{
	g_object_ref (G_OBJECT (frame));

	if (channel->dont_queue_frames)
		do_frame_available (channel, frame);
	else
		rr_main_work_pool_push (RRWPGROUP (channel), 
					do_frame_available, channel, frame);
}

gboolean
rr_channel_client_init (RRChannel *channel, GError **error)
{
	rr_debug1 ("channel::client_init "
		   "%s id=%d", G_OBJECT_TYPE_NAME (G_OBJECT (channel)),
		   channel->id);

	if (RR_CHANNEL_GET_CLASS (channel)->client_init)
		return RR_CHANNEL_GET_CLASS (channel)->client_init (channel, 
								    error);
	else
		return TRUE;
}

gboolean
rr_channel_server_init (RRChannel *channel, const gchar *piggyback,
			GError **error)
{
	rr_debug1 ("channel::server_init "
		   "%s id=%d piggyback=%s", 
		   G_OBJECT_TYPE_NAME (G_OBJECT (channel)),
		   channel->id,
		   piggyback ? piggyback : "None");

	if (RR_CHANNEL_GET_CLASS (channel)->server_init)
		return RR_CHANNEL_GET_CLASS (channel)->server_init (channel, 
								    piggyback,
								    error);
	else
		return TRUE;
}

void
rr_channel_client_confirmation (RRChannel *channel, const gchar *piggyback)
{
	rr_debug1 ("channel::client_confirmation "
		   "%s id=%d piggyback=%s", G_OBJECT_TYPE_NAME (G_OBJECT (channel)), 
		   channel->id,
		   piggyback ? piggyback : "None");

	if (RR_CHANNEL_GET_CLASS (channel)->client_confirmation)
		RR_CHANNEL_GET_CLASS (channel)->client_confirmation (channel,
								     piggyback);
}

void
rr_channel_server_confirmation (RRChannel *channel)
{
	rr_debug1 ("channel::server_confirmation "
		   "%s id=%d", G_OBJECT_TYPE_NAME (G_OBJECT (channel)), 
		   channel->id);

	if (RR_CHANNEL_GET_CLASS (channel)->server_confirmation)
		RR_CHANNEL_GET_CLASS (channel)->server_confirmation (channel);
}

void
rr_channel_register_frame (RRChannel *channel, RRFrame *frame)
{
	g_return_if_fail (RR_IS_CHANNEL (channel));
	g_return_if_fail (RR_IS_FRAME (frame));
	
	frame->seqno         = channel->seq_out;
	channel->seq_out    += frame->size;
	channel->window_out -= frame->size;

	g_return_if_fail (channel->window_in >= 0);
}

/**
 * rr_channel_flush:
 * @channel: A #RRChannel
 * @error: location to return an error of type RR_ERROR or RR_BEEP_ERROR.
 * 
 * Blocks until all outgoing frames/messages are sent.
 * 
 * Return value: %TRUE on success, %FALSE on failure.
 **/
gboolean
rr_channel_flush (RRChannel *channel, GError **error)
{
	g_mutex_lock (channel->out_lock);
	/* FIXME: Add disconnect () detection */
/* 	while (channel->out_queue->length != 0) */
/* 		g_cond_wait (channel->out_cond, channel->out_lock); */
	while (out_queue_ready (channel->out_queue))
		g_cond_wait (channel->out_cond, channel->out_lock);

	g_mutex_unlock (channel->out_lock);

	return TRUE;
}

/**
 * rr_channel_set_window_size:
 * @channel: A #RRChannel.
 * @size: The new size of the receive window for @channel.
 * 
 * Sets the size of the receive window.
 **/
void
rr_channel_set_window_size (RRChannel *channel, gint size)
{
	g_return_if_fail (RR_IS_CHANNEL (channel));
	g_return_if_fail (size >= 0);

	channel->window_size = size;
}

/**
 * rr_channel_get_window_size:
 * @channel: A #RRChannel.
 * 
 * Returns the receive window size.
 * 
 * Return value: the window size.
 **/
gint
rr_channel_get_window_size (RRChannel *channel)
{
	g_return_val_if_fail (RR_IS_CHANNEL (channel), -1);

	return channel->window_size;
}

/**
 * rr_channel_set_aggregate:
 * @channel: a #RRChannel
 * @aggregate: %TRUE or %FALSE.
 * 
 * Selects if the frames should be aggregated or not.
 **/
void
rr_channel_set_aggregate (RRChannel *channel, gboolean aggregate)
{
	g_return_if_fail (RR_IS_CHANNEL (channel));

	channel->aggregate_frames = aggregate;
}

/**
 * rr_channel_get_aggregate:
 * @channel: a #RRChannel
 * 
 * Returns %TRUE if the frames are aggregated or %FALSE if they are
 * passed to #rr_channel_frame_available directly.
 * 
 * Return value: %TRUE or %FALSE. 
 **/
gboolean
rr_channel_get_aggregate (RRChannel *channel)
{
	g_return_val_if_fail (RR_IS_CHANNEL (channel), FALSE);

	return channel->aggregate_frames;
}

/**
 * rr_channel_get_uri:
 * @type: A GType
 * 
 * Get the URI the profile is associated with.
 * 
 * Return value: A uri or %NULL
 **/
const gchar *
rr_channel_get_uri (GType type)
{
	return g_type_get_qdata (type, g_quark_from_string (RR_CHANNEL_GTYPE_KEY_URI));
}

/**
 * rr_channel_set_uri:
 * @type: A GType
 * @uri: An URI string.
 * 
 * Associate an URI with the profile.
 **/
void
rr_channel_set_uri (GType type, const gchar *uri)
{
	gchar *old;
	g_assert (g_type_is_a (type, RR_TYPE_CHANNEL));

	old = g_type_get_qdata (type, 
				g_quark_from_string (RR_CHANNEL_GTYPE_KEY_URI));
	if (old)
		g_free (old);

	g_type_set_qdata (type, 
			  g_quark_from_string (RR_CHANNEL_GTYPE_KEY_URI), 
			  g_strdup (uri));
}

const gchar *
rr_channel_get_piggyback (RRChannel *channel)
{
	g_return_val_if_fail (RR_IS_CHANNEL (channel), NULL);

	return channel->piggyback;
}

void
rr_channel_set_piggyback (RRChannel *channel, const gchar *piggyback)
{
	g_return_if_fail (RR_IS_CHANNEL (channel));

	if (channel->piggyback)
		g_free (channel->piggyback);
	if (piggyback)
		channel->piggyback = g_strdup (piggyback);
	else
		channel->piggyback = NULL;
}

gboolean
rr_channel_close (RRChannel *channel,
		  gint code, const gchar *xml_lang, 
		  const gchar *diagnostic,
		  GError **error)
{
	RRConnection *conn;
	RRManager *manager;

	conn = rr_channel_get_connection (channel);
	g_return_val_if_fail (conn, FALSE);
	manager = rr_connection_get_manager (conn);
	g_return_val_if_fail (manager, FALSE);

	return rr_manager_close_channel (manager, channel, code, xml_lang,
					 diagnostic, error);
}

void
rr_channel_lock (RRChannel *channel)
{
	g_return_if_fail (RR_IS_CHANNEL (channel));
	g_mutex_lock (channel->mutex);
}

void
rr_channel_unlock (RRChannel *channel)
{
	g_return_if_fail (RR_IS_CHANNEL (channel));
	g_mutex_unlock (channel->mutex);
}

static void 
out_queue_free (GSList *queue)
{
	g_slist_foreach (queue, (GFunc)queue_item_free, NULL);
	g_slist_free (queue);
}

static QueueItem *
find_queue_item (GSList *queue, gint32 msgno, gboolean reply)
{
	GSList *iter;
	QueueItem *item;

	/* See if we already have an entry in the queue */
	iter = queue;
	while (iter) {
		item = iter->data;
		if (item->reply == reply && item->msgno == msgno) {
			return item;
		}
		iter = iter->next;
	}
	return FALSE;
}

static void
queue_item_free (QueueItem *item)
{
	g_list_foreach (item->queue->head, (GFunc)g_object_unref, NULL);
	g_queue_free (item->queue);
	g_free (item);
}

static QueueItem *
queue_item_new (gint32 msgno, gboolean reply)
{
	QueueItem *item;

	/* Add a new entry */
	item = g_new (QueueItem, 1);
	item->reply = FALSE;
	item->msgno = msgno;
	item->queue = g_queue_new ();
	return item;
}

static void
out_queue_insert_rpy_slot (GSList **queue, gint32 msgno)
{
	QueueItem *item;

	g_return_if_fail (queue != NULL);

	/* See if we already have an entry in the queue */
	if (find_queue_item (*queue, msgno, TRUE)) {

		rr_debug1 ("weird, already an reply slot in the queue");
		return;
	}
	item = queue_item_new (msgno, TRUE);

	*queue = g_slist_append (*queue, item);
}

static void
out_queue_push (GSList **queue, GObject *obj, 
		gint32 msgno, gint32 channel_id, RRFrameType type)
{
	QueueItem *item;

	g_return_if_fail (queue != NULL);
	g_return_if_fail (obj != NULL);

	if (type == RR_FRAME_TYPE_UNKNOWN) {
		rr_debug1 ("channel::out_queue_push type == unknown, "
			   "this can't be right");
	}
	else if (type == RR_FRAME_TYPE_MSG) {

		item = queue_item_new (msgno, FALSE);
		*queue = g_slist_append (*queue, item);
	}
	else {
		item = find_queue_item (*queue, msgno, FALSE);
		if (item == NULL) {
			if (!(channel_id == 0 && msgno == 0)) {
				rr_debug1 ("channel::out_queue_push_message "
					   "enqueueing an unknown RPY, "
					   "this can't be right");
			}
			item = queue_item_new (msgno, TRUE);
			*queue = g_slist_append (*queue, item);
		}
	}
	g_queue_push_head (item->queue, obj);
	out_queue_optimize (queue);
}

static GObject *
out_queue_peek_item (GSList *queue)
{
	QueueItem *item;
	GObject *object;

	g_return_val_if_fail (queue != NULL, NULL);

	item = queue->data;
	g_assert (item != NULL);

	object = g_queue_peek_tail (item->queue);
	return object;	
}

static GObject *
out_queue_pop (GSList **queue)
{
	RRFrame *frame;
	QueueItem *item;
	GObject *object;
	GSList *tmp;

	g_return_val_if_fail (queue != NULL, NULL);
	g_return_val_if_fail (*queue != NULL, NULL);

	item = (*queue)->data;
	g_assert (item != NULL);

	object = g_queue_pop_tail (item->queue);
	g_assert (object != NULL);
	frame = (RRFrame *)object;
	if (item->queue->length > 0 ||
	    (RR_IS_FRAME (frame) && 
	     (frame->more || frame->type == RR_FRAME_TYPE_ANS)))
		return object;
	/* Remove and free the first item */
	tmp = *queue;
	*queue = g_slist_remove_link (*queue, *queue);
	queue_item_free (item);
	g_slist_free_1 (tmp);

	out_queue_optimize (queue);

	return object;	
}

static gboolean
out_queue_ready (GSList *queue)
{
	QueueItem *item;

	if (queue == NULL)
		return FALSE;

	item = queue->data;
	g_assert (item != NULL);
	return item->queue->length > 0;
}

static void
out_queue_optimize (GSList **queue)
{
	QueueItem *item;
	GSList *iter;

	if (*queue == NULL) {
		return;
	}
	
	item = (*queue)->data;
	g_assert (item != NULL);

	if (item->queue->length > 0) {
		return;
	}
	iter = (*queue)->next;
	while (iter) {

		item = iter->data;
		if (item->queue->length > 0) {
		
			g_slist_remove_link (*queue, iter);
			iter->next = *queue;
			*queue = iter;
			return;
		}
		iter = iter->next;
	}
}


syntax highlighted by Code2HTML, v. 0.9.1