#include "gskxmlrpcstream.h"
#include "../gskmacros.h"

static GObjectClass *parent_class = NULL;

struct _GskXmlrpcIncoming
{
  GskXmlrpcRequest *request;
  GskXmlrpcResponse *response;
  GskXmlrpcIncoming *next;
};

struct _GskXmlrpcOutgoing
{
  GskXmlrpcRequest *request;
  GskXmlrpcResponseNotify notify;
  gpointer data;
  GDestroyNotify destroy;
  GskXmlrpcOutgoing *next;
};

/* --- GskStream methods --- */
static gboolean
gsk_xmlrpc_stream_shutdown_write (GskIO      *io,
				  GError    **error)
{
  GskXmlrpcStream *xmlrpc_stream = GSK_XMLRPC_STREAM (io);

  /* XXX: error conditions... */

  if (xmlrpc_stream->last_request == NULL
   && xmlrpc_stream->outgoing.size != 0)
    gsk_io_notify_read_shutdown (io);
  return TRUE;
}

static gboolean
gsk_xmlrpc_stream_shutdown_read (GskIO      *io,
				 GError    **error)
{
  GskXmlrpcStream *xmlrpc_stream = GSK_XMLRPC_STREAM (io);
  if (xmlrpc_stream->outgoing.size != 0)
    {
      g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_LINGERING_DATA,
		   "data waiting to be written on shutdown stream");
      return FALSE;
    }
  if (gsk_io_get_is_writable (io))
    {
      if (!gsk_io_write_shutdown (io, error))
	return FALSE;
    }
  return TRUE;
}

static guint
gsk_xmlrpc_stream_raw_read (GskStream     *stream,
                            gpointer       data,
                            guint          length,
                            GError       **error)
{
  GskXmlrpcStream *xmlrpc_stream = GSK_XMLRPC_STREAM (stream);
  guint rv = gsk_buffer_read (&xmlrpc_stream->outgoing, data, length);
  if (xmlrpc_stream->outgoing.size == 0)
    {
      gsk_stream_clear_idle_notify_read (stream);
      if (!gsk_io_get_is_writable (stream)
       && xmlrpc_stream->last_request == NULL)
	gsk_io_notify_read_shutdown (stream);
    }
  return rv;
}

static guint
gsk_xmlrpc_stream_raw_read_buffer (GskStream     *stream,
                                   GskBuffer     *buffer,
                                   GError       **error)
{
  GskXmlrpcStream *xmlrpc_stream = GSK_XMLRPC_STREAM (stream);
  guint rv = gsk_buffer_drain (buffer, &xmlrpc_stream->outgoing);
  if (xmlrpc_stream->outgoing.size == 0)
    gsk_stream_clear_idle_notify_read (stream);
  return rv;
}

/* Handle a request from the other side. */
static void
handle_request (GskXmlrpcStream *stream, GskXmlrpcRequest *request)
{
  GskXmlrpcIncoming *incoming = g_new (GskXmlrpcIncoming, 1);
  incoming->request = request;
  incoming->response = NULL;
  incoming->next = NULL;
  if (stream->next_to_dequeue == NULL)
    gsk_hook_mark_idle_notify (GSK_XMLRPC_STREAM_REQUEST_HOOK (stream));
  if (stream->first_unhandled_request == NULL)
    {
      stream->first_unhandled_request = incoming;
      stream->last_request = incoming;
      stream->next_to_dequeue = incoming;
    }
  else
    {
      stream->last_request->next = incoming;
      stream->last_request = incoming;
      if (stream->next_to_dequeue == NULL)
	stream->next_to_dequeue = incoming;
    }
}

/* Handle a response from the other side. */
static gboolean
handle_response (GskXmlrpcStream *stream, GskXmlrpcResponse *response)
{
  GskXmlrpcOutgoing *outgoing;
  if (stream->first_unresponded_request == NULL)
    return FALSE;

  outgoing = stream->first_unresponded_request;
  stream->first_unresponded_request = outgoing->next;
  if (stream->first_unresponded_request == NULL)
    stream->last_unresponded_request = NULL;

  (*outgoing->notify) (outgoing->request, response, outgoing->data);
  if (outgoing->destroy != NULL)
    (*outgoing->destroy) (outgoing->data);
  gsk_xmlrpc_request_unref (outgoing->request);
  g_free (outgoing);
  return TRUE;
}

static guint
gsk_xmlrpc_stream_raw_write (GskStream     *stream,
                             gconstpointer  data,
                             guint          length,
                             GError       **error)
{
  GskXmlrpcStream *xmlrpc_stream = GSK_XMLRPC_STREAM (stream);
  GskXmlrpcParser *parser = xmlrpc_stream->parser;
  GskXmlrpcRequest *request;
  GskXmlrpcResponse *response;
  if (!gsk_xmlrpc_parser_feed (parser, data, length, error))
    return 0;
  while ((request=gsk_xmlrpc_parser_get_request (parser)) != NULL)
    handle_request (xmlrpc_stream, request);
  while ((response=gsk_xmlrpc_parser_get_response (parser)) != NULL)
    if (!handle_response (xmlrpc_stream, response))
      {
	g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_BAD_FORMAT,
		     _("writing to XMLRPC stream: got unsolicited response"));
	return 0;
      }
  return length;
}

static void
gsk_xmlrpc_stream_finalize (GObject *object)
{
  GskXmlrpcStream *xmlrpc_stream = GSK_XMLRPC_STREAM (object);
  gsk_xmlrpc_parser_free (xmlrpc_stream->parser);
  gsk_hook_destruct(&xmlrpc_stream->incoming_request_hook);
  (*parent_class->finalize) (object);
  
}

/* --- functions --- */
static void
gsk_xmlrpc_stream_init (GskXmlrpcStream *xmlrpc_stream)
{
  xmlrpc_stream->parser = gsk_xmlrpc_parser_new (xmlrpc_stream);
  gsk_stream_mark_is_readable (GSK_STREAM (xmlrpc_stream));
  gsk_stream_mark_is_writable (GSK_STREAM (xmlrpc_stream));
  GSK_HOOK_INIT (xmlrpc_stream, GskXmlrpcStream, incoming_request_hook, 0,
		 set_poll_requestable, shutdown_requestable);
  GSK_HOOK_SET_FLAG (GSK_XMLRPC_STREAM_REQUEST_HOOK (xmlrpc_stream), IS_AVAILABLE);
}

static void
gsk_xmlrpc_stream_class_init (GskXmlrpcStreamClass *class)
{
  GskStreamClass *stream_class = GSK_STREAM_CLASS (class);
  GskIOClass *io_class = GSK_IO_CLASS (class);
  GObjectClass *object_class = G_OBJECT_CLASS (class);
  parent_class = g_type_class_peek_parent (class);
  io_class->shutdown_read = gsk_xmlrpc_stream_shutdown_read;
  io_class->shutdown_write = gsk_xmlrpc_stream_shutdown_write;
  stream_class->raw_read = gsk_xmlrpc_stream_raw_read;
  stream_class->raw_write = gsk_xmlrpc_stream_raw_write;
  stream_class->raw_read_buffer = gsk_xmlrpc_stream_raw_read_buffer;
  object_class->finalize = gsk_xmlrpc_stream_finalize;
  GSK_HOOK_CLASS_INIT (G_OBJECT_CLASS (class), "incoming-request-hook", GskXmlrpcStream, incoming_request_hook);
}

GType gsk_xmlrpc_stream_get_type()
{
  static GType xmlrpc_stream_type = 0;
  if (!xmlrpc_stream_type)
    {
      static const GTypeInfo xmlrpc_stream_info =
      {
	sizeof(GskXmlrpcStreamClass),
	(GBaseInitFunc) NULL,
	(GBaseFinalizeFunc) NULL,
	(GClassInitFunc) gsk_xmlrpc_stream_class_init,
	NULL,		/* class_finalize */
	NULL,		/* class_data */
	sizeof (GskXmlrpcStream),
	0,		/* n_preallocs */
	(GInstanceInitFunc) gsk_xmlrpc_stream_init,
	NULL		/* value_table */
      };
      xmlrpc_stream_type = g_type_register_static (GSK_TYPE_STREAM,
                                                  "GskXmlrpcStream",
						  &xmlrpc_stream_info, 0);
    }
  return xmlrpc_stream_type;
}

/* Handle incoming requests. */
/**
 * gsk_xmlrpc_stream_get_request:
 * @stream: the stream to dequeue an incomiung request from.
 *
 * Grab a new request from the stream.
 * The caller should eventually respond to it with
 * gsk_xmlrpc_stream_respond().   
 *
 * returns: a reference to a remote request which the caller
 * must call gsk_xmlrpc_request_unref() on eventually,
 * or NULL if no unhandled requests are available.
 */
GskXmlrpcRequest *gsk_xmlrpc_stream_get_request (GskXmlrpcStream *stream)
{
  GskXmlrpcRequest *request;
  if (stream->next_to_dequeue == NULL)
    return NULL;
  request = gsk_xmlrpc_request_ref (stream->next_to_dequeue->request);
  stream->next_to_dequeue = stream->next_to_dequeue->next;
  if (stream->next_to_dequeue == NULL)
    gsk_hook_clear_idle_notify (GSK_XMLRPC_STREAM_REQUEST_HOOK (stream));
  return request;
}

static void
try_flushing_incoming_requests (GskXmlrpcStream *stream)
{
  gboolean mark_idle_notify = FALSE;
  while (stream->first_unhandled_request != NULL
      && stream->first_unhandled_request->response != NULL)
    {
      GskXmlrpcIncoming *incoming = stream->first_unhandled_request;
      stream->first_unhandled_request = incoming->next;
      if (stream->first_unhandled_request == NULL)
	stream->last_request = NULL;

      g_assert (incoming != stream->next_to_dequeue);

      gsk_xmlrpc_response_to_buffer (incoming->response, &stream->outgoing);
      mark_idle_notify = TRUE;

      gsk_xmlrpc_request_unref (incoming->request);
      gsk_xmlrpc_response_unref (incoming->response);
      g_free (incoming);
    }
  if (mark_idle_notify)
    gsk_stream_mark_idle_notify_read (GSK_STREAM (stream));
}

/**
 * gsk_xmlrpc_stream_respond:
 * @stream: the stream where the incoming request came in.
 * @request: the request initiated by the other side.
 * @response: local response to the request.
 *
 * Give the RPC result to the other side of this connection.
 */
void              gsk_xmlrpc_stream_respond     (GskXmlrpcStream *stream,
						 GskXmlrpcRequest *request,
						 GskXmlrpcResponse *response)
{
  GskXmlrpcIncoming *incoming;
  for (incoming = stream->first_unhandled_request;
       incoming != NULL;
       incoming = incoming->next)
    if (incoming->request == request)
      break;
  g_return_if_fail (incoming->response == NULL);
  incoming->response = gsk_xmlrpc_response_ref (response);
  try_flushing_incoming_requests (stream);
}

/* Make outgoing requests. */
/**
 * gsk_xmlrpc_stream_make_request:
 * @stream: the stream to make the request on.
 * @request: the request to issue.
 * @notify: callback to eventaully invoke with the remote response,
 * if we get it.
 * @data: opaque user data to pass to the @notify function eventually.
 * @destroy: callback to invoke after the handler is run,
 * or if the stream shuts down before a response is obtained.
 *
 * Make a request (a method call) to the other side of this
 * #GskXmlrpcStream.  When a response is received,
 * @notify will be called, then destroy will be called.
 *
 * If the stream shuts down before a notify is obtained,
 * then just @destroy is run.
 */
void              gsk_xmlrpc_stream_make_request (GskXmlrpcStream *stream,
						  GskXmlrpcRequest *request,
						  GskXmlrpcResponseNotify notify,
						  gpointer data,
						  GDestroyNotify destroy)
{
  GskXmlrpcOutgoing *outgoing = g_new (GskXmlrpcOutgoing, 1);
  outgoing->request = g_object_ref (request);
  outgoing->notify = notify;
  outgoing->data = data;
  outgoing->destroy = destroy;
  outgoing->next = NULL;

  if (stream->first_unresponded_request == NULL)
    stream->first_unresponded_request = outgoing;
  else
    stream->last_unresponded_request->next = outgoing;
  stream->last_unresponded_request = outgoing;

  gsk_xmlrpc_request_to_buffer (request, &stream->outgoing);
  gsk_stream_mark_idle_notify_read (GSK_STREAM (stream));
}


syntax highlighted by Code2HTML, v. 0.9.1