/* Jungle Monkey
 * Copyright (C) 1999-2001  The Regents of the University of Michigan
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */

#include <stdlib.h>

#include "mtp_server.h"
#include "mtp_debug.h"

#include "util/util.h"
#include "util/elf.h"


/* Size limits used by the connection callbacks in this file:
   MAX_LINE_LEN is the limit passed to gnet_conn_readline() for each
   request line; MAX_WRITE_LEN caps how many bytes of a file are read
   and written per write callback. */
#define MAX_LINE_LEN	64000			/* Max length of line to read */
#define MAX_WRITE_LEN	1024			/* Longest write at one time */

/* **************************************** */


/* Test the server's type bits.  NOTE: both macros expand to an
   expression that uses a variable named `server' (a MtpServer*), so
   they can only be used where such a variable is in scope. */
#define IS_RENDEZVOUS  	(server->type & MTP_SERVER_TYPE_RENDEZVOUS)
#define IS_MIRROR      	(server->type & MTP_SERVER_TYPE_MIRROR)


/* **************************************** */

/* Conditional logging to stderr (only when server->verbose is set). */
static void VERBOSE (MtpServer* server, const gchar *format, ...);

/* Callback registered with gnet_server_new() for the listening server. */
static void mtp_server_func(GServer* gserver, GServerStatus status, 
			    GConn* conn, gpointer user_data);
/* Per-connection callback installed on each accepted GConn. */
static gboolean mtp_gconn_func (GConn* conn, GConnStatus status, gchar* buffer, 
				gint length, gpointer user_data);

/* Timestamp shared by the garbage collector: written by
   garbage_collect_cb() and read by garbage_collect_hrfunc(). */
static struct timeval 	timeofday;

/* Periodic timer callback and the per-troop removal predicate it uses. */
static gboolean garbage_collect_cb (gpointer user_data);
static gboolean	garbage_collect_hrfunc (gpointer key, gpointer value, gpointer user_data);




/* **************************************** */

/* Print a printf-style message to stderr, but only if the server's
   verbose flag is set.  Otherwise do nothing. */
static void
VERBOSE (MtpServer* server, const gchar *format, ...)
{
  va_list ap;

  /* Quiet servers print nothing */
  if (!server->verbose)
    return;

  va_start (ap, format);
  vfprintf (stderr, format, ap);
  va_end (ap);
}


/* **************************************** */


/* ************************************************************ */

/* Create a new MTP server.

   hostname:          name to advertise; copied if given, otherwise the
                      canonical name of the server's interface is used.
   interface:         address to listen on (may be NULL).  If it is
                      given with port 0, its port is set to
                      MTP_SERVER_PORT (the caller's object is modified).
   force_port:        passed through to gnet_server_new().
   type:              server type bits; must be non-zero.
   global_rendezvous: stored in the server's is_global flag.

   Returns the new server, or NULL if TYPE is zero or the underlying
   GServer could not be created. */
MtpServer* 
mtp_server_new (gchar* hostname, GInetAddr* interface,
		gboolean force_port, MtpServerType type, 
		gboolean global_rendezvous)
{
  MtpServer* new_server;

  g_return_val_if_fail (type, NULL);
  MTPP (1, "mtp_server_new\n");

  new_server = g_new0 (MtpServer, 1);
  new_server->type = type;
  new_server->is_global = global_rendezvous;

  /* Fall back to the default port when none was chosen */
  if (interface && gnet_inetaddr_get_port (interface) == 0)
    gnet_inetaddr_set_port (interface, MTP_SERVER_PORT);

  /* Start listening; tear everything down again if that fails */
  new_server->server = gnet_server_new (interface, force_port,
                                        mtp_server_func, new_server);
  if (!new_server->server)
    {
      mtp_server_delete (new_server);
      return NULL;
    }

  /* Use the caller's hostname if there is one, else ask the interface */
  if (!hostname)
    new_server->hostname =
      gnet_inetaddr_get_canonical_name (new_server->server->iface);
  else
    new_server->hostname = g_strdup (hostname);

  /* Rendezvous state: troops by name, plus the periodic expiry timer */
  new_server->name_to_troop = g_hash_table_new (g_str_hash, g_str_equal);
  new_server->gc_timer = g_timeout_add (MTP_EXPIRE_TIME, garbage_collect_cb,
                                        new_server);

  /* Mirror state */
  new_server->name_to_local_mirror =
    g_hash_table_new (g_str_hash, g_str_equal);
  new_server->name_to_rendezvous =
    g_hash_table_new (g_str_hash, g_str_equal);

  return new_server;
}


/* Destroy a server and everything it owns: its hostname, all of its
   connections, the troop / local-mirror / rendezvous tables (and their
   values), the garbage-collection timer and the underlying GServer.
   A NULL server is accepted and ignored.  Also safe on a partially
   constructed server, since every table is checked before use. */
void
mtp_server_delete (MtpServer* server)
{
  GSList* node;

  MTPP (1, "mtp_server_delete\n");

  if (server == NULL)
    return;
  
  g_free (server->hostname);

  /* Delete every connection, then the list that held them */
  node = server->mtp_conns;
  while (node != NULL)
    {
      mtp_conn_delete ((MtpConn*) node->data);
      node = node->next;
    }
  g_slist_free (server->mtp_conns);

  /* Troops */
  if (server->name_to_troop != NULL)
    {
      g_hash_table_foreach (server->name_to_troop,
                            mtp_troop_delete_hfunc, NULL);
      g_hash_table_destroy (server->name_to_troop);
    }

  /* Stop the periodic garbage collector */
  if (server->gc_timer != 0)
    g_source_remove (server->gc_timer);

  /* Local mirrors */
  if (server->name_to_local_mirror != NULL)
    {
      g_hash_table_foreach (server->name_to_local_mirror,
                            mtp_local_mirror_delete_hfunc, NULL);
      g_hash_table_destroy (server->name_to_local_mirror);
    }

  /* Rendezvous entries */
  if (server->name_to_rendezvous != NULL)
    {
      g_hash_table_foreach (server->name_to_rendezvous,
                            mtp_rendezvous_delete_hfunc, NULL);
      g_hash_table_destroy (server->name_to_rendezvous);
    }

  /* Listening socket */
  if (server->server != NULL)
    gnet_server_delete (server->server);

  /* Scrub the struct before freeing it */
  memset (server, 0, sizeof(*server));
  g_free (server);
}


/* Write a human-readable dump of the server state to FILE: one line
   per open connection, followed by the troops and the local mirrors
   (each printed by its own hash-table callback).  Both arguments must
   be non-NULL. */
void
mtp_server_print (MtpServer* server, FILE* file)
{
  GSList* node;

  g_return_if_fail (server);
  g_return_if_fail (file);

  /* Connections */
  fprintf (file, "MTP SERVER CONNECTIONS\n");
  node = server->mtp_conns;
  while (node != NULL)
    {
      MtpConn* mc = (MtpConn*) node->data;
      const gchar* downloading = "<none>";

      /* Show the file being sent on this connection, if any */
      if (mc->name)
        downloading = mc->name;

      fprintf (file, "\t%s:%d (downloading %s, %d)\n",
               mc->conn->hostname, mc->conn->port,
               downloading,
               mc->offset);

      node = node->next;
    }

  /* Troops */
  fprintf (file, "RENDEZVOUS SERVER TROOPS\n");
  g_hash_table_foreach (server->name_to_troop, mtp_troop_print_hfunc, file);

  /* Local mirrors */
  fprintf (file, "MIRROR SERVER FILES\n");
  g_hash_table_foreach (server->name_to_local_mirror,
                        mtp_local_mirror_print_hfunc, file);
}


/* Replace the server's type bits with TYPE.  SERVER must be non-NULL;
   nothing else about the server is touched. */
void
mtp_server_set_type (MtpServer* server, MtpServerType type)
{
  g_return_if_fail (server);

  /* Overwrite, don't merge: the old bits are discarded */
  server->type = type;
}



/* **************************************** */


/* Response writers.  Each one builds a single message and queues it
   on the connection with gnet_conn_write(). */
static void send_ok (MtpConn* mtp_conn, gchar* name);
static void send_error (MtpConn* mtp_conn, gchar* name, gchar* type);
static void send_file (MtpConn* mtp_conn, MtpLocalMirror* lm);
static void send_mirrors (MtpConn* mtp_conn, MtpTroop* troop);
static void send_info (MtpConn* mtp_conn, gchar* name, MtpTroop* troop, MtpLocalMirror* lm);



/* GServer callback: called with a newly accepted connection.  Wraps
   CONN in an MtpConn, records it in the server's connection list,
   installs mtp_gconn_func as the connection callback, and starts
   waiting for the first request line.

   USER_DATA is the MtpServer.  An error status is treated as fatal
   (assertion). */
static void
mtp_server_func (GServer* gserver, GServerStatus status, GConn* conn, gpointer user_data)
{
  MtpServer* server = (MtpServer*) user_data;
  MtpConn* new_conn;

  g_return_if_fail (server);
  g_assert (status != GNET_SERVER_STATUS_ERROR);

  MTPP (1, "mtp_server_func %d\n", status); 
  VERBOSE (server, "CONNECT: %s:%d\n", conn->hostname, conn->port);

  /* Track the connection */
  new_conn = mtp_conn_new (conn, server);
  server->mtp_conns = g_slist_prepend (server->mtp_conns, new_conn);

  /* Route all further events on this connection to our handler */
  conn->user_data = new_conn;
  conn->func = mtp_gconn_func;

  /* Wait for the first command, with an inactivity timeout */
  gnet_conn_readline (conn, NULL, MAX_LINE_LEN, 0);
  gnet_conn_timeout (conn, MTP_SERVER_TIMEOUT);
}



static gboolean
mtp_gconn_func (GConn* conn, GConnStatus status, gchar* buffer, gint length, gpointer user_data)
{
  MtpConn* mtp_conn = (MtpConn*) user_data;
  MtpServer* server = mtp_conn->server;

  g_return_val_if_fail (mtp_conn, FALSE);
  g_return_val_if_fail (server, FALSE);


  switch (status)
    {
    case GNET_CONN_STATUS_READ:
      {
	GList* as = NULL;
	ElfNode* a = NULL;
	gchar* name = NULL;

	/* Reset timer */
	gnet_conn_timeout (conn, MTP_SERVER_TIMEOUT);

	/* Read command */
	as = elf_read_list (buffer, length);
	if (!as)
	  {
	    MTPP (1, "Could not parse buffer: %s\n", buffer);
	    goto send_error;
	  }

	a = (ElfNode*) as->data;
	g_return_val_if_fail (a && a->name, FALSE);

	name = elf_get_attribute (a, "name");

	if (name && !strcmp (a->name, "GET_MIRRORS"))
	  {
	    MtpTroop* troop;

	    VERBOSE (server, "GET_MIRRORS %s (%s:%d)\n", 
		     name, conn->hostname, conn->port);

	    troop = 
	      (MtpTroop*) g_hash_table_lookup (server->name_to_troop, name);
	    /* Send mirrors if we have this troop */
	    if (troop)
	      {
		g_return_val_if_fail (troop->mirrors->len, FALSE);

		++troop->num_get_mirrors;
		send_mirrors (mtp_conn, troop);
	      }
	    else
	      send_error (mtp_conn, name, "file not found");

	  }
	else if (IS_RENDEZVOUS && name && !strcmp (a->name, "MIRROR"))
	  {
	    gchar* port_str;
	    gint port;
	    MtpTroop* troop;

	    /* Get port of host */
	    port_str = elf_get_attribute (a, "port");
	    if (!port_str)
	      goto send_error;
	    port = atoi (port_str);
	    if (port <= 0)
	      goto send_error;

	    VERBOSE (server, "MIRROR %s, %d (%s:%d)\n", 
		     name, port, conn->hostname, conn->port);

	    troop = (MtpTroop*) g_hash_table_lookup (server->name_to_troop, name);
	    /* If there is no such troop, create a troop only if we're
               acting as a "global" rendezvous. */
	    if (!troop && server->is_global)
	      {
		troop = mtp_troop_new (name);
		g_hash_table_insert (server->name_to_troop, troop->name, troop);
	      }

	    /* Update the information about this mirror.  This will
               create a new MtpMirror if necessary. */
	    if (troop)
	      {
		mtp_troop_update (troop, conn->hostname, port, FALSE);
		send_ok (mtp_conn, name);
	      }
	    else
	      send_error (mtp_conn, name, "file not found");
	  }
	else if (IS_RENDEZVOUS && name && !strcmp (a->name, "UNMIRROR"))
	  {
	    gchar* port_str;
	    gint port;
	    MtpTroop* troop;

	    /* Get port of host */
	    port_str = elf_get_attribute (a, "port");
	    if (!port_str)
	      goto send_error;
	    port = atoi (port_str);
	    if (port <= 0)
	      goto send_error;

	    VERBOSE (server, "UNMIRROR %s, %d (%s:%d)\n", 
		     name, port, conn->hostname, conn->port);

	    /* Get troop */
	    troop = 
	      (MtpTroop*) g_hash_table_lookup (server->name_to_troop, name);
	    if (troop)
	      {
		/* Remove mirror from troop */
		mtp_troop_remove (troop, conn->hostname, port);

		/* Delete troop if no mirrors left */
		if (!troop->mirrors->len)
		  {
		    g_hash_table_remove (server->name_to_troop, name);
		    mtp_troop_delete (troop);
		  }

		send_ok (mtp_conn, name);
	      }
	    else
	      send_error (mtp_conn, name, NULL);
	  }
	else if (IS_MIRROR && name && !strcmp (a->name, "GET"))
	  {
	    MtpLocalMirror* lm;

	    VERBOSE (server, "GET %s (%s:%d)\n", 
		     name, conn->hostname, conn->port);

	    /* TODO: Add offset */

	    lm = (MtpLocalMirror*) 
	      g_hash_table_lookup (server->name_to_local_mirror, name);

	    /* Send file if we are in fact mirroring it */
	    if (lm)
	      {
		++lm->num_downloads;
		send_file (mtp_conn, lm);
	      }
	    else
	      send_error (mtp_conn, name, "file not found");

	  }
	else if (!strcmp (a->name, "GET_INFO"))
	  {
	    MtpLocalMirror* lm = NULL;
	    MtpTroop* troop = NULL;

	    VERBOSE (server, "GET_INFO %s (%s, %d)\n", 
		     name, conn->hostname, conn->port);

	    /* Get local mirror and troop (if name given) */
	    if (name && IS_MIRROR)
	      lm = (MtpLocalMirror*)
		g_hash_table_lookup (server->name_to_local_mirror, name);
	    if (name && IS_RENDEZVOUS)
	      troop = (MtpTroop*) 
		g_hash_table_lookup (server->name_to_troop, name);

	    /* Send info if no name, or there's a mirror or troop */
	    if (!name || (lm || troop))
	      send_info (mtp_conn, name, troop, lm);
	    else /* name && no mirror or troop */
	      send_error (mtp_conn, name, "file not found");
	  }
	else
	  {
	  send_error:
	    MTPP (1, "ERROR: message=\"%s\" name=\"%s\" (%s, %d)\n", 
		  (a && a->name)? a->name : "<null>",
		  name ? name : "<null>", 
		  conn->hostname, conn->port);

	    send_error (mtp_conn, NULL, NULL);
	  }

	if (as)
	  elf_delete_list(as);

	/* Don't read next line until we write a response */
	return FALSE;

	break;
      }

    case GNET_CONN_STATUS_WRITE:
      {
	MtpLocalMirror* lm;
	guint len;
	gchar* buf;
	gint rv;

	/* Delete the buffer if we didn't just send some file. */
	if (!mtp_conn->offset)
	  g_free (buffer);

	/* Otherwise, check if we're done sending the file. */
	else if (mtp_conn->offset == mtp_conn->length)
	  {
	    /* Set TOS back to normal */
	    gnet_tcp_socket_set_tos (mtp_conn->conn->socket, GNET_TOS_NONE);

	    /* Reset stuff */
	    g_free (mtp_conn->name);
	    mtp_conn->name = NULL;
	    mtp_conn->length = 0;
	    mtp_conn->offset = 0;
	  }

	/* Read the next line and break if is no file to send.  We may
	   have just finished sending the file.  */
	if (!mtp_conn->name)
	  {
	    gnet_conn_readline (conn, NULL, MAX_LINE_LEN, 0);
	    gnet_conn_timeout (conn, MTP_SERVER_TIMEOUT);

	    break;
	  }
	/* Otherwise, we have some file to send */

	/* Get the local mirror. */
	lm = (MtpLocalMirror*) 
	  g_hash_table_lookup (server->name_to_local_mirror, 
			       mtp_conn->name);

	/* If the lm is gone, or the length changed, kill the
           connection.  This should not happen. */
	if (!lm || lm->length != mtp_conn->length)
	  goto error;

	/* Calculate how much to send */
	len = MIN (MAX_WRITE_LEN, mtp_conn->length - mtp_conn->offset);
	g_return_val_if_fail (len > 0, FALSE);
	buf = (gchar*) g_malloc (len);

	/* Upcall to read some file.  This might fail. */
	rv = (lm->read_func)(lm, mtp_conn->offset, buf, len, lm->user_data);
	if (rv == len)
	  {
	    gnet_conn_timeout (conn, MTP_SERVER_TIMEOUT);
	    gnet_conn_write (conn, buf, rv, 0);

	    mtp_conn->offset += rv;
	  }
	else
	  {
	    /* If this fails, the upper level may delete the local
	       mirror (maybe the file moved).  It should not delete
	       the server or connection, so we should be ok. */
	    g_free (buf);
	    goto error;
	  }

	break;
      }

    error:
    case GNET_CONN_STATUS_CONNECT:
    case GNET_CONN_STATUS_CLOSE:
    case GNET_CONN_STATUS_TIMEOUT:
    case GNET_CONN_STATUS_ERROR:
      {
	VERBOSE (server, "DISCONNECT: %s:%d\n", 
		 conn->hostname, conn->port);

	server->mtp_conns = g_slist_remove (server->mtp_conns, mtp_conn);
	mtp_conn_delete (mtp_conn);
	break;
      }
    }

  return FALSE;
}


/* Queue an "OK" message carrying NAME on the connection and restart
   its timeout.  Both arguments must be non-NULL. */
static void
send_ok (MtpConn* mtp_conn, gchar* name)
{
  ElfNode* msg;
  gchar* out = NULL;
  gint out_len = 0;

  g_return_if_fail (mtp_conn);
  g_return_if_fail (name);

  /* Build and serialize the message; the node itself is then done */
  msg = elf_new ("OK");
  elf_set_attribute (msg, "name", name);
  elf_write (msg, &out, &out_len);
  elf_delete (msg);

  /* Queue the serialized bytes */
  gnet_conn_timeout (mtp_conn->conn, MTP_SERVER_TIMEOUT);
  gnet_conn_write (mtp_conn->conn, out, out_len, 0);
}


/* Queue an "ERROR" message on the connection and restart its timeout.
   NAME and TYPE are optional; each is added as an attribute only when
   non-NULL.  MTP_CONN must be non-NULL. */
static void
send_error (MtpConn* mtp_conn, gchar* name, gchar* type)
{
  ElfNode* msg;
  gchar* out = NULL;
  gint out_len = 0;

  g_return_if_fail (mtp_conn);

  msg = elf_new ("ERROR");

  /* Optional attributes */
  if (name != NULL)
    elf_set_attribute (msg, "name", name);
  if (type != NULL)
    elf_set_attribute (msg, "type", type);

  elf_write (msg, &out, &out_len);
  elf_delete (msg);

  /* Queue the serialized bytes */
  gnet_conn_timeout (mtp_conn->conn, MTP_SERVER_TIMEOUT);
  gnet_conn_write (mtp_conn->conn, out, out_len, 0);
}


/* Start sending a local mirror's file on the connection.

   Queues a "FILE" message with the file's name and length.  If the
   file is non-empty, also switches the socket to low-cost TOS and
   records name/offset/length on the connection, so that the write
   callback streams the contents afterwards.

   The connection must not already have a file in progress. */
static void
send_file (MtpConn* mtp_conn, MtpLocalMirror* lm)
{
  ElfNode* msg;
  gchar len_str[32];
  gchar* out = NULL;
  gint out_len = 0;

  g_return_if_fail (mtp_conn);
  g_return_if_fail (!mtp_conn->name);
  g_return_if_fail (lm);

  /* Create FILE message */
  g_snprintf (len_str, sizeof(len_str), "%d", lm->length);
  msg = elf_new ("FILE");
  elf_set_attribute (msg, "name", lm->name);
  elf_set_attribute (msg, "length", len_str);
  elf_write (msg, &out, &out_len);
  elf_delete (msg);

  gnet_conn_timeout (mtp_conn->conn, MTP_SERVER_TIMEOUT);
  gnet_conn_write (mtp_conn->conn, out, out_len, 0);

  /* An empty file has no body to follow the header */
  if (!lm->length)
    return;

  /* Set TOS on socket */
  gnet_tcp_socket_set_tos (mtp_conn->conn->socket, GNET_TOS_LOWCOST);

  /* Save state of write; the write callback picks up from here */
  mtp_conn->name = g_strdup (lm->name);
  mtp_conn->offset = 0;
  mtp_conn->length = lm->length;
}


/* Queue a "MIRRORS" message listing up to MTP_NUM_MIRRORS of the
   troop's mirrors, serialized as "hostname0,port0;hostname1,port1;...".

   Mirrors are picked round robin starting at troop->next_mirror, which
   is advanced as a side effect; each picked mirror's num_redirects
   counter is incremented.  Does nothing if the troop has no mirrors or
   next_mirror is out of range. */
static void
send_mirrors (MtpConn* mtp_conn, MtpTroop* troop)
{
  guint count, start, k;
  gchar** entries;
  gchar* joined;
  ElfNode* msg;
  gchar* out = NULL;
  gint out_len = 0;

  /* Choose NUM_MIRRORS mirrors round robin */
  count = MIN (MTP_NUM_MIRRORS, troop->mirrors->len);
  g_return_if_fail (count);

  /* Remember where this round started */
  start = troop->next_mirror;
  g_return_if_fail (start < troop->mirrors->len);

  /* Pick the mirrors and serialize each as "hostname,port".  The
     array is NULL-terminated for g_strjoinv. */
  entries = g_new0 (gchar*, count + 1);
  for (k = 0; k < count; ++k)
    {
      MtpMirror* picked =
        (MtpMirror*) g_ptr_array_index (troop->mirrors, troop->next_mirror);

      ++picked->num_redirects;
      entries[k] = g_strdup_printf ("%s,%d", picked->hostname, picked->port);

      /* Advance with wrap-around */
      ++troop->next_mirror;
      if (troop->next_mirror == troop->mirrors->len)
        troop->next_mirror = 0;
    }

  /* If the round wrapped back to its starting point, step one further
     so the next request does not get the same mirrors in the same
     order (which would put the most load on the first mirror in the
     list).  Pointless with a single mirror.

     This still repeats the same sets when the number of mirrors is a
     multiple of MTP_NUM_MIRRORS other than MTP_NUM_MIRRORS itself.
     Choosing randomly would avoid that, but this is expected to stop
     mattering once IDMaps is integrated. */
  if (troop->mirrors->len > 1 && troop->next_mirror == start)
    {
      ++troop->next_mirror;
      if (troop->next_mirror == troop->mirrors->len)
        troop->next_mirror = 0;
    }

  /* Join into "hostname0,port0;hostname1,port1;..." */
  joined = g_strjoinv (";", entries);

  /* Send */
  msg = elf_new ("MIRRORS");
  elf_set_attribute (msg, "mirrors", joined);
  elf_write (msg, &out, &out_len);
  elf_delete (msg);

  gnet_conn_timeout (mtp_conn->conn, MTP_SERVER_TIMEOUT);
  gnet_conn_write (mtp_conn->conn, out, out_len, 0);

  /* Release the temporaries */
  g_free (joined);
  for (k = 0; k < count; ++k)
    g_free (entries[k]);
  g_free (entries);
}



/* Queue an "INFO" message on the connection and restart its timeout.
   The "name" attribute is added when NAME is non-NULL; the "length"
   attribute is added when LM is given and has a non-zero length.
   TROOP is accepted but not used when building the message. */
static void
send_info (MtpConn* mtp_conn, gchar* name, MtpTroop* troop, MtpLocalMirror* lm)
{
  ElfNode* msg;
  gchar* out = NULL;
  gint out_len = 0;

  g_return_if_fail (mtp_conn);

  msg = elf_new ("INFO");

  if (name != NULL)
    elf_set_attribute (msg, "name", name);

  /* Report the length only for a non-empty local mirror */
  if (lm != NULL && lm->length)
    {
      gchar len_str[32];

      g_snprintf (len_str, sizeof(len_str), "%d", lm->length);
      elf_set_attribute (msg, "length", len_str);
    }

  elf_write (msg, &out, &out_len);
  elf_delete (msg);

  /* Queue the serialized bytes */
  gnet_conn_timeout (mtp_conn->conn, MTP_SERVER_TIMEOUT);
  gnet_conn_write (mtp_conn->conn, out, out_len, 0);
}



/* **************************************** */

/**

   Periodic garbage collection of stale mirrors.

   Every so often we drop all mirrors that have not sent an update
   within a set amount of time, which is what happens when a mirror
   crashes.  Normally a mirror sends an UNMIRROR before going away, so
   this is only a safety net.

   Timer callback: USER_DATA is the MtpServer.  Records the current
   time in the file-scope timeofday, then runs garbage_collect_hrfunc
   over every troop, removing the troops it asks to be removed.
   Returns TRUE so the timer keeps firing (FALSE only if USER_DATA is
   NULL).

 */

static gboolean
garbage_collect_cb (gpointer user_data)
{
  MtpServer* srv = (MtpServer*) user_data;

  g_return_val_if_fail (srv, FALSE);
  MTPP (1, "mtp_garbage_collect_cb\n");

  /* Take one timestamp for the whole sweep */
  gettimeofday (&timeofday, NULL);

  g_hash_table_foreach_remove (srv->name_to_troop,
                               garbage_collect_hrfunc, srv);

  return TRUE;
}


/* Hash-table removal predicate used by garbage_collect_cb.

   KEY is unused, VALUE is the MtpTroop to inspect, and USER_DATA is
   the MtpServer (used for verbose logging).

   Removes and deletes every non-local mirror of the troop whose
   last_update is more than MTP_EXPIRE_TIME / 1000 seconds older than
   the file-scope timeofday.  If no mirrors remain, the troop is
   deleted and TRUE is returned so the caller drops its table entry.
   Otherwise next_mirror is reset to 0 if it no longer indexes a valid
   mirror, and FALSE is returned. */
static gboolean
garbage_collect_hrfunc (gpointer key, gpointer value, gpointer user_data)
{
  MtpTroop* troop;
  MtpServer* server;
  guint i;

  troop = (MtpTroop*) value;
  g_return_val_if_fail (troop, FALSE);

  server = (MtpServer*) user_data;

  /* Check if any host expired.  The index only advances when the
     current slot is kept: g_ptr_array_remove_index_fast fills the hole
     with the last element, so after a removal slot i holds a mirror
     that has not been examined yet. */
  i = 0;
  while (i < troop->mirrors->len)
    {
      struct timeval diff;
      MtpMirror* mirror = (MtpMirror*) g_ptr_array_index (troop->mirrors, i);

      /* Ignore local mirrors */
      if (mirror->is_local)
	{
	  ++i;
	  continue;
	}

      /* If older than EXPIRE_TIME, remove. */
      timersub(&timeofday, &mirror->last_update, &diff);
      if (diff.tv_sec > (MTP_EXPIRE_TIME / 1000))
	{
	  VERBOSE (server, "UNMIRROR (timeout) %s, %d (%s)\n", 
		   troop->name, mirror->port, mirror->hostname);

	  g_ptr_array_remove_index_fast (troop->mirrors, i);
	  mtp_mirror_delete (mirror);

	  /* Re-examine slot i; do not advance */
	  continue;
	}

      ++i;
    }

  /* If no mirrors left, delete the troop */
  if (!troop->mirrors->len)
    {
      mtp_troop_delete (troop);
      return TRUE;
    }
  /* Otherwise, make sure next_mirror is sensible */
  else if (troop->next_mirror >= troop->mirrors->len)
    {
      troop->next_mirror = 0;
    }

  return FALSE;
}



/* syntax highlighted by Code2HTML, v. 0.9.1 */