/*-
 * $Id: rr-manager.c,v 1.71 2002/11/08 12:35:28 jonas Exp $
 *
 * See the file LICENSE for redistribution information. 
 * If you have not received a copy of the license, please contact CodeFactory
 * by email at info@codefactory.se, or on the web at http://www.codefactory.se/
 * You may also write to: CodeFactory AB, SE-903 47, Umeå, Sweden.
 *
 * Copyright (c) 2002 Jonas Borgström <jonas@codefactory.se>
 * Copyright (c) 2002 Daniel Lundin   <daniel@codefactory.se>
 * Copyright (c) 2002 CodeFactory AB.  All rights reserved.
 */

#include <librr/rr.h>

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Class structure of the parent type.  Assigned in rr_manager_class_init()
 * and used by finalize() to chain up. */
static GObjectClass *parent_class = NULL;
/* Forward declarations of the handlers that rr_manager_class_init() installs
 * before their definitions appear further down in this file. */
static void finalize (GObject *object);
static gboolean frame_available (RRChannel *channel, RRFrame *frame, 
				 GError **error);
static void
close_confirmation (RRChannel *channel, gint code,
		    const gchar *xml_lang, const gchar *description);

typedef enum 
{
	MSG_TYPE_UNKNOWN  = 0,
	MSG_TYPE_GREETING = 1,
	MSG_TYPE_START    = 2,
	MSG_TYPE_CLOSE    = 3,
} MSGType;

/*
 * GObject finalize handler for RRManager.
 *
 * Releases everything rr_manager_init() created (the active queue, the three
 * mutexes and the state condition), frees a greeting error that nobody
 * collected, and finally chains up to the parent class.
 */
static void
finalize (GObject *object)
{
	RRManager *self = (RRManager *) object;

	/* Container for outstanding requests. */
	g_queue_free (self->active_queue);

	/* Locks and the condition variable guarding the greeting state. */
	g_mutex_free (self->active_mutex);
	g_mutex_free (self->id_mutex);
	g_mutex_free (self->state_mutex);
	g_cond_free  (self->state_cond);

	if (self->greeting_error != NULL) {
		g_error_free (self->greeting_error);
	}

	parent_class->finalize (object);
}

/*
 * Instance initialiser: creates the queue of outstanding requests together
 * with the locks protecting it, the channel-id counter and the greeting
 * state.  A new manager expects a greeting and does not queue frames.
 */
static void
rr_manager_init (GObject *object)
{
	RRChannel *base = (RRChannel *) object;
	RRManager *self = (RRManager *) object;

	/* Outstanding start/close requests and their lock. */
	self->active_queue = g_queue_new ();
	self->active_mutex = g_mutex_new ();

	/* Lock used while handing out channel numbers. */
	self->id_mutex = g_mutex_new ();

	/* Greeting state, its lock and the condition waiters sleep on. */
	self->state_mutex = g_mutex_new ();
	self->state_cond  = g_cond_new ();
	self->expects_greeting = TRUE;

	base->dont_queue_frames = TRUE;
}

/*
 * Class initialiser: remembers the parent class for chaining up and installs
 * this file's finalize, frame_available and close_confirmation handlers.
 */
static void
rr_manager_class_init (GObjectClass *klass)
{
	RRChannelClass *as_channel = (RRChannelClass *) klass;

	parent_class = g_type_class_peek_parent (klass);

	klass->finalize = finalize;
	as_channel->frame_available = frame_available;
	as_channel->close_confirmation = close_confirmation;
}

/*
 * Returns the GType of RRManager, registering it as a static subtype of
 * RR_TYPE_CHANNEL the first time the function is called.
 */
GType 
rr_manager_get_type (void)
{
	static GType manager_type = 0;
	static GTypeInfo manager_info = {
		sizeof (RRManagerClass),		/* class_size */
		NULL,					/* base_init */
		NULL,					/* base_finalize */
		(GClassInitFunc) rr_manager_class_init,	/* class_init */
		NULL,					/* class_finalize */
		NULL,					/* class_data */
		sizeof (RRManager),			/* instance_size */
		16,					/* n_preallocs */
		(GInstanceInitFunc) rr_manager_init	/* instance_init */
	};

	/* Already registered: nothing more to do. */
	if (manager_type != 0)
		return manager_type;

	manager_type = g_type_register_static (RR_TYPE_CHANNEL, "RRManager",
					       &manager_info, 0);
	return manager_type;
}

/**
 * rr_manager_set_expects_greeting:
 * @manager: the manager channel.
 * @state: %TRUE if a greeting is expected, %FALSE if not.
 * 
 * Records whether the manager is still expecting an incoming greeting and
 * wakes every thread waiting on the manager's state condition.
 **/
void
rr_manager_set_expects_greeting (RRManager *manager, gboolean state)
{
	GMutex *lock = manager->state_mutex;

	g_mutex_lock (lock);

	manager->expects_greeting = state;
	/* Let waiters re-evaluate their predicate. */
	g_cond_broadcast (manager->state_cond);

	g_mutex_unlock (lock);
}

/**
 * rr_manager_set_greeting_sent:
 * @manager: the manager channel.
 * @state: %TRUE if a greeting is sent, %FALSE if not.
 * 
 * Records whether the local greeting has been sent and wakes every thread
 * waiting on the manager's state condition.
 **/
void
rr_manager_set_greeting_sent (RRManager *manager, gboolean state)
{
	GMutex *lock = manager->state_mutex;

	g_mutex_lock (lock);

	manager->greeting_sent = state;
	/* Let waiters re-evaluate their predicate. */
	g_cond_broadcast (manager->state_cond);

	g_mutex_unlock (lock);
}

/**
 * rr_manager_wait_for_greeting:
 * @manager: the manager channel.
 * @error: location to return an error of type RR_ERROR or RR_BEEP_ERROR.
 * 
 * Blocks the calling thread for as long as the manager expects a greeting.
 * If a greeting error has been stored on the manager by then, it is handed
 * over to @error and removed from the manager.
 * 
 * Return value: %TRUE on success, %FALSE on failure.
 **/
gboolean
rr_manager_wait_for_greeting (RRManager *manager, GError **error)
{
	GError *pending;

	g_assert (RR_IS_MANAGER (manager));

	/* Sleep until nobody expects a greeting any more. */
	g_mutex_lock (manager->state_mutex);
	for (;;) {
		if (!manager->expects_greeting)
			break;
		g_cond_wait (manager->state_cond, manager->state_mutex);
	}
	g_mutex_unlock (manager->state_mutex);

	pending = manager->greeting_error;
	if (pending == NULL)
		return TRUE;

	/* Ownership of the stored error moves to the caller. */
	manager->greeting_error = NULL;
	g_propagate_error (error, pending);
	return FALSE;
}

/**
 * rr_manager_wait_for_greeting_sent:
 * @manager: the manager channel.
 * @error: location to return an error of type RR_ERROR or RR_BEEP_ERROR.
 * 
 * Blocks the calling thread until the manager's greeting is marked as sent.
 * If a greeting error has been stored on the manager by then, it is handed
 * over to @error and removed from the manager.
 * 
 * Return value: %TRUE on success, %FALSE on failure.
 **/
gboolean
rr_manager_wait_for_greeting_sent (RRManager *manager, GError **error)
{
	GError *pending;

	g_assert (RR_IS_MANAGER (manager));

	/* Sleep until the greeting has been marked as sent. */
	g_mutex_lock (manager->state_mutex);
	for (;;) {
		if (manager->greeting_sent)
			break;
		g_cond_wait (manager->state_cond, manager->state_mutex);
	}
	g_mutex_unlock (manager->state_mutex);

	pending = manager->greeting_error;
	if (pending == NULL)
		return TRUE;

	/* Ownership of the stored error moves to the caller. */
	manager->greeting_error = NULL;
	g_propagate_error (error, pending);
	return FALSE;
}

/**
 * rr_manager_send_greeting:
 * @manager: the manager channel.
 * @error: location to return an error of type RR_ERROR or RR_BEEP_ERROR.
 * 
 * Resets seq_out and msgno for the manager channel to 0 and sends a new
 * greeting built from the connection's profile registry and language string.
 * The greeting is marked as sent afterwards, whatever the outcome of the
 * send.
 * 
 * Return value: %TRUE on success, %FALSE on failure.
 **/
gboolean
rr_manager_send_greeting (RRManager *manager, GError **error)
{
	RRChannel *self = RR_CHANNEL (manager);
	RRGreeting *greeting;
	gchar *languages;
	gboolean sent;

	g_assert (RR_IS_MANAGER (manager));

	/* The language string is only needed while the greeting is built. */
	languages = rr_connection_get_languages_str (self->connection);
	greeting = rr_greeting_new (self->connection->profreg, languages, NULL);
	g_free (languages);

	/* Start numbering from scratch on this channel. */
	self->seq_out = 0;
	self->msgno = 0;
	RR_MESSAGE (greeting)->msgno = 0;

	sent = rr_channel_send_message (self, RR_MESSAGE (greeting), error);

	/* Release threads blocked in rr_manager_wait_for_greeting_sent(). */
	rr_manager_set_greeting_sent (manager, TRUE);

	return sent;
}

/**
 * rr_manager_new:
 * @id: the channel number to assign to the new manager.
 * 
 * Creates a new #RRManager instance and stores @id as its channel id.
 * 
 * Return value: the newly created #RRManager.
 **/
RRManager *
rr_manager_new (gint id)
{
	RRManager *self;
	RRChannel *base;

	self = g_object_new (RR_TYPE_MANAGER, NULL);
	base = RR_CHANNEL (self);
	base->id = id;

	return self;
}

/*
 * Hands out the next channel number for a channel started by this side.
 *
 * On first use the counter is seeded from the connection role: 2 for a
 * listener, 1 otherwise.  Every call advances it by 2.  The counter is
 * protected by id_mutex.
 */
static gint32
get_next_channel_id (RRManager *manager)
{
	gint32 assigned;

	g_mutex_lock (manager->id_mutex);

	/* A zero counter means no number has been handed out yet. */
	if (manager->next_channel_id == 0) {
		RRConnection *conn = RR_CHANNEL (manager)->connection;

		if (conn->role == RR_ROLE_LISTENER)
			manager->next_channel_id = 2;
		else
			manager->next_channel_id = 1;
	}

	assigned = manager->next_channel_id;
	manager->next_channel_id += 2;

	g_mutex_unlock (manager->id_mutex);

	return assigned;
}

/*
 * GCompareFunc for g_slist_find_custom(): @data is a channel taken from a
 * start request's list of candidates, @user_data a GType packed into a
 * pointer with GSIZE_TO_POINTER().  Returns 0 when the channel's type is
 * exactly that type and non-zero otherwise.
 */
static gint find_channel (gconstpointer data, gconstpointer user_data)
{
	RRChannel *channel = RR_CHANNEL (data);
	/* GType is as wide as a pointer; unpacking it through a guint would
	 * throw away the upper half on LP64 platforms. */
	GType type = (GType) GPOINTER_TO_SIZE (user_data);
	
	return G_OBJECT_TYPE (channel) != type;
}

/*
 * Handles the positive reply to a <start> request that we sent earlier.
 *
 * @start_msg is the request the reply belongs to; frame_available() has just
 * popped it from the active queue, and the reference that the queue held is
 * released here on every path.  The reply is parsed, the profile URI it names
 * is mapped to a GType, and the matching channel prepared by the request is
 * added to the connection and given its client confirmation.  The outcome
 * (the channel, or the error) is reported to the request through
 * rr_message_start_done().
 *
 * Returns TRUE on success.  On failure returns FALSE and, if @error is not
 * NULL, stores the error there as well.
 */
static gboolean
handle_incoming_startrpy (RRManager *manager, 
			  RRMessageStart *start_msg,
			  RRFrame *frame, 
			  GError **error)
{
	RRMessageStartRpy *rpy;
	RRConnection *conn;
	RRChannel *channel = NULL;
	GSList *list;
	GType type;
	/* Collected locally so the failure can always be reported to the
	 * request, even when the caller passed a NULL @error. */
	GError *local_error = NULL;

	conn = RR_CHANNEL (manager)->connection;
	rpy = rr_message_startrpy_new (NULL, NULL);
	rr_message_set_channel (RR_MESSAGE (rpy), RR_CHANNEL (manager));
	/* Parse the reply */
	if (!rr_message_process_frame (RR_MESSAGE (rpy), frame, &local_error))
		goto err;

	type = rr_profile_registry_lookup_by_uri (conn->profreg, rpy->uri);
	if (type == G_TYPE_INVALID) {
		g_set_error (&local_error, RR_BEEP_ERROR, 
			     RR_BEEP_CODE_SERVICE_NOT_AVAIL, 
			     RR_GERROR_DEFAULT_MESSAGE);
		goto err;
	}

	/* Pick the prepared channel whose type the peer selected */
	list = g_slist_find_custom (start_msg->out_list,
				    GSIZE_TO_POINTER (type),
				    find_channel);
	if (list == NULL) {
		g_set_error (&local_error, RR_BEEP_ERROR, 
			     RR_BEEP_CODE_SERVICE_NOT_AVAIL, 
			     RR_GERROR_DEFAULT_MESSAGE);
		goto err;
	}

	channel = RR_CHANNEL (g_object_ref (G_OBJECT (list->data)));

	rr_connection_add_channel (conn, channel);

	rr_channel_client_confirmation (channel, rpy->piggyback);

	g_object_unref (G_OBJECT (rpy));
	/* Complete the request while our reference is still held; only then
	 * give the reference up. */
	rr_message_start_done (start_msg, channel, NULL);
	g_object_unref (G_OBJECT (start_msg));
	return TRUE;

 err:
	rr_message_start_done (start_msg, channel, local_error);
	g_object_unref (G_OBJECT (rpy));
	g_object_unref (G_OBJECT (start_msg));
	/* Hands the error to the caller, or frees it when @error is NULL */
	if (local_error)
		g_propagate_error (error, local_error);
	return FALSE;
}

/*
 * Classifies a channel-0 frame by the first '<' found in its payload.
 *
 * Returns MSG_TYPE_UNKNOWN when the payload holds no '<' or when fewer than
 * 9 bytes remain from that position; otherwise the type matching the element
 * name that begins there ("<greeting", "<start" or "<close"), or
 * MSG_TYPE_UNKNOWN if none matches.
 */
static MSGType
identify_frame (RRFrame *frame)
{
	gchar *data = frame->payload;
	gint size = frame->size;
	gchar *tag;

	tag = memchr (data, '<', size);
	if (tag == NULL)
		return MSG_TYPE_UNKNOWN;

	/* Guarantees that the longest comparison below stays in bounds. */
	if (size - (tag - data) < 9)
		return MSG_TYPE_UNKNOWN;

	if (!strncmp (tag, "<greeting", 9))
		return MSG_TYPE_GREETING;
	if (!strncmp (tag, "<start", 6))
		return MSG_TYPE_START;
	if (!strncmp (tag, "<close", 6))
		return MSG_TYPE_CLOSE;

	return MSG_TYPE_UNKNOWN;
}

/*
 * Handles a <start> request received on channel 0.
 *
 * The request is parsed and its profile list is walked in order.  For each
 * entry a channel object of the listed type is created, bound to the
 * connection and passed to rr_channel_server_init(); the first channel that
 * initialises successfully wins.  That channel receives the requested channel
 * number, is added to the connection, and a start reply naming its profile
 * is sent with the msgno of the request.
 *
 * Returns the result of sending the reply.  Returns FALSE when the request
 * cannot be parsed or no profile could be started; in the latter case a
 * default error is stored in @error if none is pending.  The parsed request
 * is released on every path.
 */
static gboolean
handle_incoming_start (RRManager *manager, RRFrame *frame, 
		       GError **error)
{
	RRMessage *msg;
	RRMessageStart *start;
	RRConnection *conn = RR_CHANNEL (manager)->connection;
	RRChannel *channel = NULL;
	GSList *list;
	GType type = G_TYPE_INVALID;
	gboolean ret;
	gpointer global_config;

	start = rr_message_start_new (-1, NULL);
	rr_message_set_channel (RR_MESSAGE (start), RR_CHANNEL (manager));
	/* Parse the request */
	if (!rr_message_process_frame (RR_MESSAGE (start), frame, error))
		goto err;

	if (conn->server_name == NULL)
		rr_connection_set_server_name (conn, rr_message_start_get_server_name (start));

	/* Search the profile list for the first channel that is startable */
	for (list = rr_message_start_get_channel_list (start);
	     list != NULL;
	     list = list->next) {
		RRStartItem *item = (RRStartItem *)list->data;
		type = item->type;
		channel = g_object_new (type, NULL);
		rr_channel_set_connection (channel, conn);

		global_config = NULL;
		if (conn->profreg) {
			global_config = rr_profile_registry_get_global_config (conn->profreg, type);
		}
		channel->instance_config = NULL;
		channel->global_config = global_config;

		/* Clear any pending error messages from previous profile start attempts */
		if (error && *error) {
			g_error_free (*error);
			*error = NULL;
		}
		if (rr_channel_server_init (channel, item->piggyback,
					    error))
			break;

		if (error && *error) {

			rr_debug1 ("manager::handle_incoming_start "
				   "rr_channel_server_init failed: %s, %s", 
				   G_OBJECT_TYPE_NAME (G_OBJECT (channel)),
				   (*error)->message);
		}
		/* This profile refused to start; drop it and try the next */
		g_object_unref (G_OBJECT (channel));
		channel = NULL;
	}
	/* Were we able to start any profile? */
	if (channel == NULL) {
		/* Set a default error message if no one is provided */
		if (error && *error == NULL) {
			g_set_error (error, RR_BEEP_ERROR, 
				     RR_BEEP_CODE_SERVICE_NOT_AVAIL, 
				     "Failed to start any profile");
		}
		goto err;
	}
	/* Clear any obsolete error messages from previous start attempts */
	if (error && *error) {
		g_error_free (*error);
		*error = NULL;
	}
	channel->id = start->number;
	g_object_unref (G_OBJECT (start));

	rr_connection_add_channel (conn, channel);

	/* Send a message and tell the peer which profile we started */
	msg = (RRMessage *)rr_message_startrpy_new (rr_channel_get_uri (type),
						    rr_channel_get_piggyback (channel));
	msg->msgno = frame->msgno;
	RR_MESSAGE_STARTRPY (msg)->channel_to_activate = channel;

	/* lower the refcount by one, because rr_connection_add_channel 
	 * increased it */
	g_object_unref (G_OBJECT (channel));

	/* Disable the channel until the startrpy message is really sent */
	channel->disabled = TRUE;
	ret = rr_channel_send_message (RR_CHANNEL (manager), RR_MESSAGE (msg),
 					error);

	rr_channel_server_confirmation (channel);

	return ret;
 err:
	/* The request object is ours on the failure paths too; without this
	 * unref it would never be released. */
	g_object_unref (G_OBJECT (start));
	return FALSE;
}

/*
 * Handles a <close> request received on channel 0.
 *
 * The request is parsed, the channel it names is looked up on the connection
 * and asked through rr_channel_close_indication() whether it may be closed.
 * If so, the channel receives its close confirmation, is removed from the
 * connection unless the request named channel number 0, and an
 * RR_BEEP_CLOSE_OK reply carrying the msgno of the request is sent.
 *
 * Returns the result of sending the reply, or FALSE when parsing fails, the
 * channel is unknown or the channel refuses to close.
 */
static gboolean
handle_incoming_close (RRManager *manager, RRFrame *frame, 
		       GError **error)
{
	RRConnection *conn = RR_CHANNEL (manager)->connection;
	RRMessageClose *request;
	RRChannel *target;
	RRMessage *reply;

	request = rr_message_close_new (-1, -1, NULL, NULL);
	rr_message_set_channel (RR_MESSAGE (request), RR_CHANNEL (manager));

	/* Parse the request */
	if (!rr_message_process_frame (RR_MESSAGE (request), frame, error))
		goto fail;

	/* The channel has to exist on this connection */
	target = rr_connection_get_channel (conn, request->channelnumber);
	if (target == NULL) {
		g_set_error (error, RR_BEEP_ERROR, 
			     RR_BEEP_CODE_PARAM_ERROR,
			     "Unknown channel");
		goto fail;
	}

	/* The channel itself decides whether closing is acceptable */
	if (!rr_channel_close_indication (target, request->code,
					  request->xml_lang,
					  request->diagnostic, error))
		goto fail;

	/* Accepted: notify the channel and detach it */
	rr_channel_close_confirmation (target, request->code,
				       request->xml_lang,
				       request->diagnostic);
	if (request->channelnumber != 0)
		rr_connection_remove_channel (conn, target);
	g_object_unref (G_OBJECT (request));

	/* Answer with a positive reply that reuses the request's msgno */
	reply = rr_message_static_new (RR_FRAME_TYPE_RPY, RR_BEEP_CLOSE_OK,
				       strlen (RR_BEEP_CLOSE_OK), FALSE);
	reply->msgno = frame->msgno;

	return rr_channel_send_message (RR_CHANNEL (manager),
					RR_MESSAGE (reply), error);

 fail:
	g_object_unref (G_OBJECT (request));
	return FALSE;
}

/*
 * Handles the peer's greeting.
 *
 * The frame is parsed into a greeting object whose peer profile list is then
 * moved over to the connection.  Whether or not parsing succeeded, the
 * manager stops expecting a greeting, which releases threads blocked in
 * rr_manager_wait_for_greeting().
 *
 * Returns TRUE if the frame could be parsed, FALSE otherwise.
 */
static gboolean
handle_incoming_greeting (RRManager *manager, RRFrame *frame, 
			  GError **error)
{
	RRConnection *conn = RR_CHANNEL (manager)->connection;
	RRGreeting *peer_greeting;
	gboolean parsed;

	peer_greeting = rr_greeting_new (NULL, NULL, NULL);
	rr_message_set_channel (RR_MESSAGE (peer_greeting),
				RR_CHANNEL (manager));

	/* Parse the request */
	if (rr_message_process_frame (RR_MESSAGE (peer_greeting), frame, error))
		parsed = TRUE;
	else
		parsed = FALSE;

	/* The connection takes over the profile list; clearing the field
	 * keeps the greeting object from touching it again. */
	rr_connection_set_peer_profiles (conn, peer_greeting->peer_profiles);
	peer_greeting->peer_profiles = NULL;

	g_object_unref (G_OBJECT (peer_greeting));

	rr_manager_set_expects_greeting (manager, FALSE);

	return parsed;
}

/*
 * Handles the positive reply to a <close> request that we sent earlier.
 *
 * @close is the request the reply belongs to; frame_available() has just
 * popped it from the active queue, so this function is responsible for
 * completing it and for releasing the reference it is handed.  The channel
 * named by the request receives its close confirmation and is removed from
 * the connection.
 *
 * If the channel can no longer be found on the connection a warning is
 * logged, but the request is still completed and released: returning early
 * would leave the request neither completed nor unreferenced.
 */
static void
handle_incoming_closerpy (RRManager *manager, RRMessageClose *close,
			  RRFrame *frame, GError **error)
{
	RRChannel *channel;
	RRConnection *conn = RR_CHANNEL (manager)->connection;

	channel = rr_connection_get_channel (conn, close->channelnumber);

	if (RR_IS_CHANNEL (channel)) {
		rr_channel_close_confirmation (channel, close->code, 
					       close->xml_lang, 
					       close->diagnostic);

		rr_connection_remove_channel (conn, channel);
	}
	else {
		g_warning ("close reply for unknown channel %d",
			   (gint) close->channelnumber);
	}

	/* The peer confirmed the close, so the request is done either way */
	rr_message_close_done (close, NULL);
	g_object_unref (G_OBJECT (close));
}

/*
 * Handles an ERR frame received on channel 0.
 *
 * The frame is converted into a GError, the oldest outstanding request is
 * popped from the active queue and completed with that error.  If the queue
 * holds no start or close request, the error is only logged.
 *
 * A GError is always available when the request is completed: if the frame
 * cannot be parsed the parse error is used, and if no error was produced at
 * all a generic syntax error is substituted.  @error is left untouched
 * because this function has no way to report failure to its caller.
 */
static void
handle_incoming_error (RRManager *manager, RRFrame *frame, 
		       GError **error)
{
	RRMessageError *err;
	RRMessage *msg;
	GError *gerror = NULL;

	(void) error;

	/* FIXME: It might be an error generated due to a faulty greeting */

	err = rr_message_error_new (0, NULL, NULL);
	rr_message_set_channel (RR_MESSAGE (err), RR_CHANNEL (manager));
	/* Parse the reply; a parse failure lands in gerror as well */
	if (rr_message_process_frame (RR_MESSAGE (err), frame, &gerror))
		rr_message_error_set_gerror (err, &gerror);

	g_object_unref (G_OBJECT (err));

	/* Completing a request with a NULL error would signal success, and
	 * the code below reads gerror->message, so never continue without
	 * an error object. */
	if (gerror == NULL)
		g_set_error (&gerror, RR_BEEP_ERROR,
			     RR_BEEP_CODE_SYNTAX_ERROR,
			     "Malformed error reply");

	g_mutex_lock (manager->active_mutex);
	msg = g_queue_pop_head (manager->active_queue);
	g_mutex_unlock (manager->active_mutex);

	if (RR_IS_MESSAGE_START (msg))
		rr_message_start_done (RR_MESSAGE_START (msg), NULL, gerror);
	else if (RR_IS_MESSAGE_CLOSE (msg))
		rr_message_close_done (RR_MESSAGE_CLOSE (msg), gerror);
	else
		g_warning ("unexpected error frame: '%s'\n", 
			   gerror->message);
	/* Drop the reference that the queue was holding */
	if (msg)
		g_object_unref (G_OBJECT (msg));

	g_error_free (gerror);
}

/*
 * frame_available handler of the manager channel (channel 0).
 *
 * MSG frames: <start> and <close> requests are dispatched to their handlers
 *   and the handler's result is returned; anything else is rejected with a
 *   syntax error.
 * RPY frames: a greeting is handed to handle_incoming_greeting().  Any other
 *   reply must answer the request at the head of the active queue; an empty
 *   queue or a msgno mismatch is rejected with a syntax error (on a mismatch
 *   the request is put back at the head of the queue).
 * ERR frames: handed to handle_incoming_error().
 * Frames of any other type are accepted without further action.
 *
 * Returns FALSE with @error set for the rejections above; otherwise TRUE
 * (or the result of the MSG/greeting handler).  Whenever TRUE is returned
 * after a reply or error handler ran, @error is cleared, because a failure
 * in those handlers has already been delivered to the request itself.
 */
static gboolean
frame_available (RRChannel *channel, RRFrame *frame, GError **error)
{
	RRManager *manager = RR_MANAGER (channel);

	g_return_val_if_fail (RR_IS_FRAME (frame), FALSE);

	if (frame->type == RR_FRAME_TYPE_MSG) {

		switch (identify_frame (frame)) {
		case MSG_TYPE_START:
			return handle_incoming_start (manager, frame, error);

		case MSG_TYPE_CLOSE:
			return handle_incoming_close (manager, frame, error);

		default:
			g_set_error (error,
				     RR_BEEP_ERROR, RR_BEEP_CODE_SYNTAX_ERROR,
				     "Unknown message on channel 0");
			return FALSE;
			
		}
	}
	else if (frame->type == RR_FRAME_TYPE_RPY) {
		if (identify_frame (frame) == MSG_TYPE_GREETING)
			return handle_incoming_greeting (manager, frame, error);
		else {
			RRMessage *msg;

			/* The reply has to match the oldest pending request */
			g_mutex_lock (manager->active_mutex);
			msg = g_queue_pop_head (manager->active_queue);
			if (msg == NULL) {

				g_set_error (error,
					     RR_BEEP_ERROR, 
					     RR_BEEP_CODE_SYNTAX_ERROR,
					     "Unexpected reply");
				g_mutex_unlock (manager->active_mutex);
				return FALSE;
			}
			if (msg->msgno != frame->msgno) {

				/* Not ours to consume: put the request back */
				g_queue_push_head (manager->active_queue, msg);
				g_set_error (error,
					     RR_BEEP_ERROR, 
					     RR_BEEP_CODE_SYNTAX_ERROR,
					     "Wrong msgno");
				g_mutex_unlock (manager->active_mutex);
				return FALSE;
			}
			g_mutex_unlock (manager->active_mutex);

			if (RR_IS_MESSAGE_START (msg)) {
				/* A failure has been reported to the start
				 * request; since TRUE is returned below,
				 * @error must not stay set. */
				if (!handle_incoming_startrpy (manager, 
							       RR_MESSAGE_START (msg),
							       frame, 
							       error))
					g_clear_error (error);
			}
			else if (RR_IS_MESSAGE_CLOSE (msg))
				handle_incoming_closerpy (manager, 
							  RR_MESSAGE_CLOSE (msg),
							  frame,
							  error);
			else
				g_assert_not_reached ();

		}
	}
	else if (frame->type == RR_FRAME_TYPE_ERR) {
		handle_incoming_error (manager, frame, error);
		/* The handler returns nothing; do not report success with
		 * an error still pending. */
		g_clear_error (error);
	}

	return TRUE;
}

/**
 * rr_manager_start:
 * @manager: The #RRManager.
 * @server_name: The server_name parameter or NULL.
 * @profile_type: The profile type to start.
 * @config_data: Profile specific config data or NULL.
 * @error: location to return an error of type RR_ERROR or RR_BEEP_ERROR
 * 
 * Tries to create a new channel of the provided profile type.  This is
 * rr_manager_start_multi() called with a single profile/config pair.
 * 
 * Return value: A #RRChannel on success, %NULL on failure.
 **/
RRChannel *
rr_manager_start (RRManager *manager, const gchar *server_name,
		  GType profile_type, gpointer config_data, GError **error)
{
	RRChannel *started;

	/* One (type, config) pair followed by the list terminator. */
	started = rr_manager_start_multi (manager, server_name, error,
					  profile_type, config_data,
					  G_TYPE_INVALID);

	return started;
}

/**
 * rr_manager_start_multi:
 * @manager: The #RRManager.
 * @server_name: The server_name parameter or NULL.
 * @error: location to return an error of type RR_ERROR or RR_BEEP_ERROR
 * @...: A list of Profile type + config_data pairs and a G_TYPE_INVALID at the
 * end.
 * 
 * Tries to create a new channel with one of the provided profiles.  The
 * variable arguments are forwarded to rr_manager_start_multiv().
 * 
 * Return value: A #RRChannel on success, %NULL on failure.
 **/
RRChannel *
rr_manager_start_multi (RRManager *manager, const gchar *server_name,
			GError **error, ...)
{
	va_list profiles;
	RRChannel *started;

	va_start (profiles, error);
	started = rr_manager_start_multiv (manager, server_name, error,
					   profiles);
	va_end (profiles);

	return started;
}

/**
 * rr_manager_start_multiv:
 * @manager: The #RRManager.
 * @server_name: The server_name parameter or NULL.
 * @error: location to return an error of type RR_ERROR or RR_BEEP_ERROR
 * @args: Profile type + config_data pairs, terminated by a single
 * G_TYPE_INVALID (no config_data follows the terminator).
 * 
 * Builds a start request offering the listed profiles, sends it on the
 * manager channel, queues it as outstanding and blocks until it has been
 * answered.  Nothing is sent when no profile could be added to the request
 * or when the manager is shutting down.
 * 
 * Return value: A #RRChannel on success, %NULL on failure.
 **/
RRChannel *
rr_manager_start_multiv (RRManager *manager, const gchar *server_name,
			 GError **error, va_list args)
{
	RRConnection *conn;
	RRMessageStart *start_msg;
	RRChannel *channel;
	gint32 id;

	g_return_val_if_fail (RR_IS_MANAGER (manager), NULL);

	/* Only touch the manager once it has been validated */
	conn = RR_CHANNEL (manager)->connection;

	g_mutex_lock (manager->active_mutex);
		
	id = get_next_channel_id (manager);
	start_msg = rr_message_start_new (id, server_name);
	/* Second reference, matched by the double unref on the failure
	 * paths below */
	g_object_ref (G_OBJECT (start_msg));
	
	for (;;) {
		GType type = va_arg (args, GType);
		gpointer config_data;

		/* The terminator stands alone: fetching a config pointer
		 * after it would read an argument that was never passed. */
		if (type == G_TYPE_INVALID)
			break;

		config_data = va_arg (args, gpointer);
		
		rr_message_start_add_channel (start_msg, conn, type, 
					      config_data);
	}

	if (rr_message_start_empty_request_p (start_msg)) {

		/* If all profiles returned FALSE from the
		   rr_channel_client_init function, we have
		   to bail out */
		if (start_msg->client_init_error) {
			g_propagate_error (error, 
					   start_msg->client_init_error);
			start_msg->client_init_error = NULL;
		}
		else {
			g_set_error (error, RR_BEEP_ERROR, 
				     RR_BEEP_CODE_NO_ACTION, 
				     "No profiles to request");
		}
		g_object_unref (G_OBJECT (start_msg));
		g_object_unref (G_OBJECT (start_msg));
		g_mutex_unlock (manager->active_mutex);
		return NULL;
	}
	
	/* NOTE(review): when shutting_down is set this returns NULL without
	 * storing anything in @error. */
	if (manager->shutting_down ||
	    !rr_channel_send_message (RR_CHANNEL (manager), 
				      RR_MESSAGE (start_msg), error)) {

		g_object_unref (G_OBJECT (start_msg));
		g_object_unref (G_OBJECT (start_msg));
		g_mutex_unlock (manager->active_mutex);
		return NULL;
	}
	/* Reference owned by the active queue until the reply is handled */
	g_object_ref (G_OBJECT (start_msg));
	g_queue_push_tail (manager->active_queue, start_msg);
	g_mutex_unlock (manager->active_mutex);
	
	channel = rr_message_start_wait_for_reply (start_msg, error);

	g_object_unref (G_OBJECT (start_msg));

	return channel;
}

/*
 * close_confirmation handler of the manager channel.
 *
 * Builds a "disconnected" error from @description and stores it as the
 * manager's greeting error (replacing one that is already stored), releases
 * threads blocked in the two greeting wait functions, completes every
 * request still in the active queue with that error, and marks the manager
 * as shutting down so that no further requests are sent.
 *
 * @description is treated as plain text: it is never interpreted as a
 * printf format, and a NULL description is replaced by a fixed message.
 */
static void
close_confirmation (RRChannel *channel, gint code,
		    const gchar *xml_lang, const gchar *description)
{
	RRManager *manager = RR_MANAGER (channel);
	RRMessage *msg;
	GError *error;

	/* Pass the text as an argument to "%s": used as the format itself,
	 * any '%' it contains would be expanded against missing arguments. */
	error = g_error_new (RR_BEEP_ERROR, RR_ERROR_DISCONNECTED, "%s",
			     description ? description : "Disconnected");

	if (manager->greeting_error)
		g_error_free (manager->greeting_error);

	manager->greeting_error = error;
	rr_manager_set_expects_greeting (manager, FALSE);
	rr_manager_set_greeting_sent (manager, TRUE);

	g_mutex_lock (manager->active_mutex);

	/* Fail everything that is still waiting for a reply */
	while ((msg = g_queue_pop_head (manager->active_queue))) {

		if (RR_IS_MESSAGE_START (msg))
			rr_message_start_done (RR_MESSAGE_START (msg), NULL, error);
		else if (RR_IS_MESSAGE_CLOSE (msg))
			rr_message_close_done (RR_MESSAGE_CLOSE (msg), error);
		else
			g_assert_not_reached ();

		/* Drop the reference that the queue was holding */
		g_object_unref (G_OBJECT (msg));
	}
	manager->shutting_down = TRUE;
	g_mutex_unlock (manager->active_mutex);
}

/**
 * rr_manager_close_channel:
 * @manager: A RRManager
 * @channel: The channel/profile to close
 * @code: a three-digit reply code meaningful to programs
 * @xml_lang: passed on to the channel's close indication and to the close
 *            request together with @diagnostic.
 * @diagnostic: A textual message that is meaningful to implementers
 *              but not programs.
 * @error: location to return an error of type RR_ERROR or RR_BEEP_ERROR
 * 
 * Flushes @channel, asks it through its close indication whether it may be
 * closed, then sends a close request on the manager channel and blocks until
 * that request has been answered.  Nothing is sent when the manager is
 * shutting down.
 * 
 * Return value: %TRUE on success, %FALSE on failure.
 **/
gboolean
rr_manager_close_channel (RRManager *manager, RRChannel *channel,
			  gint code, const gchar *xml_lang, 
			  const gchar *diagnostic, GError **error)
{
	RRMessage *request;
	gboolean sent;
	gboolean closed;

	g_return_val_if_fail (RR_IS_MANAGER (manager), FALSE);
	g_return_val_if_fail (RR_IS_CHANNEL (channel), FALSE);

	/* Preconditions on the channel side. */
	if (!rr_channel_flush (channel, error))
		return FALSE;
	if (!rr_channel_close_indication (channel, code, xml_lang,
					  diagnostic, error))
		return FALSE;

	g_mutex_lock (manager->active_mutex);

	request = (RRMessage *) rr_message_close_new (channel->id, code,
						      xml_lang, diagnostic);
	g_object_ref (G_OBJECT (request));

	/* Nothing is sent once the manager is shutting down. */
	sent = !manager->shutting_down &&
		rr_channel_send_message (RR_CHANNEL (manager), request, error);
	if (!sent) {
		g_object_unref (G_OBJECT (request));
		g_object_unref (G_OBJECT (request));
		g_mutex_unlock (manager->active_mutex);
		return FALSE;
	}

	/* The active queue keeps a reference of its own. */
	g_object_ref (G_OBJECT (request));
	g_queue_push_tail (manager->active_queue, request);
	g_mutex_unlock (manager->active_mutex);

	if (rr_message_close_wait_for_reply (RR_MESSAGE_CLOSE (request), error))
		closed = TRUE;
	else
		closed = FALSE;

	g_object_unref (G_OBJECT (request));

	return closed;
}

/**
 * rr_manager_close_channel_nonblock:
 * @manager: A RRManager
 * @channel: The channel/profile to close
 * @code: a three-digit reply code meaningful to programs
 * @xml_lang: passed on to the channel's close indication and to the close
 *            request together with @diagnostic.
 * @diagnostic: A textual message that is meaningful to implementers
 *              but not programs.
 * @error: location to return an error of type RR_ERROR or RR_BEEP_ERROR
 * 
 * Like rr_manager_close_channel(), but returns as soon as the close request
 * has been sent and queued instead of waiting for the reply; @channel is not
 * flushed first.  Nothing is sent when the manager is shutting down.
 * 
 * Return value: %TRUE if the request was sent, %FALSE on failure.
 **/
gboolean
rr_manager_close_channel_nonblock (RRManager *manager, RRChannel *channel,
				   gint code, const gchar *xml_lang, 
				   const gchar *diagnostic, GError **error)
{
	RRMessage *close_msg;

	g_return_val_if_fail (RR_IS_MANAGER (manager), FALSE);
	g_return_val_if_fail (RR_IS_CHANNEL (channel), FALSE);

	if (!rr_channel_close_indication (channel, code, xml_lang, 
					  diagnostic, error))
		return FALSE;

	g_mutex_lock (manager->active_mutex);

	/* Once close_confirmation() has drained the queue nothing will ever
	 * answer a newly queued request, so refuse it up front, as the
	 * blocking variant does. */
	if (manager->shutting_down) {
		g_set_error (error, RR_BEEP_ERROR, RR_ERROR_DISCONNECTED,
			     "Manager is shutting down");
		g_mutex_unlock (manager->active_mutex);
		return FALSE;
	}

	close_msg = (RRMessage *)rr_message_close_new (channel->id, code, 
						       xml_lang, diagnostic);
	/* Second reference: this is the one the active queue will own */
	g_object_ref (G_OBJECT (close_msg));
	if (!rr_channel_send_message (RR_CHANNEL (manager), 
				      close_msg, error)) {

		/* Release both references, exactly as the blocking variant
		 * does; a single unref would leave the message allocated. */
		g_object_unref (G_OBJECT (close_msg));
		g_object_unref (G_OBJECT (close_msg));
		g_mutex_unlock (manager->active_mutex);
		return FALSE;
	}
	g_queue_push_tail (manager->active_queue, close_msg);
	g_mutex_unlock (manager->active_mutex);
	
	return TRUE;
}


/* syntax highlighted by Code2HTML, v. 0.9.1 */