#include <errno.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include "gskpacketqueuefd.h"
#include "gskmacros.h"
#include "gskutils.h"
#include "gsktypes.h"
#include "gskfork.h"
#include "gskerrno.h"
#include "config.h"

enum
{
  PROP_0,
  PROP_FILE_DESCRIPTOR
};

#define USE_GLIB_MAIN_LOOP	GSK_PACKET_QUEUE_FD_USE_GLIB_MAIN_LOOP


static GObjectClass *parent_class = NULL;

/* This number is due to the header format of UDP.
   The header (see RFC 768) only allows a 16-bit length.
   As of Dec 2003, UDP is the least-updated protocol,
   RFC 768 is from 1980, so hardcoding this constant is probably ok.

   On the other hand, eventually fat pipes may cause demand for
   yet bigger packets. */
#define MAX_UDP_PACKET_SIZE   ((1 << 16) - 1)

/* --- queue methods --- */
static gboolean
gsk_packet_queue_fd_bind (GskPacketQueue    *queue,
			  GskSocketAddress  *addr,
		          GError           **error)
{
  GskPacketQueueFd *queue_fd = GSK_PACKET_QUEUE_FD (queue);
  socklen_t native_len = gsk_socket_address_sizeof_native (addr);
  gpointer native = alloca (native_len);

  if (!gsk_socket_address_to_native (addr, native, error))
    return FALSE;
  if (bind (queue_fd->fd, native, native_len) < 0)
    {
      int e = errno;
      g_set_error (error, GSK_G_ERROR_DOMAIN,
		   gsk_error_code_from_errno (e),
		   _("PacketQueueFd: bind failed: %s"),
		   g_strerror (e));
      return FALSE;
    }

  if (queue_fd->bound_address != NULL)
    g_object_unref (queue_fd->bound_address);
  queue_fd->bound_address = g_object_ref (addr);

  gsk_packet_queue_mark_allow_no_address (queue);
  return TRUE;
}

static GskPacket *
gsk_packet_queue_fd_read (GskPacketQueue    *queue,
			  gboolean           save_address,
		          GError           **error)
{
  GskPacketQueueFd *queue_fd = GSK_PACKET_QUEUE_FD (queue);
  char *tmp = alloca (MAX_UDP_PACKET_SIZE);
  int fd = queue_fd->fd;
  struct sockaddr addr;
  socklen_t addrlen = sizeof (addr);
  int rv;
  GskPacket *packet;
  gpointer data;
  if (save_address)
    rv = recvfrom (fd, tmp, MAX_UDP_PACKET_SIZE, 0, &addr, &addrlen);
  else
    /* according to bsd man page, we should use 'recvfrom' instead of
       'recv', even in this case. 

       XXX: provide more accurate citation.  */
    rv = recvfrom (fd, tmp, MAX_UDP_PACKET_SIZE, 0, NULL, NULL);
  if (rv < 0)
    {
      int e = errno;
      if (!gsk_errno_is_ignorable (e))
	g_set_error (error, GSK_G_ERROR_DOMAIN,
		     gsk_error_code_from_errno (e),
		     _("packet-queue-read failed: %s"),
		     g_strerror (e));
      return NULL;
    }
  data = g_memdup (tmp, rv);
  packet = gsk_packet_new (data, rv, (GskPacketDestroyFunc) g_free, data);
  if (save_address)
    {
      packet->src_address = gsk_socket_address_from_native (&addr, addrlen);
      if (packet->src_address == NULL)
	{
	  g_set_error (error, GSK_G_ERROR_DOMAIN,
		       GSK_ERROR_FOREIGN_ADDRESS,
		       _("received packet had invalid or unknown address"));
	  gsk_packet_unref (packet);
	  return NULL;
	}
    }
  if (queue_fd->bound_address != NULL)
    packet->dst_address = g_object_ref (queue_fd->bound_address);
  return packet;
}

static gboolean
gsk_packet_queue_fd_write (GskPacketQueue    *queue,
		           GskPacket         *out,
		           GError           **error)
{
  GskPacketQueueFd *queue_fd = GSK_PACKET_QUEUE_FD (queue);
  guint native_size;
  gpointer native_addr;
  gssize rv;
  int fd = queue_fd->fd;
  if (out->dst_address != NULL)
    {
      native_size = gsk_socket_address_sizeof_native (out->dst_address);
      native_addr = alloca (native_size);
      if (!gsk_socket_address_to_native (out->dst_address, native_addr, error))
	return FALSE;
    }
  else
    {
      native_size = 0;
      native_addr = NULL;
    }
  rv = sendto (fd, out->data, out->len, 0, native_addr, native_size);
  if (rv < 0)
    {
      int e = errno;
      if (!gsk_errno_is_ignorable (e))
	g_set_error (error, GSK_G_ERROR_DOMAIN,
		     gsk_error_code_from_errno (e),
		     _("packet-queue-fd-write: %s"),
		     g_strerror (e));
      return FALSE;
    }
  if ((guint) rv < out->len)
    {
      g_set_error (error, GSK_G_ERROR_DOMAIN,
		   GSK_ERROR_UNEXPECTED_PARTIAL_WRITE,
		   _("sendto did not get all the bytes of the packet sent"));
      return FALSE;
    }
  return TRUE;
}

/* --- io methods --- */
/* The following functions are defined twice:
      add_poll()
      remove_poll()
      set_poll_read()
      set_poll_write()
   once for glib and once for gsk.
 */
     
#if USE_GLIB_MAIN_LOOP
typedef struct _GskPacketQueueFdSource GskPacketQueueFdSource;
struct _GskPacketQueueFdSource
{
  GSource base;
  GskPacketQueueFd *packet_queue_fd;
};

static gboolean
gsk_packet_queue_fd_source_prepare (GSource    *source,
			            gint       *timeout)
{
  return FALSE;
}

static gboolean
gsk_packet_queue_fd_source_check    (GSource    *source)
{
  GskPacketQueueFdSource *fd_source = (GskPacketQueueFdSource *) source;
  return fd_source->packet_queue_fd->poll_fd.revents != 0;
}

static gboolean
gsk_packet_queue_fd_source_dispatch (GSource    *source,
			             GSourceFunc callback,
			             gpointer    user_data)
{
  GskPacketQueueFdSource *fd_source = (GskPacketQueueFdSource *) source;
  GskPacketQueueFd *packet_queue_fd = fd_source->packet_queue_fd;
  guint events = fd_source->packet_queue_fd->poll_fd.revents;
  if ((events & (G_IO_IN|G_IO_HUP)) != 0)
    gsk_io_notify_ready_to_read (GSK_IO (packet_queue_fd));
  if ((events & G_IO_OUT) == G_IO_OUT)
    gsk_io_notify_ready_to_write (GSK_IO (packet_queue_fd));
  return TRUE;
}

static GSourceFuncs gsk_packet_queue_fd_source_funcs =
{
  gsk_packet_queue_fd_source_prepare,
  gsk_packet_queue_fd_source_check,
  gsk_packet_queue_fd_source_dispatch,
  NULL,					/* finalize */
  NULL,					/* closure-callback (reserved) */
  NULL					/* closure-marshal (reserved) */
};

static void
add_poll (GskPacketQueueFd *packet_queue_fd)
{
  GskPacketQueueFdSource *fd_source;
  packet_queue_fd->poll_fd.fd = packet_queue_fd->fd;
  packet_queue_fd->source = g_source_new (&gsk_packet_queue_fd_source_funcs,
				          sizeof (GskPacketQueueFdSource));
  fd_source = (GskPacketQueueFdSource *) packet_queue_fd->source;
  fd_source->packet_queue_fd = packet_queue_fd;
  g_source_add_poll (packet_queue_fd->source, &packet_queue_fd->poll_fd);
  g_source_attach (packet_queue_fd->source, g_main_context_default ());
  packet_queue_fd->poll_fd.events = 0;
}

static void
remove_poll (GskPacketQueueFd *packet_queue_fd)
{
  if (packet_queue_fd->source != NULL)
    {
      g_source_destroy (packet_queue_fd->source);
      g_source_unref (packet_queue_fd->source);
      packet_queue_fd->source = NULL;
    }
}

static void
gsk_packet_queue_fd_set_poll_read   (GskIO         *io,
			             gboolean       do_poll)
{
  GskPacketQueueFd *packet_queue_fd = GSK_PACKET_QUEUE_FD (io);
  if (do_poll)
    packet_queue_fd->poll_fd.events |= G_IO_IN;
  else
    packet_queue_fd->poll_fd.events &= ~G_IO_IN;
}

static void
gsk_packet_queue_fd_set_poll_write  (GskIO         *io,
			             gboolean       do_poll)
{
  GskPacketQueueFd *packet_queue_fd = GSK_PACKET_QUEUE_FD (io);
  if (do_poll)
    packet_queue_fd->poll_fd.events |= G_IO_OUT;
  else
    packet_queue_fd->poll_fd.events &= ~G_IO_OUT;
}
#else	/* !USE_GLIB_MAIN_LOOP */
static gboolean
handle_io_event (int fd, GIOCondition events, gpointer data)
{
  GskPacketQueueFd *packet_queue_fd = GSK_PACKET_QUEUE_FD (data);
  if ((events & (G_IO_IN|G_IO_HUP)) != 0)
    gsk_io_notify_ready_to_read (GSK_IO (packet_queue_fd));
  if ((events & G_IO_OUT) == G_IO_OUT)
    gsk_io_notify_ready_to_write (GSK_IO (packet_queue_fd));
  return TRUE;
}

static void
add_poll (GskPacketQueueFd *packet_queue_fd)
{
  packet_queue_fd->source = gsk_main_loop_add_io (gsk_main_loop_default (),
						  packet_queue_fd->fd,
						  0,	/* no initial events */
						  handle_io_event,
						  packet_queue_fd,
						  NULL);
}
static void
remove_poll (GskPacketQueueFd *packet_queue_fd)
{
  if (packet_queue_fd->source != NULL)
    {
      gsk_source_remove (packet_queue_fd->source);
      packet_queue_fd->source = NULL;
    }
}
static void
gsk_packet_queue_fd_set_poll_read   (GskIO         *io,
			             gboolean       do_poll)
{
  GskPacketQueueFd *packet_queue_fd = GSK_PACKET_QUEUE_FD (io);
  if (do_poll)
    gsk_source_add_io_events (packet_queue_fd->source, G_IO_IN);
  else
    gsk_source_remove_io_events (packet_queue_fd->source, G_IO_IN);
}
static void
gsk_packet_queue_fd_set_poll_write  (GskIO         *io,
			             gboolean       do_poll)
{
  GskPacketQueueFd *packet_queue_fd = GSK_PACKET_QUEUE_FD (io);
  if (do_poll)
    gsk_source_add_io_events (packet_queue_fd->source, G_IO_OUT);
  else
    gsk_source_remove_io_events (packet_queue_fd->source, G_IO_OUT);
}
#endif

static gboolean
gsk_packet_queue_fd_shutdown_read   (GskIO         *io,
				     GError       **error)
{
  GskPacketQueueFd *packet_queue_fd = GSK_PACKET_QUEUE_FD (io);
  if (shutdown (packet_queue_fd->fd, SHUT_RD) < 0)
    {
      int e = errno;
      g_set_error (error, GSK_G_ERROR_DOMAIN,
		   gsk_error_code_from_errno (e),
		   "error shutting down fd %d for reading: %s",
		   packet_queue_fd->fd,
		   g_strerror (e));
      return FALSE;
    }
  else
    {
      return TRUE;
    }
}

static gboolean
gsk_packet_queue_fd_shutdown_write  (GskIO         *io,
				     GError       **error)
{
  GskPacketQueueFd *packet_queue_fd = GSK_PACKET_QUEUE_FD (io);
  if (shutdown (packet_queue_fd->fd, SHUT_WR) < 0)
    {
      int e = errno;
      g_set_error (error, GSK_G_ERROR_DOMAIN,
		   gsk_error_code_from_errno (e),
		   "error shutting down fd %d for writing: %s",
		   packet_queue_fd->fd,
		   g_strerror (e));
      return FALSE;
    }
  else
    {
      return TRUE;
    }
}

static void
gsk_packet_queue_fd_close (GskIO         *io)
{
  GskPacketQueueFd *packet_queue_fd = GSK_PACKET_QUEUE_FD (io);
  remove_poll (packet_queue_fd);
  if (packet_queue_fd->fd >= 0)
    {
      close (packet_queue_fd->fd);
      gsk_fork_remove_cleanup_fd (packet_queue_fd->fd);
      packet_queue_fd->fd = -1;
    }
}

/* --- arguments --- */
static void
gsk_packet_queue_fd_get_property (GObject        *object,
			          guint           property_id,
			          GValue         *value,
			          GParamSpec     *pspec)
{
  switch (property_id)
    {
    case PROP_FILE_DESCRIPTOR:
      g_value_set_int (value, GSK_PACKET_QUEUE_FD (object)->fd);
      break;
    }
}

static void
gsk_packet_queue_fd_set_property (GObject        *object,
			          guint           property_id,
			          const GValue   *value,
			          GParamSpec     *pspec)
{
  switch (property_id)
    {
    case PROP_FILE_DESCRIPTOR:
      {
	int fd = g_value_get_int (value);
	GskPacketQueueFd *packet_queue_fd = GSK_PACKET_QUEUE_FD (object);
	if (packet_queue_fd->fd >= 0)
	  gsk_fork_remove_cleanup_fd (fd);
	if (fd >= 0)
	  gsk_fork_add_cleanup_fd (fd);
	packet_queue_fd->fd = fd;
	break;
      }
    }
}

static gboolean
gsk_packet_queue_fd_open (GskIO     *io,
		          GError   **error)
{
  GskPacketQueue *queue = GSK_PACKET_QUEUE (io);
  GskPacketQueueFd *packet_queue_fd = GSK_PACKET_QUEUE_FD (io);
  if (packet_queue_fd->fd < 0)
    {
      g_set_error (error, GSK_G_ERROR_DOMAIN, GSK_ERROR_OPEN_FAILED,
		   _("must specify valid file-descriptor"));
      return FALSE;
    }

  g_return_val_if_fail (packet_queue_fd->source == NULL, FALSE);
  add_poll (packet_queue_fd);
  GSK_HOOK_SET_FLAG (GSK_IO_WRITE_HOOK (queue), IS_AVAILABLE);
  GSK_HOOK_SET_FLAG (GSK_IO_READ_HOOK (queue), IS_AVAILABLE);
  return TRUE;
}

/* --- functions --- */
static void
gsk_packet_queue_fd_init (GskPacketQueueFd *packet_queue_fd)
{
  GskPacketQueue *queue = GSK_PACKET_QUEUE (packet_queue_fd);
  packet_queue_fd->fd = -1;
  gsk_packet_queue_mark_misses_packets (queue);
  gsk_packet_queue_mark_allow_address (queue);
}


static void
gsk_packet_queue_fd_class_init (GskPacketQueueFdClass *class)
{
  GskPacketQueueClass *queue_class = GSK_PACKET_QUEUE_CLASS (class);
  GskIOClass *io_class = GSK_IO_CLASS (class);
  GObjectClass *object_class = G_OBJECT_CLASS (class);
  GParamSpec *pspec;

  parent_class = g_type_class_peek_parent (class);

  queue_class->bind = gsk_packet_queue_fd_bind;
  queue_class->read = gsk_packet_queue_fd_read;
  queue_class->write = gsk_packet_queue_fd_write;
  io_class->set_poll_read = gsk_packet_queue_fd_set_poll_read;
  io_class->set_poll_write = gsk_packet_queue_fd_set_poll_write;
  io_class->shutdown_read = gsk_packet_queue_fd_shutdown_read;
  io_class->shutdown_write = gsk_packet_queue_fd_shutdown_write;
  io_class->open = gsk_packet_queue_fd_open;
  io_class->close = gsk_packet_queue_fd_close;
  object_class->get_property = gsk_packet_queue_fd_get_property;
  object_class->set_property = gsk_packet_queue_fd_set_property;
  pspec = gsk_param_spec_fd ("file-descriptor",
			     _("File Descriptor"),
			     _("for reading and/or writing"),
                             G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE);
  g_object_class_install_property (object_class, PROP_FILE_DESCRIPTOR, pspec);
}

GType gsk_packet_queue_fd_get_type()
{
  static GType packet_queue_fd_type = 0;
  if (!packet_queue_fd_type)
    {
      static const GTypeInfo packet_queue_fd_info =
      {
	sizeof(GskPacketQueueFdClass),
	(GBaseInitFunc) NULL,
	(GBaseFinalizeFunc) NULL,
	(GClassInitFunc) gsk_packet_queue_fd_class_init,
	NULL,		/* class_finalize */
	NULL,		/* class_data */
	sizeof (GskPacketQueueFd),
	0,		/* n_preallocs */
	(GInstanceInitFunc) gsk_packet_queue_fd_init,
	NULL		/* value_table */
      };
      packet_queue_fd_type = g_type_register_static (GSK_TYPE_PACKET_QUEUE,
                                                     "GskPacketQueueFd",
						     &packet_queue_fd_info,
						     0);
    }
  return packet_queue_fd_type;
}

/* --- public constructors --- */
/**
 * gsk_packet_queue_fd_new:
 * @fd: the datagram socket file-descriptor.
 *
 * Create a new #GskPacketQueue from an already opened
 * file-descriptor.
 *
 * returns: the new packet-queue.
 */
GskPacketQueue *
gsk_packet_queue_fd_new           (int  fd)
{
  return g_object_new (GSK_TYPE_PACKET_QUEUE_FD, "file-descriptor", fd, NULL);
}

/**
 * gsk_packet_queue_fd_new_by_family:
 * @addr_family: the system-specific address family.
 * @error: optional pointer to an error to set if things go wrong.
 *
 * Create a new Packet Queue using a newly opened datagram
 * socket of a given address family.  The address family
 * is the sequence of AF_ defines in the header &lt;sys/socket.h&gt;
 * on most unices.
 *
 * The address family of a #GskSocketAddress may be found
 * using gsk_socket_address_protocol_family().
 *
 * returns: the new packet-queue, or NULL if there is a problem creating the socket.
 */
GskPacketQueue *
gsk_packet_queue_fd_new_by_family (int  addr_family,
				   GError **error)
{
  int fd;
retry:
  fd = socket (addr_family, SOCK_DGRAM, 0);
  if (fd < 0)
    {
      if (gsk_errno_is_ignorable (errno))
        goto retry;
      gsk_errno_fd_creation_failed ();
      g_set_error (error, GSK_G_ERROR_DOMAIN,
		   GSK_ERROR_OPEN_FAILED,
		   _("error creating socket: %s"),
		   g_strerror (errno));
      return NULL;
    }
  gsk_fd_set_close_on_exec (fd, TRUE);

  return gsk_packet_queue_fd_new (fd);
}

/**
 * gsk_packet_queue_fd_new_bound:
 * @address: the address to bind to.
 * @error: optional pointer to an error to set if things go wrong.
 * 
 * Create a new Packet Queue using a newly opened datagram
 * socket which is bound to a given address.
 *
 * Note that socket address space for TCP and UDP is
 * separate, so it's allowed (and sometimes encouraged)
 * to bind to the same port for both a packet queue,
 * and a stream-listener.
 *
 * returns: the new packet-queue, or NULL if there is a problem creating the socket or binding.
 */
GskPacketQueue *
gsk_packet_queue_fd_new_bound     (GskSocketAddress *address,
				   GError          **error)
{
  int family = gsk_socket_address_protocol_family (address);
  GskPacketQueue *queue = gsk_packet_queue_fd_new_by_family (family, error);
  if (queue == NULL)
    return NULL;
  if (! gsk_packet_queue_bind (queue, address, error))
    {
      g_object_unref (queue);
      return NULL;
    }
  return queue;
}

/**
 * gsk_packet_queue_fd_set_broadcast:
 * @packet_queue_fd: the packet-queue to affect.
 * @allow_broadcast: whether to allow (TRUE) or disallow broadcast sends
 * and receives.  The default for a new datagram socket is to
 * disallow broadcast packets.
 * @error: optional address of an error to set if things go wrong.
 *
 * Changes the operating-system-level flag of whether
 * sends and receives of broadcast packets are allowed
 * on datagram sockets.
 *
 * returns: whether the operation was successful.
 */
gboolean gsk_packet_queue_fd_set_broadcast (GskPacketQueueFd *packet_queue_fd,
					    gboolean          allow_broadcast,
					    GError          **error)
{
  int fd = packet_queue_fd->fd;
  int broadcast = allow_broadcast;
  if (setsockopt (fd, SOL_SOCKET, SO_BROADCAST, &broadcast, sizeof (int)) < 0)
    {
      g_set_error (error, GSK_G_ERROR_DOMAIN, gsk_error_code_from_errno (errno),
		   "error setting file-descriptor %d to %s broadcast packets: %s",
		   fd, allow_broadcast ? "allow" : "disallow", g_strerror (errno));
      return FALSE;
    }
  return TRUE;
}


syntax highlighted by Code2HTML, v. 0.9.1