/* Jungle Monkey * Copyright (C) 1999-2001 The Regents of the University of Michigan * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include #include #include #include "mtp_mirror.h" #include "mtp_debug.h" #include "util/util.h" #include "util/elf.h" #define MAX_LINE_LEN 64000 /* Max length of line to read */ static gboolean mirror_gconn_func (GConn* conn, GConnStatus status, gchar* buffer, gint length, gpointer user_data); static gboolean mirror_update_cb (gpointer user_data); MtpLocalMirror* mtp_mirror (MtpServer* server, const GURL* url, MtpLocalMirrorReadFunc read_func, gpointer user_data, guint length) { MtpLocalMirror* lm; gchar* rendezvous_name; MtpRendezvous* r; gint port; MtpTroop* troop; g_return_val_if_fail (server, NULL); g_return_val_if_fail (url, NULL); g_return_val_if_fail (url->resource, NULL); g_return_val_if_fail (read_func || (!read_func && !length), NULL); port = url->port; if (!port) port = MTP_SERVER_PORT; /* Get the local mirror. We may already be mirroring this file (ie, when we mirror by SHA). */ lm = (MtpLocalMirror*) g_hash_table_lookup (server->name_to_local_mirror, url->resource); if (!lm) { lm = g_new0 (MtpLocalMirror, 1); lm->name = g_strdup(url->resource); lm->server = server; lm->length = length; lm->read_func = read_func; lm->user_data = user_data; g_hash_table_insert (server->name_to_local_mirror, lm->name, lm); } else if (lm->length != length) g_warning ("MTP Local Mirror length mismatch for %s\n", url->resource); /* Get the rendezvous. We may already know about this rendezvous. */ rendezvous_name = g_strdup_printf ("%s:%d", url->hostname, port); r = (MtpRendezvous*) g_hash_table_lookup (server->name_to_rendezvous, rendezvous_name); if (!r) { r = g_new0 (MtpRendezvous, 1); r->rendezvous_name = rendezvous_name; r->server = server; r->conn = gnet_conn_new (url->hostname, port, mirror_gconn_func, r); g_hash_table_insert (server->name_to_rendezvous, rendezvous_name, r); } else g_free (rendezvous_name); /* If we're already mirroring this file on this rendezvous, then ignore it. */ if (g_slist_find (lm->rendezvous, r)) return NULL; /* Link the local mirror and the rendezvous */ lm->rendezvous = g_slist_prepend (lm->rendezvous, r); r->local_mirrors = g_slist_prepend (r->local_mirrors, lm); /* Add the file to our server as a regular mirror */ troop = (MtpTroop*) g_hash_table_lookup (server->name_to_troop, url->resource); if (!troop) { troop = mtp_troop_new (url->resource); g_hash_table_insert (server->name_to_troop, troop->name, troop); } mtp_troop_update (troop, server->hostname, server->server->port, TRUE); /* Connect to remove server if the host isn't the local one */ if (port != server->server->port || strcmp(url->hostname, server->hostname)) { ElfNode* jmpa; gchar port_buf[16]; gchar* buffer; gint length; /* Connect to the rendezvous (though we may already be connected). */ gnet_conn_connect (r->conn, MTP_SERVER_TIMEOUT); /* Send a MIRROR */ jmpa = elf_new ("MIRROR"); elf_set_attribute (jmpa, "name", url->resource); g_snprintf (port_buf, sizeof(port_buf), "%d", server->server->port); elf_set_attribute (jmpa, "port", port_buf); elf_write (jmpa, &buffer, &length); elf_delete (jmpa); gnet_conn_write (r->conn, buffer, length, MTP_SERVER_TIMEOUT); ++r->requests_pending; /* Schedule an update */ if (!r->update_timer) r->update_timer = g_timeout_add (MTP_UPDATE_TIME, mirror_update_cb, r); } else r->is_local = TRUE; return lm; } void mtp_unmirror (MtpLocalMirror* lm) { GSList* i; MtpTroop* troop; if (!lm) return; /* Remove mirror from our server */ troop = (MtpTroop*) g_hash_table_lookup (lm->server->name_to_troop, lm->name); g_return_if_fail (troop); /* should have this troop */ mtp_troop_remove (troop, lm->server->hostname, lm->server->server->port); /* If troop is now empty, delete it */ if (!troop->mirrors->len) { g_hash_table_remove (lm->server->name_to_troop, lm->name); mtp_troop_delete (troop); } /* Remove from each rendezvous */ for (i = lm->rendezvous; i != NULL; ) { MtpRendezvous* r = (MtpRendezvous*) i->data; i = i->next; /* If the rendezvous isn't local, send UNMIRROR */ if (!r->is_local) { ElfNode* jmpa; gchar port_buf[16]; gchar* buffer; gint length; /* Connect to rendezvous */ gnet_conn_connect (r->conn, MTP_SERVER_TIMEOUT); /* Send UNMIRROR */ jmpa = elf_new ("UNMIRROR"); elf_set_attribute (jmpa, "name", lm->name); g_snprintf (port_buf, sizeof(port_buf), "%d", r->server->server->port); elf_set_attribute (jmpa, "port", port_buf); elf_write (jmpa, &buffer, &length); elf_delete (jmpa); gnet_conn_write (r->conn, buffer, length, MTP_SERVER_TIMEOUT); ++r->requests_pending; /* Remove from list */ r->local_mirrors = g_slist_remove (r->local_mirrors, lm); lm->rendezvous = g_slist_remove (lm->rendezvous, r); /* Remove from hashtable if empty. We will delete later if necessary. */ if (!r->local_mirrors) g_hash_table_remove (r->server->name_to_rendezvous, r->rendezvous_name); } else { /* Remove from list */ r->local_mirrors = g_slist_remove (r->local_mirrors, lm); lm->rendezvous = g_slist_remove (lm->rendezvous, r); /* Delete if rendezvous has no more mirrors */ if (!r->local_mirrors) { g_hash_table_remove (r->server->name_to_rendezvous, r->rendezvous_name); mtp_rendezvous_delete (r); } } } /* Delete any server connections */ for (i = lm->server->mtp_conns; i != NULL; ) { MtpConn* conn = (MtpConn*) i->data; i = i->next; if (!strcmp(lm->name, conn->name)) { lm->server->mtp_conns = g_slist_remove (lm->server->mtp_conns, conn); mtp_conn_delete (conn); } } /* Close file */ if (lm->file) fclose (lm->file); if (lm->close_timer) g_source_remove (lm->close_timer); g_free (lm->path); /* Delete the local mirror */ g_hash_table_remove (lm->server->name_to_local_mirror, lm->name); mtp_local_mirror_delete (lm); } static gboolean mirror_gconn_func (GConn* conn, GConnStatus status, gchar* buffer, gint length, gpointer user_data) { MtpRendezvous* r = (MtpRendezvous*) user_data; g_return_val_if_fail (r, FALSE); MTPP (1, "mirror_gconn_func %d\n", status); switch (status) { case GNET_CONN_STATUS_CONNECT: { /* Reset number of failures */ r->failures = 0; break; } case GNET_CONN_STATUS_CLOSE: { gnet_conn_disconnect (conn, TRUE); r->requests_pending = 0; break; } case GNET_CONN_STATUS_READ: { /* TODO: Check response */ /* If no more requests, then close */ --r->requests_pending; if (!r->requests_pending) { MTPP (1, "disconnect\n"); gnet_conn_disconnect (conn, TRUE); return FALSE; } return TRUE; break; } case GNET_CONN_STATUS_WRITE: { /* Delete the buffer */ g_free (buffer); /* Read reponse */ if (!conn->read_id) gnet_conn_readline (conn, NULL, MAX_LINE_LEN, MTP_SERVER_TIMEOUT); break; } case GNET_CONN_STATUS_ERROR: case GNET_CONN_STATUS_TIMEOUT: { gnet_conn_disconnect (conn, TRUE); r->requests_pending = 0; /* Count the failure */ ++r->failures; break; } } /* Delete the rendezvous if there's nothing to write and it has no mirrors */ if (!r->requests_pending && !r->local_mirrors) mtp_rendezvous_delete (r); return FALSE; } /** Called on rendezvous ever once and a while. We connect to the rendezvous and send a MIRROR for every file we have. */ static gboolean mirror_update_cb (gpointer user_data) { MtpRendezvous* r = (MtpRendezvous*) user_data; g_return_val_if_fail (r, FALSE); g_return_val_if_fail (r->conn, FALSE); MTPP (1, "mirror_update_cb %s\n", r->conn->hostname); /* Send mirrors only if we have mirrors and there hasn't been too many failures */ if (r->local_mirrors || r->failures < 3) { GSList* i; guint t; /* Connect to the rendezvous */ gnet_conn_connect (r->conn, MTP_SERVER_TIMEOUT); /* Send a MIRROR for each thing we're mirroring */ for (i = r->local_mirrors; i != NULL; i = i->next) { MtpLocalMirror* lm = (MtpLocalMirror*) i->data; ElfNode* jmpa; gchar port_buf[16]; gchar* buffer; gint length; jmpa = elf_new ("MIRROR"); elf_set_attribute (jmpa, "name", lm->name); g_snprintf (port_buf, sizeof(port_buf), "%d", lm->server->server->port); elf_set_attribute (jmpa, "port", port_buf); elf_write (jmpa, &buffer, &length); ++r->requests_pending; gnet_conn_write (r->conn, buffer, length, MTP_SERVER_TIMEOUT); elf_delete (jmpa); } /* Schedule next update */ t = (MTP_UPDATE_TIME + MTP_UPDATE_TIME/8) - rand() % (MTP_UPDATE_TIME/16); r->update_timer = g_timeout_add (t, mirror_update_cb, r); } /* TODO: Do something if we can't connect? */ return FALSE; } guint mtp_mirror_get_num_downloads (MtpLocalMirror* lm) { g_return_val_if_fail (lm, 0); return lm->num_downloads; } /* ******************** */ static guint read_func (MtpLocalMirror* mirror, guint offset, gchar* buf, guint len, gpointer user_data); static gboolean close_timeout (gpointer p); MtpLocalMirror* mtp_mirror_path (MtpServer* server, const GURL* url, const gchar* path) { gint size; MtpLocalMirror* lm; g_return_val_if_fail (server, NULL); g_return_val_if_fail (url, NULL); g_return_val_if_fail (path, NULL); /* Get the file size. This also make sure the file exists */ size = file_size (path); if (size < 0) return NULL; /* Mirror it */ lm = mtp_mirror (server, url, read_func, NULL, size); g_return_val_if_fail (lm, NULL); lm->path = g_strdup (path); return lm; } /** Read the next section of a file. We also reset the autoclose timer. */ static guint read_func (MtpLocalMirror* lm, guint offset, gchar* buf, guint len, gpointer user_data) { gint rv; MTPP(4, "read_func %s %d at %d\n", lm->name, len, offset); /* Open file if necessary */ if (!lm->file) { FILE* file; file = fopen (lm->path, "r"); if (!file) { g_warning ("Could not open file %s for read: %s\n", lm->path, strerror(errno)); return -1; } lm->file = file; } /* Reset autoclose timer */ if (lm->close_timer) { g_source_remove (lm->close_timer); lm->close_timer = 0; } lm->close_timer = g_timeout_add (MTP_AUTOCLOSE_TIME, close_timeout, lm); /* Seek to proper position */ rv = fseek (lm->file, offset, SEEK_SET); if (rv != 0) { g_warning ("fseek failed: %s\n", strerror(errno)); return -1; } /* Read bytes */ rv = fread (buf, len, 1, lm->file); if (rv != 1) { g_warning ("fread failed: %s\n", strerror(errno)); return -1; } return len; } static gboolean close_timeout (gpointer p) { MtpLocalMirror* lm = (MtpLocalMirror*) p; g_return_val_if_fail (lm && lm->file, FALSE); lm->close_timer = 0; fclose (lm->file); lm->file = NULL; return FALSE; }