#include "gskpersistentconnection.h"
#include "gskstreamclient.h"
#include "gsknameresolver.h"

/* Register the GskPersistentConnection GObject type with GSK_TYPE_STREAM as
 * its parent; this also provides gsk_persistent_connection_parent_class,
 * which the finalizer chains up through. */
G_DEFINE_TYPE(GskPersistentConnection, gsk_persistent_connection, GSK_TYPE_STREAM);

/* Ids of the "handle-connected" / "handle-disconnected" signals.  They stay 0
 * until gsk_persistent_connection_class_init() registers the signals. */
static guint handle_connected_signal_id = 0;
static guint handle_disconnected_signal_id = 0;

/* Forward declarations: class_init installs these as the GskIO poll
 * callbacks before their definitions appear further down in this file. */
static void gsk_persistent_connection_set_poll_read (GskIO    *io,
                                                     gboolean  should_poll);
static void gsk_persistent_connection_set_poll_write(GskIO    *io,
                                                     gboolean  should_poll);

/* Emit a g_message() of the form "<verb> <address>" for this connection.
 *
 * Does nothing unless connection->debug_connection is set.  The address text
 * comes from gsk_socket_address_to_string() and is freed before returning.
 */
static inline void
maybe_message (GskPersistentConnection *connection,
               const char              *verb)
{
  char *address_text;

  if (!connection->debug_connection)
    return;

  address_text = gsk_socket_address_to_string (connection->address);
  g_message ("%s %s", verb, address_text);
  g_free (address_text);
}

/* Class default handler for the "handle-disconnected" signal: logs the
 * disconnect through maybe_message() (a no-op unless debugging is on). */
static void
gsk_persistent_connection_handle_disconnected (GskPersistentConnection *connection)
{
  const char *verb = "disconnected from";

  maybe_message (connection, verb);
}

/* Class default handler for the "handle-connected" signal: logs the
 * connect through maybe_message() (a no-op unless debugging is on). */
static void
gsk_persistent_connection_handle_connected (GskPersistentConnection *connection)
{
  const char *verb = "connected to";

  maybe_message (connection, verb);
}

/* GskStream raw_read implementation: forward the read to the current
 * transport.
 *
 * Returns 0 (leaving *error untouched) while there is no transport;
 * otherwise returns whatever gsk_stream_read() reports.
 */
static guint
gsk_persistent_connection_raw_read(GskStream     *stream,
                                   gpointer       data,
                                   guint          length,
                                   GError       **error)
{
  GskStream *transport = GSK_PERSISTENT_CONNECTION (stream)->transport;

  if (transport != NULL)
    return gsk_stream_read (transport, data, length, error);
  return 0;
}

/* GskStream raw_write implementation: forward the write to the current
 * transport.
 *
 * Returns 0 (leaving *error untouched) while there is no transport;
 * otherwise returns whatever gsk_stream_write() reports.
 */
static guint
gsk_persistent_connection_raw_write(GskStream     *stream,
                                    gconstpointer  data,
                                    guint          length,
                                    GError       **error)
{
  GskStream *transport = GSK_PERSISTENT_CONNECTION (stream)->transport;

  if (transport != NULL)
    return gsk_stream_write (transport, data, length, error);
  return 0;
}

/* GskStream raw_read_buffer implementation: forward the buffered read to
 * the current transport.
 *
 * Returns 0 (leaving *error untouched) while there is no transport;
 * otherwise returns whatever gsk_stream_read_buffer() reports.
 */
static guint
gsk_persistent_connection_raw_read_buffer(GskStream     *stream,
                                          GskBuffer     *buffer,
                                          GError       **error)
{
  GskStream *transport = GSK_PERSISTENT_CONNECTION (stream)->transport;

  if (transport != NULL)
    return gsk_stream_read_buffer (transport, buffer, error);
  return 0;
}

/* GskStream raw_write_buffer implementation: forward the buffered write to
 * the current transport.
 *
 * Returns 0 (leaving *error untouched) while there is no transport;
 * otherwise returns whatever gsk_stream_write_buffer() reports.
 */
static guint
gsk_persistent_connection_raw_write_buffer(GskStream    *stream,
                                           GskBuffer     *buffer,
                                           GError       **error)
{
  GskStream *transport = GSK_PERSISTENT_CONNECTION (stream)->transport;

  if (transport != NULL)
    return gsk_stream_write_buffer (transport, buffer, error);
  return 0;
}

/* Tear down the current transport and drop this object's reference to it.
 *
 * The caller must ensure connection->transport is non-NULL; it is used
 * unconditionally.  connection->state is read but not modified here.
 *
 * NOTE(review): the "on-connect" handler is only disconnected when the state
 * is still CONNECTING.  The read/write shutdown callbacks can move the state
 * from CONNECTING to WAITING without disconnecting that handler, so it may
 * still be attached when we get here in that case -- confirm.
 */
static void
shutdown_transport (GskPersistentConnection *connection)
{
  /* Remove whichever read/write traps are currently installed on the
   * transport before shutting it down. */
  if (gsk_io_has_write_hook (connection->transport))
    gsk_io_untrap_writable (connection->transport);
  if (gsk_io_has_read_hook (connection->transport))
    gsk_io_untrap_readable (connection->transport);
  /* Shutdown errors are not collected (NULL GError). */
  gsk_io_shutdown (GSK_IO (connection->transport), NULL);
  /* handle_transport_connected() disconnects the "on-connect" handler itself
   * once the connect completes, so it is only removed here while the
   * connection is still in progress. */
  if (connection->state == GSK_PERSISTENT_CONNECTION_CONNECTING)
    g_signal_handler_disconnect (G_OBJECT (connection->transport),
                         connection->transport_on_connect_signal_handler);
  g_signal_handler_disconnect (G_OBJECT (connection->transport),
                       connection->transport_on_error_signal_handler);
  g_object_unref (connection->transport);
  connection->transport = NULL;
}

/* GObject finalizer for GskPersistentConnection.
 *
 * Releases everything the instance owns, then chains up to the parent class:
 *   - the live transport, if any (via shutdown_transport());
 *   - the pending retry timer, if any, so it cannot fire on freed memory;
 *   - the reference on the socket address taken in
 *     gsk_persistent_connection_new(), which was previously never released.
 */
static void
gsk_persistent_connection_finalize (GObject *object)
{
  GskPersistentConnection *connection = GSK_PERSISTENT_CONNECTION (object);
  if (connection->transport != NULL)
    shutdown_transport (connection);
  if (connection->retry_timeout_source)
    {
      /* Clear the field before removing the source so nothing can observe
       * a dangling pointer. */
      GskSource *source = connection->retry_timeout_source;
      connection->retry_timeout_source = NULL;
      gsk_source_remove (source);
    }
  if (connection->address != NULL)
    {
      /* Balance the g_object_ref() done when the address was stored. */
      g_object_unref (connection->address);
      connection->address = NULL;
    }
  G_OBJECT_CLASS (gsk_persistent_connection_parent_class)->finalize (object);
}

/* Instance initializer: flag the new object as both readable and writable
 * at the GskIO level. */
static void
gsk_persistent_connection_init (GskPersistentConnection *connection)
{
  GskIO *io = GSK_IO (connection);

  gsk_io_mark_is_readable (io);
  gsk_io_mark_is_writable (io);
}

/* Class initializer: installs the virtual-method implementations and
 * registers the two signals this type emits.
 *
 * Both signals take no arguments, return nothing, run their class handler
 * last (G_SIGNAL_RUN_LAST), and use the handle_connected /
 * handle_disconnected class slots as default handlers.
 */
static void
gsk_persistent_connection_class_init (GskPersistentConnectionClass *class)
{
  GObjectClass *object_class = G_OBJECT_CLASS (class);
  GskIOClass *io_class = GSK_IO_CLASS (class);
  GskStreamClass *stream_class = GSK_STREAM_CLASS (class);
  GType self_type = G_OBJECT_CLASS_TYPE (object_class);

  /* Default handlers for our own signals. */
  class->handle_connected = gsk_persistent_connection_handle_connected;
  class->handle_disconnected = gsk_persistent_connection_handle_disconnected;

  /* GskIO: polling is forwarded to the transport. */
  io_class->set_poll_read = gsk_persistent_connection_set_poll_read;
  io_class->set_poll_write = gsk_persistent_connection_set_poll_write;

  /* GskStream: all reads and writes are forwarded to the transport. */
  stream_class->raw_read = gsk_persistent_connection_raw_read;
  stream_class->raw_write = gsk_persistent_connection_raw_write;
  stream_class->raw_read_buffer = gsk_persistent_connection_raw_read_buffer;
  stream_class->raw_write_buffer = gsk_persistent_connection_raw_write_buffer;

  /* GObject. */
  object_class->finalize = gsk_persistent_connection_finalize;

  handle_connected_signal_id =
    g_signal_new ("handle-connected",
                  self_type,
                  G_SIGNAL_RUN_LAST,
                  G_STRUCT_OFFSET (GskPersistentConnectionClass,
                                   handle_connected),
                  NULL, NULL,
                  g_cclosure_marshal_VOID__VOID,
                  G_TYPE_NONE,
                  0);

  handle_disconnected_signal_id =
    g_signal_new ("handle-disconnected",
                  self_type,
                  G_SIGNAL_RUN_LAST,
                  G_STRUCT_OFFSET (GskPersistentConnectionClass,
                                   handle_disconnected),
                  NULL, NULL,
                  g_cclosure_marshal_VOID__VOID,
                  G_TYPE_NONE,
                  0);
}

/* Timer callback, defined further down; declared here for setup_timeout(). */
static gboolean handle_retry_timeout_expired (gpointer data);

/* Schedule a reconnect attempt retry_timeout_ms from now on the default
 * main loop and put the connection into the WAITING state.
 *
 * Refuses (g_return_if_fail) if a retry timer is already pending.
 * The timer is created with no destroy-notify and -1 as its final argument
 * (presumably "no repeat period" -- confirm against gsk_main_loop_add_timer).
 */
static void
setup_timeout (GskPersistentConnection *connection)
{
  GskSource *timer;

  g_return_if_fail (connection->retry_timeout_source == NULL);

  timer = gsk_main_loop_add_timer (gsk_main_loop_default (),
                                   handle_retry_timeout_expired,
                                   connection,
                                   NULL,
                                   connection->retry_timeout_ms,
                                   -1);
  connection->retry_timeout_source = timer;
  connection->state = GSK_PERSISTENT_CONNECTION_WAITING;
}

/* "on-connect" handler for the transport: the pending connect completed.
 *
 * Moves the connection from CONNECTING to CONNECTED, removes this handler
 * from the transport, and emits "handle-connected".
 * Bails out if the signal came from a stream that is not the current
 * transport, or if the connection is not in the CONNECTING state.
 */
static void
handle_transport_connected (GskStream *stream,
                            GskPersistentConnection *connection)
{
  gulong connect_handler;

  g_return_if_fail (connection->transport == stream);
  g_return_if_fail (connection->state == GSK_PERSISTENT_CONNECTION_CONNECTING);

  connect_handler = connection->transport_on_connect_signal_handler;
  connection->state = GSK_PERSISTENT_CONNECTION_CONNECTED;
  g_signal_handler_disconnect (stream, connect_handler);
  g_signal_emit (connection, handle_connected_signal_id, 0);
}

/* Read-hook callback installed on the transport: relay readability by
 * notifying the persistent connection's own read hook.
 *
 * Returns TRUE after notifying; returns FALSE (with a GLib critical) if
 * called for a stream that is not the current transport.
 */
static gboolean
handle_transport_readable (GskStream               *transport,
                           GskPersistentConnection *connection)
{
  g_return_val_if_fail (connection->transport == transport, FALSE);

  gsk_io_notify_ready_to_read (connection);

  return TRUE;
}

/* Shutdown callback paired with handle_transport_readable() when the read
 * hook is trapped on the transport.
 *
 * If the connection was CONNECTED or CONNECTING this is treated as a
 * disconnect: the transport's write hook is untrapped, "handle-disconnected"
 * is emitted, and a reconnect is scheduled with setup_timeout().  In every
 * case the transport's write side is then shut down, with a warning on
 * failure.  Always returns FALSE (except for the precondition failure, which
 * also returns FALSE).
 *
 * NOTE(review): connection->transport still points at the old transport when
 * this returns.  retry_connection() later assigns a new transport to that
 * field without unref'ing the old one, which looks like a leak of the old
 * transport and its "on-error" handler -- confirm.
 */
static gboolean
handle_transport_read_shutdown (GskStream           *transport,
                                GskPersistentConnection *connection)
{
  GError *error = NULL;
  g_return_val_if_fail (connection->transport == transport, FALSE);
  if (connection->state == GSK_PERSISTENT_CONNECTION_CONNECTED
   || connection->state == GSK_PERSISTENT_CONNECTION_CONNECTING)
    {
      if (gsk_io_has_write_hook (transport))
        gsk_io_untrap_writable (transport);
      /* setup_timeout() sets WAITING as well; it is set here first so that
       * "handle-disconnected" handlers already observe the WAITING state. */
      connection->state = GSK_PERSISTENT_CONNECTION_WAITING;
      g_signal_emit (connection, handle_disconnected_signal_id, 0);
      setup_timeout (connection);
    }

  /* The read side is going away; close the write side too. */
  if (!gsk_io_write_shutdown (transport, &error))
    {
      g_warning ("error shutting down transport: %s", error->message);
      g_error_free (error);
    }
  return FALSE;
}

/* Write-hook callback installed on the transport: relay writability by
 * notifying the persistent connection's own write hook.
 *
 * Returns TRUE after notifying; returns FALSE (with a GLib critical) if
 * called for a stream that is not the current transport.
 */
static gboolean
handle_transport_writable (GskStream               *transport,
                           GskPersistentConnection *connection)
{
  g_return_val_if_fail (connection->transport == transport, FALSE);

  gsk_io_notify_ready_to_write (connection);

  return TRUE;
}

/* "on-error" handler for the transport.
 *
 * Optionally warns (when warn_on_transport_errors is set), then tears the
 * transport down, emits "handle-disconnected", and schedules a reconnect.
 * Ignored (with a GLib critical) if the signal comes from a stream that is
 * not the current transport.
 */
static void
handle_transport_error (GskStream *transport,
                        GskPersistentConnection *connection)
{
  g_return_if_fail (connection->transport == transport);

  if (connection->warn_on_transport_errors)
    {
      const char *reason = GSK_IO (transport)->error->message;

      g_warning ("error in transport: %s", reason);
    }

  shutdown_transport (connection);
  g_signal_emit (connection, handle_disconnected_signal_id, 0);
  setup_timeout (connection);
}

/* Shutdown callback paired with handle_transport_writable() when the write
 * hook is trapped on the transport.
 *
 * If the connection was CONNECTED or CONNECTING this is treated as a
 * disconnect: the transport's read hook is untrapped, "handle-disconnected"
 * is emitted, and a reconnect is scheduled with setup_timeout().  In every
 * case the transport's read side is then shut down, with a warning on
 * failure.  Always returns FALSE (except for the precondition failure, which
 * also returns FALSE).
 *
 * NOTE(review): connection->transport still points at the old transport when
 * this returns.  retry_connection() later assigns a new transport to that
 * field without unref'ing the old one, which looks like a leak of the old
 * transport and its "on-error" handler -- confirm.
 */
static gboolean
handle_transport_write_shutdown (GskStream           *transport,
                                 GskPersistentConnection *connection)
{
  GError *error = NULL;
  g_return_val_if_fail (connection->transport == transport, FALSE);
  if (connection->state == GSK_PERSISTENT_CONNECTION_CONNECTED
   || connection->state == GSK_PERSISTENT_CONNECTION_CONNECTING)
    {
      if (gsk_io_has_read_hook (transport))
        gsk_io_untrap_readable (transport);
      /* setup_timeout() sets WAITING as well; it is set here first so that
       * "handle-disconnected" handlers already observe the WAITING state. */
      connection->state = GSK_PERSISTENT_CONNECTION_WAITING;
      g_signal_emit (connection, handle_disconnected_signal_id, 0);
      setup_timeout (connection);
    }

  /* The write side is going away; close the read side too. */
  if (!gsk_io_read_shutdown (transport, &error))
    {
      g_warning ("error shutting down transport: %s", error->message);
      g_error_free (error);
    }
  return FALSE;
}

/* Try to open a new transport to `address` and wire it into `connection`.
 *
 * If the transport cannot be created, the error is recorded on the
 * connection with gsk_io_set_gerror() and another attempt is scheduled via
 * setup_timeout().
 *
 * Otherwise the transport is stored and the state becomes either CONNECTING
 * (an "on-connect" handler is attached) or CONNECTED ("handle-connected" is
 * emitted immediately).  Read/write traps are then installed on the new
 * transport if the persistent connection itself is currently being polled
 * for that direction, and an "on-error" handler is attached.
 *
 * NOTE(review): connection->transport is overwritten without checking for
 * (or releasing) a previous value -- see the read/write shutdown callbacks,
 * which leave the old pointer in place; confirm whether that leaks.
 */
static void
retry_connection (GskPersistentConnection *connection,
                  GskSocketAddress        *address)
{
  GError *error = NULL;
  GskStream *transport = gsk_stream_new_connecting (address, &error);
  if (transport == NULL)
    {
      /* `error` is not freed here; presumably gsk_io_set_gerror() takes
       * ownership of it -- TODO confirm. */
      gsk_io_set_gerror (GSK_IO (connection),
                         GSK_IO_ERROR_CONNECT,
                         error);
      setup_timeout (connection);
      return;
    }
  connection->transport = transport;
  /* NOTE(review): GSK_STREAM_FD() assumes gsk_stream_new_connecting() always
   * returns a GskStreamFd -- confirm. */
  if (GSK_STREAM_FD (transport)->is_resolving_name
   || gsk_io_get_is_connecting (transport))
    {
      connection->state = GSK_PERSISTENT_CONNECTION_CONNECTING;
      connection->transport_on_connect_signal_handler
        = g_signal_connect (transport,
                            "on-connect",
                            G_CALLBACK (handle_transport_connected),
                            connection);
    }
  else
    {
      /* Already connected: announce it right away.  Note this happens
       * before the traps and the "on-error" handler below are installed. */
      connection->state = GSK_PERSISTENT_CONNECTION_CONNECTED;
      g_signal_emit (connection, handle_connected_signal_id, 0);
    }
  /* Mirror the persistent connection's current polling state onto the
   * fresh transport. */
  if (gsk_io_is_polling_for_read (connection))
    gsk_io_trap_readable (transport,
                          handle_transport_readable,
                          handle_transport_read_shutdown,
                          connection,
                          NULL);
  if (gsk_io_is_polling_for_write (connection))
    gsk_io_trap_writable (transport,
                          handle_transport_writable,
                          handle_transport_write_shutdown,
                          connection,
                          NULL);
  connection->transport_on_error_signal_handler
    = g_signal_connect (transport,
                        "on-error",
                        G_CALLBACK (handle_transport_error),
                        connection);
}

/* GskIO set_poll_read implementation.
 *
 * With a live transport, installs (should_poll) or removes (!should_poll)
 * the read trap on it.  Without a transport this is a no-op here;
 * retry_connection() installs the trap on the next transport if the
 * connection is polling for reads at that point.
 */
static void gsk_persistent_connection_set_poll_read (GskIO    *io,
                                                     gboolean  should_poll)
{
  GskPersistentConnection *connection = GSK_PERSISTENT_CONNECTION (io);
  GskIO *transport_io;

  if (!connection->transport)
    return;

  transport_io = GSK_IO (connection->transport);
  if (!should_poll)
    {
      gsk_io_untrap_readable (transport_io);
      return;
    }

  gsk_io_trap_readable (transport_io,
                        handle_transport_readable,
                        handle_transport_read_shutdown,
                        connection,
                        NULL);
}

/* GskIO set_poll_write implementation.
 *
 * With a live transport, installs (should_poll) or removes (!should_poll)
 * the write trap on it.  Without a transport this is a no-op here;
 * retry_connection() installs the trap on the next transport if the
 * connection is polling for writes at that point.
 */
static void gsk_persistent_connection_set_poll_write (GskIO    *io,
                                                      gboolean  should_poll)
{
  GskPersistentConnection *connection = GSK_PERSISTENT_CONNECTION (io);
  GskIO *transport_io;

  if (!connection->transport)
    return;

  transport_io = GSK_IO (connection->transport);
  if (!should_poll)
    {
      gsk_io_untrap_writable (transport_io);
      return;
    }

  gsk_io_trap_writable (transport_io,
                        handle_transport_writable,
                        handle_transport_write_shutdown,
                        connection,
                        NULL);
}

/* Retry-timer callback: the wait is over, attempt to connect again.
 *
 * `data` is the GskPersistentConnection that armed the timer.  The stored
 * timer pointer is cleared first, then a new connection attempt is made to
 * the stored address (or a warning is logged if there is none).
 * Always returns FALSE.
 */
static gboolean
handle_retry_timeout_expired (gpointer data)
{
  GskPersistentConnection *connection = GSK_PERSISTENT_CONNECTION (data);

  connection->retry_timeout_source = NULL;

  if (connection->address == NULL)
    {
      g_warning ("no address???");
      return FALSE;
    }

  retry_connection (connection, connection->address);
  return FALSE;
}

/* Create a persistent connection to `address` and start connecting at once.
 *
 * address:          where to connect; must not be NULL.  The new object
 *                   takes its own reference, so the caller keeps theirs.
 * retry_timeout_ms: delay, in milliseconds, used by setup_timeout() before
 *                   each reconnect attempt.
 *
 * Returns the new object as a GskStream, or NULL (after a GLib critical
 * warning) if `address` is NULL.  A failed first connection attempt does not
 * make this return NULL: retry_connection() schedules a retry instead.
 */
GskStream *
gsk_persistent_connection_new (GskSocketAddress *address,
                               guint             retry_timeout_ms)
{
  GskPersistentConnection *connection;

  /* Reject a NULL address up front, before any object is allocated;
   * otherwise it would reach g_object_ref() and retry_connection(). */
  g_return_val_if_fail (address != NULL, NULL);

  connection = g_object_new (GSK_TYPE_PERSISTENT_CONNECTION, NULL);
  connection->address = g_object_ref (address);
  connection->retry_timeout_ms = retry_timeout_ms;
  retry_connection (connection, address);
  return GSK_STREAM (connection);
}

/* Convenience constructor: build a symbolic IPv4 address from `host` and
 * `port`, then create a persistent connection to it with the given retry
 * delay (milliseconds).  Returns the new stream.
 */
GskStream *
gsk_persistent_connection_new_lookup (const char *host,
                                      guint       port,
                                      guint       retry_timeout_ms)
{
  GskSocketAddress *address;
  GskStream *stream;

  address = gsk_socket_address_symbolic_ipv4_new (host, port);
  stream = gsk_persistent_connection_new (address, retry_timeout_ms);

  /* gsk_persistent_connection_new() took its own reference on the address,
   * so ours can be dropped. */
  g_object_unref (address);

  return stream;
}

/* Drop the current connection (if any) and reconnect after `retry_wait_ms`.
 *
 * If a transport exists it is torn down and "handle-disconnected" is
 * emitted.  Any retry timer already pending is cancelled and replaced by a
 * new timer using `retry_wait_ms` instead of the connection's stored retry
 * timeout.  The connection ends up in the WAITING state.
 */
void gsk_persistent_connection_restart (GskPersistentConnection *connection,
                                        guint                    retry_wait_ms)
{
  GskSource *pending;

  if (connection->transport != NULL)
    {
      shutdown_transport (connection);
      g_signal_emit (connection, handle_disconnected_signal_id, 0);
    }

  /* Read the field only after the signal emission above has finished. */
  pending = connection->retry_timeout_source;
  if (pending != NULL)
    {
      gsk_source_remove (pending);
      connection->retry_timeout_source = NULL;
    }

  connection->retry_timeout_source =
    gsk_main_loop_add_timer (gsk_main_loop_default (),
                             handle_retry_timeout_expired,
                             connection,
                             NULL,
                             retry_wait_ms,
                             -1);
  connection->state = GSK_PERSISTENT_CONNECTION_WAITING;
}


/* syntax highlighted by Code2HTML, v. 0.9.1 */