/*
 * $Id: as_upload.c,v 1.31 2006/02/19 23:07:22 hex Exp $
 *
 * Copyright (C) 2004 Markus Kern
 * Copyright (C) 2004 Tom Hargreaves
 *
 * All rights reserved.
 */

#include "as_ares.h"

/*****************************************************************************/

/* Maximum number of file bytes read and sent per send_file invocation. Also
 * the size of the stack buffer used there and the amount offered to the
 * throttle callback.
 */
#define BLOCKSIZE 4096

/* Log http reply headers. */
/* #define LOG_HTTP_REPLIES */

/*****************************************************************************/

static ASPacket *compile_http_reply (ASUpload *up, ASHttpHeader *reply);

static as_bool send_reply_success (ASUpload *up);
static as_bool send_reply_queued (ASUpload *up, int queue_pos,
                                  int queue_length);
static as_bool send_reply_metadata (ASUpload *up);
static as_bool send_reply_error (ASUpload *up, as_bool our_fault);
static as_bool send_reply_not_found (ASUpload *up);

static void send_file (int fd, input_id input, ASUpload *up);

/*****************************************************************************/

/* Store the new state in the upload and, if requested and a state callback
 * is registered, raise that callback.
 *
 * Returns the callback's result when it was raised and TRUE otherwise.
 * Callers in this file treat a FALSE result as "the callback freed the
 * upload" and do not touch `up` afterwards.
 */
static as_bool upload_set_state (ASUpload *up, ASUploadState state,
                                 as_bool raise_callback)
{
	up->state = state;

	/* raise callback if specified */
	if (raise_callback && up->state_cb)
		return up->state_cb (up, up->state);

	return TRUE;
}

/* Returns TRUE if the upload was created from a binary request and FALSE if
 * it was created from an HTTP request. Asserts that exactly one of the two
 * request objects is set.
 */
static as_bool upload_is_binary (ASUpload *up)
{
	assert (!(up->request && up->binary_request));
	assert (up->request || up->binary_request);

	return (up->binary_request != NULL);
}

/*****************************************************************************/

/* Allocate an upload object and set every field to its "empty" value.
 * Returns NULL if the allocation fails.
 */
static ASUpload *upload_new (void)
{
	ASUpload *up;

	if (!(up = malloc (sizeof *up)))
		return NULL;

	/* connection and requester */
	up->c        = NULL;
	up->host     = INADDR_NONE;
	up->username = NULL;

	/* exactly one of these is set later by the create functions */
	up->request        = NULL;
	up->binary_request = NULL;
	up->enc_key        = 0;

	/* file being served and the requested range */
	up->share = NULL;
	up->file  = NULL;
	up->start = up->stop = up->sent = 0;
	up->input = INVALID_INPUT;

	up->state = UPLOAD_NEW;

	/* callbacks and user data */
	up->state_cb    = NULL;
	up->auth_cb     = NULL;
	up->data_cb     = NULL;
	up->throttle_cb = NULL;

	up->upman = NULL;
	up->udata = NULL;

	return up;
}

/* Create new upload from HTTP request.
Takes ownership of tcp connection and * request object if successful. */ ASUpload *as_upload_create (TCPC *c, ASHttpHeader *request, ASUploadStateCb state_cb, ASUploadAuthCb auth_cb) { ASUpload *up; assert (c); assert (request); if (!(up = upload_new ())) return NULL; up->c = c; up->host = up->c->host; up->request = request; up->state_cb = state_cb; up->auth_cb = auth_cb; return up; } /* Create new upload from binary request. Takes ownership of tcp connection and * request object if successful. */ ASUpload *as_upload_create_binary (TCPC *c, ASPacket *request, ASUploadStateCb state_cb, ASUploadAuthCb auth_cb) { ASUpload *up; assert (c); assert (request); if (!(up = upload_new ())) return NULL; up->c = c; up->host = up->c->host; up->binary_request = request; up->state_cb = state_cb; up->auth_cb = auth_cb; return up; } /* Free upload object. */ void as_upload_free (ASUpload *up) { if (!up) return; input_remove (up->input); tcp_close_null (&up->c); as_http_header_free (up->request); as_packet_free (up->binary_request); as_share_free (up->share); if (up->file) fclose (up->file); free (up->username); free (up); } /* Set data callback for upload. */ void as_upload_set_data_cb (ASUpload *up, ASUploadDataCb data_cb) { up->data_cb = data_cb; } /* Set throttle callback for upload. */ void as_upload_set_throttle_cb (ASUpload *up, ASUploadThrottleCb throttle_cb) { up->throttle_cb = throttle_cb; } /*****************************************************************************/ /* Send reply back to requester. Looks up share in shares manager, raises auth * callback and sends reply. Either sends 503, 404, queued status, or * requested data. Returns FALSE if the connection has been closed after a * failure reply was sent and TRUE if data transfer was started. 
*/ as_bool as_upload_start (ASUpload *up) { ASHash *hash = NULL; int queue_pos, queue_length; as_bool reply_with_filesize = FALSE, reply_with_phash = FALSE; if (up->state != UPLOAD_NEW) { assert (up->state == UPLOAD_NEW); return FALSE; } if (upload_is_binary (up)) { as_uint8 cmd, enc_branch = 0x01; as_uint8 *stats_string = NULL; char *user_agent = NULL; #if 0 as_packet_dump (up->binary_request); #endif cmd = as_packet_get_8 (up->binary_request); assert (cmd == 0x01); /* GET */ /* FIXME: This needs some refactoring. */ while (as_packet_remaining (up->binary_request) > 3) { as_uint16 len = as_packet_get_le16 (up->binary_request); as_uint8 type = as_packet_get_8 (up->binary_request); as_uint8 *oldptr; if (as_packet_remaining (up->binary_request) < len) { AS_ERR_3 ("Binary request from '%s' too short. Type 0x%02x, len %d.", net_ip_str (up->host), type, len); send_reply_error (up, FALSE); return FALSE; } oldptr = up->binary_request->read_ptr; switch (type) { case 0x32: /* encryption branch and key*/ enc_branch = as_packet_get_8 (up->binary_request); up->enc_key = as_packet_get_le16 (up->binary_request); break; case 0x01: /* hash */ hash = as_packet_get_hash (up->binary_request); break; case 0x02: /* username */ up->username = as_packet_get_str (up->binary_request, len); break; case 0x03: /* ips and ports */ as_packet_get_ip (up->binary_request); /* supernode ip */ as_packet_get_le16 (up->binary_request); /* supernode port */ as_packet_get_ip (up->binary_request); /* client ip */ as_packet_get_le16 (up->binary_request); /* client port */ as_packet_get_ip (up->binary_request); /* client local ip */ break; case 0x05: /* if present we are supposed to just reply with meta * data, spezifically with file size */ reply_with_filesize = (as_packet_get_8 (up->binary_request) == 0x01); break; case 0x06: /* stats string */ stats_string = as_packet_get_ustr (up->binary_request, len); break; case 0x07: /* range */ up->start = as_packet_get_le32 (up->binary_request); up->stop = 
as_packet_get_le32 (up->binary_request); up->stop++; /* make range exclusive end ??? */ break; case 0x08: /* additional sources */ up->binary_request->read_ptr += len; break; case 0x09: /* client name and version */ user_agent = as_packet_get_str (up->binary_request, len); break; case 0x0a: /* encrypted stats */ up->binary_request->read_ptr += len; break; case 0x0b: /* 64 bit range fields */ up->start = as_packet_get_le32 (up->binary_request); as_packet_get_le32 (up->binary_request); up->stop = as_packet_get_le32 (up->binary_request); as_packet_get_le32 (up->binary_request); up->stop++; /* make range exclusive end ??? */ break; case 0x0c: /* if present we are supposed to just reply with a list * of partial hashes */ reply_with_phash = (as_packet_get_8 (up->binary_request) == 0x01); break; case 0x0d: /* more IPs and ports? */ break; default: /* skip unknown fields */ AS_DBG_3 ("Binary request from %s contains unknown field 0x%02x with length %d", net_ip_str (up->host), type, len); } /* skip past this field, regardless of how much (if any) of it we actually read */ up->binary_request->read_ptr = oldptr + len; } if (enc_branch != 0x1) { AS_ERR_2 ("Upload request enc_branch not 0x01 but 0x%02x from %s", enc_branch, net_ip_str (up->host)); as_hash_free (hash); free (user_agent); free (stats_string); send_reply_error (up, FALSE); return FALSE; } if (!hash) { AS_ERR_1 ("No hash in request from %s", net_ip_str (up->host)); free (user_agent); free (stats_string); send_reply_error (up, FALSE); return FALSE; } #if 0 AS_DBG_2 ("Binary user agent of %s is '%s'", net_ip_str (up->host), user_agent); #endif free (user_agent); free (stats_string); /* what follows are various ips, ports and alternate sources */ } else /* http request */ { const char *range; /* Get username from request. 
*/ if ((up->username = as_http_header_get_field (up->request, "X-My-Nick"))) { if (*up->username == '\0') up->username = NULL; else up->username = strdup (up->username); } /* Get range from request header. */ if ((range = as_http_header_get_field (up->request, "Range"))) { int i = sscanf (range, "bytes=%u-%u", &up->start, &up->stop); if (!i) i = sscanf (range, "bytes %u-%u", &up->start, &up->stop); if (i == 0) { AS_ERR_2 ("Invalid range header '%s' from %s", range, net_ip_str (up->host)); send_reply_error (up, FALSE); return FALSE; } if (i == 1) /* only start specified */ up->stop = up->share->size; else up->stop++; /* make range exclusive end */ } else { AS_DBG_1 ("No range header from %s, assuming whole file", net_ip_str (up->host)); up->start = 0; up->stop = up->share->size; } /* Get hash from request header. */ if ((strncmp (up->request->uri, "sha1:", 5) && strncmp (up->request->uri, "/hack", 5)) || /* for debugging */ !(hash = as_hash_decode (up->request->uri + 5))) { AS_WARN_2 ("Malformed uri '%s' from %s", up->request->uri, net_ip_str (up->host)); send_reply_error (up, FALSE); return FALSE; } } /* Lookup share. */ if (!(up->share = as_shareman_lookup (AS->shareman, hash))) { AS_DBG_2 ("Unknown share request '%s' from %s", as_hash_str (hash), net_ip_str (up->host)); send_reply_not_found (up); as_hash_free (hash); return FALSE; } as_hash_free (hash); /* Make copy of share object in case share list changes during upload. 
*/ if (!(up->share = as_share_copy (up->share))) { AS_ERR ("Insufficient memory."); send_reply_error (up, TRUE); return FALSE; } #if 0 /* handle phash request by sending error since we do not support them */ if (reply_with_phash) { AS_WARN_1 ("PHash request from %s, replying with 500.", net_ip_str (up->host)); send_reply_error (up, TRUE); return FALSE; } #endif /* handle size request by sending meta data */ if (reply_with_filesize) { AS_DBG_1 ("Filesize request from %s, replying with metadata.", net_ip_str (up->host)); send_reply_metadata (up); return FALSE; } /* sanity check requested range */ if (up->stop <= up->start || up->start >= up->share->size || up->stop > up->share->size) { AS_ERR_3 ("Invalid range [%u,%u) from %s", up->start, up->stop, net_ip_str (up->host)); send_reply_error (up, FALSE); return FALSE; } AS_DBG_5 ("Upload request (%s): '%s' (%d, %d) from %s", upload_is_binary (up) ? "binary" : "http", up->share->path, up->start, up->stop, net_ip_str (up->host)); /* Ask auth callback what to do. */ queue_pos = 0; /* send data */ queue_length = 0; if (up->auth_cb) queue_pos = up->auth_cb (up, &queue_length); if (queue_pos) { /* User is queued */ send_reply_queued (up, queue_pos, queue_length); return FALSE; } /* Send data. */ assert (queue_pos == 0); up->file = fopen (up->share->path, "rb"); if (!up->file || (fseek (up->file, up->start, SEEK_SET) < 0)) { AS_ERR_1 ("Failed to open file for upload: %s", up->share->path); if (up->file) { fclose (up->file); up->file = NULL; } send_reply_error (up, TRUE); return FALSE; } /* Send 206 reply. */ if (!send_reply_success (up)) { AS_ERR_1 ("Failed to send 206 reply for upload: %s", up->share->path); if (up->file) { fclose (up->file); up->file = NULL; } tcp_close_null (&up->c); return FALSE; } if (!upload_set_state (up, UPLOAD_ACTIVE, TRUE)) return FALSE; /* Callback freed us */ /* Wait until we can write file data. 
*/ up->input = input_add (up->c->fd, (void *)up, INPUT_WRITE, (InputCallback)send_file, 0); return TRUE; } /* Cancel data transfer and close connection. Raises state callback. */ as_bool as_upload_cancel (ASUpload *up) { if (up->state != UPLOAD_ACTIVE) return FALSE; input_remove (up->input); up->input = INVALID_INPUT; tcp_close_null (&up->c); if (up->file) { fclose (up->file); up->file = NULL; } if (!upload_set_state (up, UPLOAD_CANCELLED, TRUE)) return FALSE; /* Callback freed us */ return TRUE; } /*****************************************************************************/ /* Suspend transfer using input_suspend_all on socket. */ as_bool as_upload_suspend (ASUpload *up) { if (!up->c) return FALSE; input_suspend_all (up->c->fd); return TRUE; } /* Resume transfer using input_resume_all on socket. */ as_bool as_upload_resume (ASUpload *up) { if (!up->c) return FALSE; input_resume_all (up->c->fd); return TRUE; } /*****************************************************************************/ /* Returns current upload state */ ASUploadState as_upload_state (ASUpload *up) { return up->state; } /* Return upload state as human readable static string. 
*/ const char *as_upload_state_str (ASUpload *up) { switch (up->state) { case UPLOAD_INVALID: return "Invalid"; case UPLOAD_NEW: return "New"; case UPLOAD_ACTIVE: return "Active"; case UPLOAD_FAILED: return "Failed"; case UPLOAD_QUEUED: return "Queued"; case UPLOAD_COMPLETE: return "Completed"; case UPLOAD_CANCELLED: return "Cancelled"; } return "UNKNOWN"; } /*****************************************************************************/ /* MOVEME */ static as_bool set_header_encoded (ASHttpHeader *header, char *name, ASPacket *packet) { char *encoded; if ((encoded = as_base64_encode (packet->data, packet->used))) { as_http_header_set_field (header, name, encoded); free (encoded); return TRUE; } return FALSE; } static void set_header_b6mi (ASHttpHeader *request) { ASPacket *p; in_addr_t sip; in_port_t sport; p = as_packet_create (); /* our supernode's IP and port */ sip = as_sessman_get_supernode (AS->sessman, &sport); as_packet_put_ip (p, sip); as_packet_put_le16 (p, sport); /* our IP and port */ as_packet_put_ip (p, AS->netinfo->outside_ip); as_packet_put_le16 (p, AS->netinfo->port); as_encrypt_b6mi (p->data, p->used); set_header_encoded (request, "X-B6MI", p); as_packet_free (p); } static void set_common_headers (ASUpload *up, ASHttpHeader *reply) { char buf[32]; as_http_header_set_field (reply, "Server", AS_UPLOAD_AGENT); set_header_b6mi (reply); snprintf (buf, sizeof (buf), "%08X", ntohl (net_local_ip (up->c->fd, NULL))); as_http_header_set_field (reply, "X-MyLIP", buf); if (AS->netinfo->nick) as_http_header_set_field (reply, "X-My-Nick", AS->netinfo->nick); #ifdef AS_UPLOAD_KEEP_ALIVE as_http_header_set_field (reply, "Connection", "Keep-Alive"); #else as_http_header_set_field (reply, "Connection", "Close"); #endif } /*****************************************************************************/ static ASPacket *compile_http_reply (ASUpload *up, ASHttpHeader *reply) { String *str; ASPacket *packet; if (!(str = as_http_header_compile (reply))) return NULL; if 
(!(packet = as_packet_create ())) { string_free (str); return NULL; } if (!as_packet_put_ustr (packet, str->str, str->len)) { as_packet_free (packet); string_free (str); return NULL; } string_free (str); #ifdef LOG_HTTP_REPLIES as_packet_dump (packet); #endif if (upload_is_binary (up)) { if (!as_encrypt_transfer_reply (packet, &up->enc_key)) { as_packet_free (packet); return NULL; } } return packet; } static as_bool send_reply_success (ASUpload *up) { ASHttpHeader *reply; char buf[64]; ASPacket *reply_packet; reply = as_http_header_reply (HTHD_VER_11, (up->start == 0 && up->stop == up->share->size) ? 200 : 206); #if 1 snprintf (buf, sizeof (buf), "bytes=%u-%u/%u", up->start, up->stop-1, up->share->size); /* Ares */ #else snprintf (buf, sizeof (buf), "bytes %u-%u/%u", up->start, up->stop-1, up->share->size); /* HTTP */ #endif as_http_header_set_field (reply, "Content-Range", buf); snprintf (buf, sizeof (buf), "%u", up->stop - up->start); as_http_header_set_field (reply, "Content-Length", buf); set_common_headers (up, reply); reply_packet = compile_http_reply (up, reply); assert (reply_packet); /* Immediately send reply since there might be a race condition between * the tcp write queue and our file send input. I think it is not defined * which input event gets triggered first if there are multiple. 
*/ if (tcp_send (up->c, reply_packet->data, reply_packet->used) != (int) reply_packet->used) { AS_ERR_1 ("Short send in reply for upload '%s'", up->share->path); as_packet_free (reply_packet); as_http_header_free (reply); return FALSE; } as_packet_free (reply_packet); as_http_header_free (reply); return TRUE; } static as_bool send_reply_queued (ASUpload *up, int queue_pos, int queue_length) { ASHttpHeader *reply; ASPacket *reply_packet; char buf[128]; assert (queue_pos != 0); reply = as_http_header_reply (HTHD_VER_11, 503); set_common_headers (up, reply); /* Omit queue header if position is < 0 */ if (queue_pos > 0) { sprintf (buf, "position=%u,length=%u,limit=%u,pollMin=%u,pollMax=%u", queue_pos, queue_length, 1 /* max active downloads (per user?) */, AS_UPLOAD_QUEUE_MIN, /* min/max recheck intervals in seconds */ AS_UPLOAD_QUEUE_MAX); as_http_header_set_field (reply, "X-Queued", buf); } reply_packet = compile_http_reply (up, reply); assert (reply_packet); /* Immediately send reply and close connection. */ tcp_send (up->c, reply_packet->data, reply_packet->used); tcp_close_null (&up->c); as_packet_free (reply_packet); as_http_header_free (reply); if (!upload_set_state (up, UPLOAD_QUEUED, TRUE)) return FALSE; /* Callback freed us */ return TRUE; } static as_bool send_reply_metadata (ASUpload *up) { ASHttpHeader *reply; ASPacket *reply_packet; char buf[64]; char *val; assert (up->share); reply = as_http_header_reply (HTHD_VER_11, 200); set_common_headers (up, reply); /* Add meta data, specifically file size. 
*/ if ((val = as_url_encode (as_meta_get_tag (up->share->meta, "title")))) { as_http_header_set_field (reply, "X-Title", val); free (val); } if ((val = as_url_encode (as_meta_get_tag (up->share->meta, "artist")))) { as_http_header_set_field (reply, "X-Artist", val); free (val); } if ((val = as_url_encode (as_meta_get_tag (up->share->meta, "album")))) { as_http_header_set_field (reply, "X-Album", val); free (val); } snprintf (buf, sizeof (buf), "%u", up->share->size); as_http_header_set_field (reply, "X-Size", buf); reply_packet = compile_http_reply (up, reply); assert (reply_packet); /* Immediately send reply and close connection. */ tcp_send (up->c, reply_packet->data, reply_packet->used); tcp_close_null (&up->c); as_packet_free (reply_packet); as_http_header_free (reply); if (!upload_set_state (up, UPLOAD_COMPLETE, TRUE)) return FALSE; /* Callback freed us */ return TRUE; } static as_bool send_reply_error (ASUpload *up, as_bool our_fault) { ASHttpHeader *reply; ASPacket *reply_packet; reply = as_http_header_reply (HTHD_VER_11, our_fault ? 500 : 400); set_common_headers (up, reply); reply_packet = compile_http_reply (up, reply); assert (reply_packet); /* Immediately send reply and close connection. */ tcp_send (up->c, reply_packet->data, reply_packet->used); tcp_close_null (&up->c); as_packet_free (reply_packet); as_http_header_free (reply); if (!upload_set_state (up, UPLOAD_FAILED, TRUE)) return FALSE; /* Callback freed us */ return TRUE; } static as_bool send_reply_not_found (ASUpload *up) { ASHttpHeader *reply; ASPacket *reply_packet; reply = as_http_header_reply (HTHD_VER_11, 404); set_common_headers (up, reply); reply_packet = compile_http_reply (up, reply); assert (reply_packet); /* Immediately send reply and close connection. 
*/ tcp_send (up->c, reply_packet->data, reply_packet->used); tcp_close_null (&up->c); as_packet_free (reply_packet); as_http_header_free (reply); if (!upload_set_state (up, UPLOAD_FAILED, TRUE)) return FALSE; /* Callback freed us */ return TRUE; } /*****************************************************************************/ static as_bool send_error (ASUpload *up) { input_remove (up->input); up->input = INVALID_INPUT; tcp_close_null (&up->c); if (up->file) { fclose (up->file); up->file = NULL; } return upload_set_state (up, UPLOAD_CANCELLED, TRUE); /* may free us */ } static void send_file (int fd, input_id input, ASUpload *up) { int in, out; unsigned int left, wanted; as_uint8 buf[BLOCKSIZE]; if (net_sock_error (fd)) { AS_DBG_3 ("net_sock_error %d after %u bytes for upload to %s", errno, up->sent, net_ip_str (up->host)); send_error (up); /* may free us */ return; } /* Give callback a chance to slow us down. */ if (up->throttle_cb) { wanted = up->throttle_cb (up, BLOCKSIZE); assert (wanted <= BLOCKSIZE); if (wanted == 0) return; /* Nothing to do. */ } else wanted = BLOCKSIZE; left = (up->stop - up->start) - up->sent; if (wanted > left) wanted = left; in = fread (buf, 1, wanted, up->file); if (in < (int)wanted) { AS_WARN_3 ("Read (%d of %d) failed from %s. Cancelling upload.", in, wanted, up->share->path); send_error (up); /* may free us */ return; } /* encrypt data */ if (upload_is_binary (up)) { as_encrypt_transfer_body (buf, in, &up->enc_key); } /* send data */ out = tcp_send (up->c, buf, in); if (out < 0) { AS_DBG_2 ("Failed to write %d bytes to %s. Cancelling upload.", in, net_ip_str (up->host)); send_error (up); /* may free us */ return; } if (out < in) { AS_DBG_3 ("Wrote %d of %d bytes to %s, rewinding", out, in, net_ip_str (up->host)); if (fseek (up->file, -((off_t)(in - out)), SEEK_CUR) < 0) { AS_ERR ("Rewind failed. 
Cancelling upload."); send_error (up); /* may free us */ return; } } up->sent += out; /* Raise data callback if there is one */ if (up->data_cb) { if (!up->data_cb (up, out)) return; /* We were freed. */ } /* Check if upload is complete */ assert (up->sent <= up->stop - up->start); if (up->sent == up->stop - up->start) { AS_DBG_3 ("Finished uploading %d bytes of '%s' to %s", up->sent, up->share->path, net_ip_str (up->host)); input_remove (up->input); up->input = INVALID_INPUT; fclose (up->file); up->file = NULL; #ifdef AS_UPLOAD_KEEP_ALIVE /* Treat tcp connection as new connection to server. */ AS_HEAVY_DBG_1 ("Handing off keep-alive connection to %s to http server.", net_ip_str (up->host)); as_http_server_pushed (AS->server, up->c); up->c = NULL; #else tcp_close_null (&up->c); #endif upload_set_state (up, UPLOAD_COMPLETE, TRUE); /* may free us */ return; } } /*****************************************************************************/