/*  $Id: fetch.c,v 1.13 2004/02/13 16:09:29 ossi Exp $  *
 *
 * puf 0.9  Copyright (C) 2000-2004 by Oswald Buddenhagen
 * based on puf 0.1.x (C) 1999,2000 by Anders Gavare
 *
 * You may modify and distribute this code under the terms of the GPL.
 * There is NO WARRANTY of any kind. See COPYING for details.
 *
 * fetch.c - url fetch loop
 *
 */

#include "puf.h"

/* Byte counters; fetched_bytes/total_bytes feed the status line below. */
off_t max_bytes, fetched_bytes, total_bytes;
/* Upper bound on concurrently busy dns helpers (see fetch_all). */
int max_dnss_active = DEFAULT_MAX_DNS_FORKS;
/* Upper bound on concurrently active url transfers (see fetch_all). */
int max_urls_active = DEFAULT_MAX_ACTIVE;
int timeout_dns = DEFAULT_TIMEOUT_DNS;
/* Passed to alarm() in fetch_all; SIGALRM ends the run. */
int max_time;
/* If non-zero, fetch_all bails out once num_urls_done exceeds it. */
int max_urls;
int num_urls;
int num_urls_done;
int num_urls_fail;
int num_errors;
/* Non-zero enables the periodic status line in fetch_all. */
int show_stat = 1;
int waiting_proxies;
int all_proxy_wait = 1; /* unused (immutable) */
/*
 * cur_tv is refreshed at the top of every fetch_all iteration.
 * throttle, if set, is added to cur_tv after each new connection to
 * compute the earliest time the next connection may be started.
 */
struct timeval cur_tv, throttle;
/* Local address template used by tcp_connect when binding. */
struct sockaddr_in bind_addr;

circular_queue(queue_urls_connect, wurl_t); /* ready to connect */
linear_na_queue(queue_urls_request, aurl_t); /* started connect, waiting for write */
linear_na_queue(queue_urls_reply, aurl_t); /* request sent, waiting for reply */

/*
 * try to connect to a host.
 * returns -1 on error, otherwise a file descriptor number is
 * returned (socket number).
 *
 * addr/port: remote endpoint (port in host byte order).
 * baddr:     local address to bind to; s_addr == 0 means "do not bind".
 *
 * The socket is switched to non-blocking mode before connect(), and the
 * result of connect() is deliberately not examined here: fetch_all waits
 * for the socket to become writable and then reads SO_ERROR.
 * -1 is only returned when no socket could be created at all.
 */
static int
tcp_connect(struct in_addr addr, u_short port, struct in_addr baddr)
{
    int s;
    struct sockaddr_in server_in;

    /* If the first socket() fails, let free_fd() try to help and retry once. */
    if ((s = socket(PF_INET, SOCK_STREAM, 0)) < 0 &&
        (!free_fd() || (s = socket(PF_INET, SOCK_STREAM, 0)) < 0))
        return -1;

    if (baddr.s_addr) {
        bind_addr.sin_addr = baddr;
        /* best effort: a failing bind() is ignored */
        bind(s, (struct sockaddr *)&bind_addr, sizeof(struct sockaddr));
    }

    fcntl(s, F_SETFL, fcntl(s, F_GETFL) | O_NONBLOCK);

    /* Zero the whole structure so that sin_zero is not passed uninitialised. */
    memset(&server_in, 0, sizeof(server_in));
    server_in.sin_family = AF_INET;
    server_in.sin_addr = addr;
    server_in.sin_port = htons(port);

    connect(s, (struct sockaddr *)&server_in, sizeof(server_in));

    return s;
}

/*
 * Handle a failed connection attempt for an active url.
 *
 * au:   the active url whose connection failed.
 * dr:   if zero and a proxy is in use, the error is accounted to the
 *       proxy's address; otherwise to the url's own host address.
 * errt: error type recorded for the address (callers in this file pass
 *       1 for timeouts and 2 for failed/refused connects; 3 is used
 *       here to mark an address as given up).
 * errw: initial retry delay in seconds for a new error type.
 * etww: printf-style warning (takes the host name) used when the url
 *       is simply re-queued.
 * etnw: message handed to errm() when the fail_no_wait option is set.
 *
 * The url is re-queued (unconditionally, or if errm() asks for a retry
 * in fail_no_wait mode). The address' retry time is pushed back with a
 * delay that doubles on every consecutive failure, until max_attempts
 * is reached and the address is given up.
 */
static void
conn_err(aurl_t *au, int dr, int errt, int errw, char *etww, char *etnw)
{
    haddr_t *ip;
    hinfo_t *hi;

    if (!dr && au->proxy) {
        hi = au->proxy->host->info;
        ip = hi->ips + au->pipidx;
    } else {
        hi = au->url->host->info;
        ip = hi->ips + au->ipidx;
    }

    if (au->url->parm->opt->fail_no_wait) {
        if (errm(au->url, etnw) == RT_RETRY)
            queue_url(au->url);
    } else {
        prx(WRN, etww, hi->name);
        queue_url(au->url);
    }

    /* Only account the failure once per retry period. */
    if (ip->last_errt != 3 && cur_tv.tv_sec >= ip->retry_time) {
        if (++ip->attempt >= (unsigned)au->url->parm->opt->max_attempts) {
            ip->last_errt = 3;
            prx(WRN, "giving up address '%s' for host '%s'.\n",
                inet_ntoa(ip->addr), hi->name);
        } else {
            if (errt != ip->last_errt) {
                dbg(CON, (" setting new error type %d\n", errt));
                ip->last_errt = errt;
                ip->err_wait = errw;
            }
            dbg(CON, (" retrying in %d seconds\n", ip->err_wait));
            ip->retry_time = cur_tv.tv_sec + ip->err_wait;
            ip->err_wait *= 2;
        }
    }
}

/*
 * Format a duration of ti seconds into d as a five character string:
 * "MM:SS", "HHhMM", "DDdHH", "> 99d", or "??:??" if the duration is
 * unknown. d must provide room for at least 6 bytes.
 *
 * Any negative value counts as unknown. Callers compute estimates that
 * can become negative (more bytes fetched than announced); formatting
 * those with %02d could exceed the callers' small buffers.
 */
static void
fmt_time(char *d, int ti)
{
    if (ti < 0)
        strcpy(d, "??:??");
    else if (ti < 6000)
        sprintf(d, "%02d:%02d", ti / 60, ti % 60);
    else if (ti < 360000)
        sprintf(d, "%02dh%02d", ti / 3600, ti / 60 % 60);
    else if (ti < 8640000)
        sprintf(d, "%dd%02d", ti / 86400, ti / 3600 % 24);
    else
        strcpy(d, "> 99d");
}

/*
 * Set access and modification time of the url's output file
 * (au->disposition) to au->file_time.
 *
 * Returns 0 without doing anything if no file time is known or the
 * no_touch option is set; otherwise returns the result of utime().
 */
int
touch(aurl_t *au)
{
    struct utimbuf ut;

    if (!au->file_time || au->url->parm->opt->no_touch)
        return 0;
    ut.actime = ut.modtime = au->file_time;
    return utime(au->disposition, &ut);
}

/* Remove and reap every dns helper that is still in the idle queue. */
static void
cleanup(void)
{
    /* kill off still running dns helpers */
    lnq_iterate_rm(queue_dns_idle, dnsproc_t, pr, {
        dbg(QUE, ("iterate_rm idle dns helper\n"));
        lnq_remove(queue_dns_idle, pr);
        reap_dnsproc(pr);
        continue;
    }, c_fid);
}

/*
 * Abort the whole run: touch() the files of all urls that are currently
 * receiving data, reap the idle dns helpers and die() with msg.
 */
void
byebye(char *msg)
{
    lnq_iterate(queue_urls_reply, aurl_t, au, {
        touch(au);
    });
    cleanup();
    die(1, msg);
}

/*
 * Signal handlers installed by fetch_all. They take the signal number
 * as required for handlers passed to signal(); it is not used.
 * NOTE(review): byebye() does considerably more than what is
 * async-signal-safe - confirm this is acceptable.
 */
static void
sigint(int sig)
{
    (void)sig;
    byebye("interrupted!");
}

static void
sigterm(int sig)
{
    (void)sig;
    byebye("terminated!");
}

static void
sigalrm(int sig)
{
    (void)sig;
    byebye("time quota exceeded!");
}

/*
 * Fetch all urls in parallel:
 *
 * One select()-driven loop that, per iteration,
 *  - starts connections for queued urls (bounded by max_urls_active
 *    and rate limited by throttle),
 *  - hands queued host lookups to dns helpers (bounded by
 *    max_dnss_active), reaping helpers that idled too long,
 *  - optionally prints the status line about once a second,
 *  - waits for socket/helper activity for at most one second,
 *  - processes replies, completed/failed connects and finished lookups.
 *
 * Returns when an iteration finds nothing left to wait for.
 */
void
fetch_all()
{
    fd_set rfds, wfds;
    struct timeval last_tv, start_tv, next_tv, next_conn_tv, next_dpy_tv, to;
    time_t next_fork_time;
    long timediff, tottime;
    off_t last_fetched_bytes;
    int top_speed, avg_speed;
    int num_dns_busy, num_dns_idle;
    int num_urls_active;
    wobj_t *wo, *nwo;
    aurl_t *au;
    url_t *u;
    int i, mxfd, nfds;
    int *spds, spdi, spdn;

    /* Initialize some data: */
    gettimeofday(&last_tv, NULL);
    start_tv = last_tv;
    timerclear(&next_conn_tv);
    timerclear(&next_dpy_tv);
    next_fork_time = 0;
    last_fetched_bytes = 0;
    top_speed = avg_speed = 0;
    num_dns_busy = num_dns_idle = 0;
    num_urls_active = 0;

    /* Status info: */
    /* spds is a ring buffer of the last AVERAGING_TIMEFRAME speed samples */
    spdi = spdn = 0;
    if (show_stat) {
        if (!(spds = mmalloc(sizeof(int) * AVERAGING_TIMEFRAME)))
            show_stat = 0;
        else {
            memset(spds, 0, sizeof(int) * AVERAGING_TIMEFRAME);
            printf("\n URLs Connections Bytes Time Kbyte/s\n"
                   " done+ fail/ total errs cur/max done/total pass left cur/avg\n");
        }
    } else
        spds = 0;

    signal(SIGINT, sigint);
    signal(SIGTERM, sigterm);
    signal(SIGALRM, sigalrm);
    signal(SIGPIPE, SIG_IGN);
    alarm(max_time);

    /* Megaloop: */
    for (;;) {
        gettimeofday(&cur_tv, NULL);
        checken("fetch_all (top)");

        FD_ZERO(&rfds);
        FD_ZERO(&wfds);
        mxfd = -1;
        /* nfds counts everything that still needs attention; 0 ends the loop */
        nfds = 0;

        dbg(QUE, ("---\n"));

        /* urls waiting for initiation */
        if (timercmp(&cur_tv, &next_conn_tv, <)) {
            nfds++; /* so we don't cancel out */
        } else {
            /*
             * The throttle deadline (if any) has passed. Forget it, so a
             * stale deadline cannot end up as a negative select() timeout
             * below when no new connection is started in this round.
             */
            timerclear(&next_conn_tv);
            cq_consume(queue_urls_connect, wurl_t, wu, {
                int rt;

                dbg(QUE, ("consume http://%s/%s\n",
                          wu->url->host->name, wu->url->local_part));
                nfds++;
                if (num_urls_active >= max_urls_active)
                    break;
                if (max_urls && num_urls_done > max_urls)
                    byebye("URL count quota exceeded!"); /* XXX this is "somewhat" harsh */
                u = wu->url;
                rt = activate_url(u, &au);
                if (rt == RT_AGAIN) { /* transient server problem */
                    queue_urls_connect = wu;
                    continue;
                } else if (rt == RT_RETRY) /* transient error */
                    break;
                else if (rt == RT_SKIP) /* already exists */
                    num_urls--;
                else if (rt == RT_OK) {
                    struct in_addr addr;
                    struct in_addr baddr;
                    u_short port;

                    /* connect to the proxy if there is one, else to the host */
                    if (au->proxy) {
                        addr = au->proxy->host->info->ips[au->pipidx].addr;
                        port = au->proxy->port;
                    } else {
                        addr = u->host->info->ips[au->ipidx].addr;
                        port = u->port;
                    }
                    /* pick one of the configured local addresses at random */
                    if (au->url->parm->opt->bind_addrs.nents) {
                        baddr = *((struct in_addr **)au->url->parm->opt->bind_addrs.ents)
                                    [RND(au->url->parm->opt->bind_addrs.nents)];
                        dbg(CON, ("connecting to '%s' %s:%i from %d.%d.%d.%d ... ",
                                  au->proxy ? au->proxy->host->name : u->host->name,
                                  inet_ntoa(addr), port,
                                  baddr.s_addr & 255, (baddr.s_addr >> 8) & 255,
                                  (baddr.s_addr >> 16) & 255, baddr.s_addr >> 24));
                    } else {
                        baddr.s_addr = 0;
                        dbg(CON, ("connecting to '%s' %s:%i ... ",
                                  au->proxy ? au->proxy->host->name : u->host->name,
                                  inet_ntoa(addr), port));
                    }
                    if ((au->socket = tcp_connect(addr, port, baddr)) < 0) {
                        dbge(CON, ("failed!\n"));
                        free(au);
                        /* nothing active that could free up a socket later */
                        if (!num_urls_active)
                            die(1, "tcp_connect() keeps failing.");
                        prx(ERR, "tcp_connect() failed!\n");
                        break;
                    } else {
                        dbge(CON, ("ok\n"));
                        if (au->proxy)
                            au->proxy->score++;
                        au->timeout = cur_tv.tv_sec + au->url->parm->opt->timeout_connect;
                        lnq_append(queue_urls_request, au);
                        num_urls_active++;
                        if (timerisset(&throttle)) {
                            /* at most one new connection per throttle period */
                            timeradd(&cur_tv, &throttle, &next_conn_tv);
                            cq_rm1st(queue_urls_connect);
                            free(wu);
                            break;
                        }
                    }
                } else if (rt != RT_GIVEUP) /* permanent error */
                    dbg(CON, ("unknown return code %d from activate_url\n", rt));
                cq_rm1st(queue_urls_connect);
                free(wu);
            });
        }
        checken("fetch_all (after consume_urls)");

        /* dns lookups waiting for initiation */
        lq_consume (queue_dns_lookup, whost_t, wh, {
            dnsproc_t *pr;

            dbg(QUE, ("consume host %s\n", wh->host->name));
            nfds++;
            if (num_dns_busy >= max_dnss_active)
                break;
          whrt:
            /* reuse an idle helper if possible, otherwise fork a new one */
            if ((pr = queue_dns_idle)) {
                queue_dns_idle = pr->next;
                num_dns_idle--;
            } else {
                if (cur_tv.tv_sec < next_fork_time)
                    break;
                if (!(pr = fork_dnsproc())) {
                    /* fork failed - do not try again before the next second */
                    next_fork_time = cur_tv.tv_sec + 1;
                    break;
                }
            }
            pr->whost = wh;
            if (!start_lookup(pr)) {
                /* this helper is unusable - drop it and try another one */
                reap_dnsproc(pr);
                goto whrt;
            }
            lq_rm1st(queue_dns_lookup);
            lnq_append(queue_dns_busy, pr);
            num_dns_busy++;
        });
        checken("fetch_all (after consume_hosts)");

        /* idle dns helpers */
        lnq_iterate_rm(queue_dns_idle, dnsproc_t, pr, {
            dbg(QUE, ("iterate_rm idle dns helper\n"));
            if (pr->timeout < cur_tv.tv_sec) {
                dbg(QUE, (" timeout\n"));
                lnq_remove(queue_dns_idle, pr);
                num_dns_idle--;
                reap_dnsproc(pr);
                continue;
            }
        }, c_id);
        checken("fetch_all (after iterating over idle dns helpers)");

        /* urls waiting for reply */
        lnq_iterate(queue_urls_reply, aurl_t, au, {
            nfds++;
            FD_SET(au->socket, &rfds);
            if (au->socket > mxfd)
                mxfd = au->socket;
        });

        /* urls waiting for connection establishment */
        lnq_iterate(queue_urls_request, aurl_t, au, {
            nfds++;
            FD_SET(au->socket, &wfds);
            if (au->socket > mxfd)
                mxfd = au->socket;
        });

        /* dns lookups waiting for completion */
        lnq_iterate(queue_dns_busy, dnsproc_t, pr, {
            nfds++;
            FD_SET(pr->fd, &rfds);
            if (pr->fd > mxfd)
                mxfd = pr->fd;
        });

        if (show_stat) {
            int esttimeleft, cur_speed, mid_speed;
            char estts[10], totts[10];

            /* time since the last status update, in 1/100 seconds */
            timediff = (cur_tv.tv_sec - last_tv.tv_sec) * 100 +
                       (cur_tv.tv_usec - last_tv.tv_usec) / 10000;
            /* update once a second, plus a final time when all is done */
            if (timediff >= 100 || !nfds) {
                /* bytes per second over the last period */
                cur_speed = timediff ?
                    (int)((fetched_bytes - last_fetched_bytes) * 100 / timediff) : 0;
                spds[spdi] = cur_speed;
                if (++spdi >= AVERAGING_TIMEFRAME)
                    spdi = 0;
                if (spdn < AVERAGING_TIMEFRAME)
                    spdn++;
                /* mean of the recorded samples, used for the time estimate */
                for (mid_speed = 0, i = 0; i < spdn; i++)
                    mid_speed += spds[i];
                mid_speed /= spdn;
                if (mid_speed) {
                    esttimeleft = (total_bytes - fetched_bytes) / mid_speed;
                } else
                    esttimeleft = -1;
                if (cur_speed > top_speed)
                    top_speed = cur_speed;
                /* whole seconds since fetch_all started */
                tottime = ((cur_tv.tv_sec - start_tv.tv_sec) * 100 +
                           (cur_tv.tv_usec - start_tv.tv_usec) / 10000) / 100;
                avg_speed = tottime ? (int)(fetched_bytes / tottime) : 0;
                fmt_time(totts, tottime);
                fmt_time(estts, esttimeleft);
                printf("\r%6d+%5d/%6d %6d %3d/%-3d %10"SSOFFT"/%-10"SSOFFT
                       " %5s %5s %5d/%-4d",
                       num_urls_done, num_urls_fail, num_urls, num_errors,
                       num_urls_active, max_urls_active,
                       fetched_bytes, total_bytes,
                       totts, estts, cur_speed / 1024, avg_speed / 1024);
                fflush(stdout);
                last_fetched_bytes = fetched_bytes;
                last_tv = cur_tv;
                next_dpy_tv = cur_tv;
                next_dpy_tv.tv_sec++;
            }
        }
        checken("fetch_all (after stats)");

        /* nothing queued, nothing active: we are done */
        if (!nfds) {
            if (show_stat) {
                printf("\n\nTop speed: %9i bytes/second\n"
                       "Average speed: %9i bytes/second\n", top_speed, avg_speed);
                free(spds);
            }
            cleanup();
            return;
        }

        /* sleep until the earliest of: 1 second, throttle end, next display */
        next_tv = cur_tv;
        next_tv.tv_sec++; /* needed for other timeouts */
        if (timerisset(&next_conn_tv) && timercmp(&next_conn_tv, &next_tv, <))
            next_tv = next_conn_tv;
        if (timerisset(&next_dpy_tv) && timercmp(&next_dpy_tv, &next_tv, <))
            next_tv = next_dpy_tv;
        timersub( &next_tv, &cur_tv, &to);
        if (select(mxfd + 1, &rfds, &wfds, 0, &to) < 0)
            die(1, "select() failed!");

        /* urls waiting for reply */
        /*
         * "goto c_ur" keeps the url in the queue and moves on to the next
         * one; every other path falls through to the tear-down code at
         * the end of the body.
         */
        lnq_iterate_rm(queue_urls_reply, aurl_t, au, {
            dbg(QUE, ("iterate_rm reply http://%s/%s\n",
                      au->url->host->name, au->url->local_part));
            if (FD_ISSET(au->socket, &rfds)) {
                dbg(QUE, (" has data\n"));
                switch (handle_reply(au)) {
                case RT_OK:
                    au->timeout = cur_tv.tv_sec + au->url->parm->opt->timeout_data;
                    goto c_ur;
                case RT_SKIP:
                    num_urls--;
                    goto gofrnu;
                case RT_DONE:
                    num_urls_done++;
                    break;
                case RT_AGAIN:
                    queue_url(au->url);
                    break;
                case RT_RETRY:
                    goto gorequ;
                case RT_GIVEUP:
                    break;
                case RT_TIMEOUT:
                    conn_err (au, 1, 1, au->url->parm->opt->timeout_connect,
                              "connect to '%s' timed out\n",
                              "connect for $u timed out");
                    break;
                case RT_REFUSED:
                    conn_err (au, 1, 2, 3,
                              "connect to '%s' failed\n",
                              "connect for $u failed");
                    break;
                }
            } else {
                if (cur_tv.tv_sec < au->timeout)
                    goto c_ur;
                if (errm(au->url, "data fetch for $u timed out") == RT_RETRY) {
                  gorequ:
                    queue_url(au->url);
                }
              gofrnu:
                if (au->url->parm->opt->delete_broken)
                    unlink(au->disposition);
            }
            /* fake -- err: correct our statistics. ;-) */
            if (au->http_done_header && au->size_total)
                total_bytes += -au->size_total + au->size_fetched;
            lnq_remove(queue_urls_reply, au);
            num_urls_active--;
            close(au->socket);
            if (au->buffer)
                free(au->buffer);
            if (au->headers)
                free(au->headers);
            if (au->f != -1)
                close(au->f);
            free(au);
            dbg(QUE, (" removed\n"));
            continue;
        }, c_ur);
        checken("fetch_all (after iterating over replies)");

        /* urls waiting for connection establishment */
        /* "goto c_uq" keeps the url in the queue, like c_ur above */
        lnq_iterate_rm(queue_urls_request, aurl_t, au, {
            dbg(QUE, ("iterate_rm request http://%s/%s\n",
                      au->url->host->name, au->url->local_part));
            if (FD_ISSET(au->socket, &wfds)) {
                int err = 0;
                socklen_t errl = sizeof(err);

                dbg(QUE, (" connect event\n"));
                /*
                 * Writability only says the connect attempt finished;
                 * SO_ERROR tells whether it succeeded. A failing
                 * getsockopt() is treated like a failed connect.
                 */
                if (getsockopt(au->socket, SOL_SOCKET, SO_ERROR,
                               (void *)&err, &errl) < 0 || err)
                    conn_err (au, 0, 2, 3,
                              "connect to '%s' failed\n",
                              "connect for $u failed");
                else if (send_http_get(au) <= 0)
                    conn_err (au, 0, 2, 3,
                              "HTTP request send to '%s' failed\n",
                              "HTTP request send for $u failed");
                else {
                    haddr_t *ip;

                    /* request is out: move on to waiting for the reply */
                    lnq_remove(queue_urls_request, au);
                    lnq_append(queue_urls_reply, au);
                    au->timeout = cur_tv.tv_sec + au->url->parm->opt->timeout_data;
                    /* the address works - reset its error bookkeeping */
                    if (au->proxy)
                        ip = au->proxy->host->info->ips + au->pipidx;
                    else
                        ip = au->url->host->info->ips + au->ipidx;
                    ip->last_errt = 0;
                    ip->attempt = 0;
                    continue;
                }
            } else {
                if (cur_tv.tv_sec < au->timeout)
                    goto c_uq;
                conn_err (au, 0, 1, au->url->parm->opt->timeout_connect,
                          "connect to '%s' timed out\n",
                          "connect for $u timed out");
            }
            lnq_remove(queue_urls_request, au);
            num_urls_active--;
            close(au->socket);
            free(au);
            continue;
        }, c_uq);
        checken("fetch_all (after iterating over connects)");

        /* dns lookups waiting for completion */
        lnq_iterate_rm(queue_dns_busy, dnsproc_t, pr, {
            dbg(QUE, ("iterate_rm host %s\n", pr->whost->host->name));
            if (FD_ISSET(pr->fd, &rfds)) {
                whost_t *wh = pr->whost;

                dbg(QUE, (" has data\n"));
                if (!finish_lookup(pr)) {
                    /* helper failed: reap it and put the host back in front */
                    lnq_remove(queue_dns_busy, pr);
                    num_dns_busy--;
                    reap_dnsproc(pr);
                    lq_prepend(queue_dns_lookup, wh);
                    continue;
                }
                checken("fetch_all (after finish_lookup)");
                if (wh->host->info) {
                    /* lookup succeeded: release everything waiting on this host */
                    nwo = wh->objq;
                    while ((wo = nwo) != 0) {
                        nwo = wo->next;
                        if (wo->url) {
                            if (wo->url->referer &&
                                (wo->url->host->info != wo->url->referer->host->info) &&
                                (wo->url->is_requisite ?
                                    wo->url->parm->opt->follow_src :
                                    wo->url->parm->opt->follow_href) <= HOST_RECURSIVE) {
                                prx(NFO, "not adding 'http://%s/%s' (different host)\n",
                                    wo->url->host->name, wo->url->local_part);
                                free_url(wo->url);
                                checken("fetch_all (after free_url (1))");
                            } else {
                                int hash;

                                /* drop the url if it is already known */
                                if (find_url(wo->url->local_part,
                                             strlen(wo->url->local_part),
                                             wo->url->host->info,
                                             wo->url->port, &hash)) {
                                    free_url(wo->url);
                                    checken("fetch_all (after free_url (2))");
                                } else {
                                    wo->url->url_hash = hash;
                                    add_url(wo->url);
                                    checken("fetch_all (after add_url)");
                                }
                            }
                        }
                        if (wo->proxy) {
                            wo->proxy->ready = 1;
                            waiting_proxies--;
                        }
                        free(wo);
                    }
                } else {
                    /* lookup yielded no host info: discard the waiting objects */
                    nwo = wh->objq;
                    while ((wo = nwo) != 0) {
                        nwo = wo->next;
                        if (wo->url) {
                            if ((wo->url->is_requisite ?
                                    wo->url->parm->opt->follow_src :
                                    wo->url->parm->opt->follow_href) <= HOST_RECURSIVE) {
                                num_urls++;
                                num_urls_fail++;
                            }
                            free(wo->url);
                        }
                        if (wo->proxy) {
                            wo->proxy->ready = 1;
                            waiting_proxies--;
                        }
                        free(wo);
                    }
                }
                free(wh);
                /* park the helper; it is reaped above after 60 idle seconds */
                lnq_remove(queue_dns_busy, pr);
                num_dns_busy--;
                lnq_append(queue_dns_idle, pr);
                num_dns_idle++;
                pr->timeout = cur_tv.tv_sec + 60;
                continue;
            }
        }, c_dr);
        checken("fetch_all (after iterating over dns lookups)");
    }
}