/* $Id: fetch.c,v 1.13 2004/02/13 16:09:29 ossi Exp $ *
*
* puf 0.9 Copyright (C) 2000-2004 by Oswald Buddenhagen <puf@ossi.cjb.net>
* based on puf 0.1.x (C) 1999,2000 by Anders Gavare <gavare@hotmail.com>
*
* You may modify and distribute this code under the terms of the GPL.
* There is NO WARRANTY of any kind. See COPYING for details.
*
* fetch.c - url fetch loop
*
*/
#include "puf.h"
/*
 * Byte accounting.  fetched_bytes and total_bytes feed the "done/total"
 * column of the status line printed by fetch_all().
 * NOTE(review): max_bytes is not referenced in this part of the file;
 * presumably a byte quota enforced elsewhere -- confirm.
 */
off_t max_bytes;
off_t fetched_bytes;
off_t total_bytes;

/* Limits and timeouts. */
int max_dnss_active = DEFAULT_MAX_DNS_FORKS;	/* cap on busy dns helpers */
int max_urls_active = DEFAULT_MAX_ACTIVE;	/* cap on active connections */
int timeout_dns = DEFAULT_TIMEOUT_DNS;
int max_time;		/* handed to alarm() by fetch_all() */
int max_urls;		/* URL count quota; 0 disables the check */

/* Counters shown in the status line. */
int num_urls;
int num_urls_done;
int num_urls_fail;
int num_errors;

int show_stat = 1;	/* non-zero: print the status line */
int waiting_proxies;	/* decremented when a proxy's host lookup completes */
int all_proxy_wait = 1; /* unused (immutable) */

struct timeval cur_tv;		/* refreshed at the top of every loop pass */
struct timeval throttle;	/* if set: minimum gap between two connects */
struct sockaddr_in bind_addr;	/* local address template used for bind() */

circular_queue(queue_urls_connect, wurl_t); /* ready to connect */
linear_na_queue(queue_urls_request, aurl_t); /* started connect, waiting for write */
linear_na_queue(queue_urls_reply, aurl_t); /* request sent, waiting for reply */
/*
 * Start a non-blocking TCP connect to addr:port.
 *
 * addr  - IPv4 address of the peer
 * port  - peer port in host byte order
 * baddr - local address to bind to; s_addr == 0 means "do not bind"
 *
 * Returns the socket descriptor, or -1 if no usable socket could be
 * set up.  The connect itself is only initiated here: its outcome is
 * collected by the caller once select() reports the socket writable
 * (via SO_ERROR).
 */
static int
tcp_connect(struct in_addr addr, u_short port, struct in_addr baddr)
{
    int s, fl;
    struct sockaddr_in server_in;

    /* if we ran out of descriptors, try to release one and retry once */
    if ((s = socket(PF_INET, SOCK_STREAM, 0)) < 0 &&
	(!free_fd() || (s = socket(PF_INET, SOCK_STREAM, 0)) < 0))
	return -1;

    /* the descriptor is later passed to FD_SET(); a value outside
       the fd_set range would be undefined behaviour there */
    if (s >= FD_SETSIZE) {
	close(s);
	return -1;
    }

    if (baddr.s_addr) {
	bind_addr.sin_addr = baddr;
	/* a bind() failure is not treated as fatal; the connection then
	   simply leaves from the default source address */
	bind(s, (struct sockaddr *)&bind_addr, sizeof(bind_addr));
    }

    /* a socket left in blocking mode would stall the whole fetch loop
       inside connect(), so refuse to continue without O_NONBLOCK */
    fl = fcntl(s, F_GETFL);
    if (fl < 0 || fcntl(s, F_SETFL, fl | O_NONBLOCK) < 0) {
	close(s);
	return -1;
    }

    /* clear padding (and sin_len where it exists) before filling in */
    memset(&server_in, 0, sizeof(server_in));
    server_in.sin_family = AF_INET;
    server_in.sin_addr = addr;
    server_in.sin_port = htons(port);

    /* return value intentionally not examined: on a non-blocking socket
       the result is reported later through select() and SO_ERROR */
    connect(s, (struct sockaddr *)&server_in, sizeof(server_in));
    return s;
}
/*
 * Handle a failed connection attempt of au.
 *
 * au   - the active url whose connection failed
 * dr   - non-zero: charge the failure to the url's own host even if a
 *        proxy is in use; zero: charge it to the proxy when there is one
 * errt - error type to record for the address
 * errw - initial retry delay (seconds) when the error type changes
 * etww - printf-style warning (takes the host name) used when the
 *        fail_no_wait option is off
 * etnw - message handed to errm() when the fail_no_wait option is on
 *
 * The url is re-queued (unconditionally without fail_no_wait, otherwise
 * only if errm() answers RT_RETRY).  Afterwards the per-address retry
 * state is advanced: the delay doubles on every failure, and after
 * max_attempts failures the address is marked with error type 3
 * ("given up").
 */
static void
conn_err (aurl_t *au, int dr, int errt, int errw, char *etww, char *etnw)
{
    const int via_proxy = !dr && au->proxy;
    hinfo_t *hi = via_proxy ? au->proxy->host->info : au->url->host->info;
    haddr_t *ip = via_proxy ? hi->ips + au->pipidx : hi->ips + au->ipidx;

    if (!au->url->parm->opt->fail_no_wait) {
	prx(WRN, etww, hi->name);
	queue_url(au->url);
    } else if (errm(au->url, etnw) == RT_RETRY) {
	queue_url(au->url);
    }

    /* nothing to update if the address was already given up, or if it
       is still inside its current back-off window */
    if (ip->last_errt == 3 || cur_tv.tv_sec < ip->retry_time)
	return;

    ++ip->attempt;
    if (ip->attempt >= (unsigned)au->url->parm->opt->max_attempts) {
	ip->last_errt = 3;
	prx(WRN, "giving up address '%s' for host '%s'.\n",
	    inet_ntoa(ip->addr), hi->name);
	return;
    }

    /* a different kind of error restarts the back-off at errw */
    if (ip->last_errt != errt) {
	dbg(CON, (" setting new error type %d\n", errt));
	ip->last_errt = errt;
	ip->err_wait = errw;
    }
    dbg(CON, (" retrying in %d seconds\n", ip->err_wait));
    ip->retry_time = cur_tv.tv_sec + ip->err_wait;
    ip->err_wait *= 2;
}
/*
 * Format a duration of ti seconds into d.
 *
 * The result is always at most five characters plus the terminating
 * NUL, so d must have room for six bytes:
 *   negative      -> "??:??"  (unknown)
 *   < 100 minutes -> "mm:ss"
 *   < 100 hours   -> "hhhmm"  (e.g. "01h40")
 *   < 100 days    -> "DdHH"   (e.g. "4d04")
 *   otherwise     -> "> 99d"
 *
 * Any negative value is treated as "unknown", not just -1: a negative
 * estimate would otherwise be printed as garbage like "00:-5" and, for
 * large magnitudes, write more than the five characters callers expect.
 */
static void
fmt_time(char *d, int ti)
{
    if (ti < 0)
	strcpy(d, "??:??");
    else if (ti < 6000)
	sprintf(d, "%02d:%02d", ti / 60, ti % 60);
    else if (ti < 360000)
	sprintf(d, "%02dh%02d", ti / 3600, ti / 60 % 60);
    else if (ti < 8640000)
	sprintf(d, "%dd%02d", ti / 86400, ti / 3600 % 24);
    else
	strcpy(d, "> 99d");
}
/*
 * Stamp the file named by au->disposition with au->file_time (used for
 * both access and modification time).
 *
 * Does nothing and returns 0 when no file time is recorded or when the
 * no_touch option is set; otherwise returns the result of utime().
 */
int
touch(aurl_t *au)
{
    struct utimbuf stamps;

    if (au->file_time && !au->url->parm->opt->no_touch) {
	stamps.modtime = au->file_time;
	stamps.actime = stamps.modtime;
	return utime(au->disposition, &stamps);
    }
    return 0;
}
/*
 * Shut down the DNS helpers that are parked in queue_dns_idle: each one
 * is taken off the queue and handed to reap_dnsproc().
 * Only the idle queue is walked here; helpers in queue_dns_busy are not
 * touched by this function.
 */
static void
cleanup(void)
{
/* kill off the dns helpers that are still running but idle */
/* NOTE(review): lnq_iterate_rm is a project macro not visible here; it
   presumably loops over the queue with removal allowed, binding each
   element to `pr`, with c_fid naming its continue label -- confirm. */
lnq_iterate_rm(queue_dns_idle, dnsproc_t, pr, {
dbg(QUE, ("iterate_rm idle dns helper\n"));
lnq_remove(queue_dns_idle, pr);
reap_dnsproc(pr);
continue;
}, c_fid);
}
/*
 * Abort the run with message msg.
 *
 * Before bailing out, touch() is applied to every url in
 * queue_urls_reply (the transfers that were receiving data) and the
 * idle DNS helpers are reaped via cleanup().  Finally die(1, msg) is
 * called.
 * NOTE(review): die() is defined elsewhere; presumably it prints msg
 * and exits with status 1 -- confirm.
 * NOTE(review): this function is also reached from the signal handlers
 * below, so everything it calls runs in signal context.
 */
void
byebye(char *msg)
{
/* give partially fetched files their recorded timestamp */
lnq_iterate(queue_urls_reply, aurl_t, au, {
touch(au);
});
cleanup();
die(1, msg);
}
/* SIGINT handler (installed by fetch_all()): abort via byebye().
   Takes the int argument that signal() requires of a handler. */
static void sigint(int sig) { (void)sig; byebye("interrupted!"); }
/* SIGTERM handler (installed by fetch_all()): abort via byebye().
   Takes the int argument that signal() requires of a handler. */
static void sigterm(int sig) { (void)sig; byebye("terminated!"); }
/* SIGALRM handler (installed by fetch_all(), armed with alarm(max_time)):
   abort via byebye().  Takes the int argument that signal() requires. */
static void sigalrm(int sig) { (void)sig; byebye("time quota exceeded!"); }
/*
 * Fetch all urls in parallel.
 *
 * Runs one select()-driven loop.  Every pass services, in this order:
 *   - queue_urls_connect: urls waiting for a connection to be started
 *   - queue_dns_lookup:   hosts waiting for a dns helper
 *   - queue_dns_idle:     idle dns helpers (reaped after their timeout)
 *   - the status line (if show_stat is set)
 *   - select() on all sockets and busy dns helpers
 *   - queue_urls_reply:   sockets with a request sent, waiting for data
 *   - queue_urls_request: sockets with a connect in progress
 *   - queue_dns_busy:     dns helpers with a lookup in flight
 *
 * Returns once a pass finds nothing left to wait for (nfds == 0), after
 * printing the speed summary and reaping the idle dns helpers.
 * Installs handlers for SIGINT, SIGTERM and SIGALRM, ignores SIGPIPE and
 * arms alarm(max_time).
 *
 * NOTE(review): cq_consume, lq_consume, lnq_iterate and lnq_iterate_rm
 * are project macros not visible here.  Their code-block argument must
 * not contain a comma outside parentheses, or the preprocessor would
 * split it into several macro arguments.
 */
void
fetch_all(void)
{
    fd_set rfds, wfds;
    struct timeval last_tv, start_tv, next_tv, next_conn_tv, next_dpy_tv, to;
    time_t next_fork_time;
    long timediff, tottime;
    off_t last_fetched_bytes;
    int top_speed, avg_speed;
    int num_dns_busy, num_dns_idle;
    int num_urls_active;
    wobj_t *wo, *nwo;
    aurl_t *au;
    url_t *u;
    int i, mxfd, nfds;
    int *spds, spdi, spdn;

    /* Initialize some data: */
    gettimeofday(&last_tv, NULL);
    start_tv = last_tv;
    timerclear(&next_conn_tv);
    timerclear(&next_dpy_tv);
    next_fork_time = 0;
    last_fetched_bytes = 0;
    top_speed = avg_speed = 0;
    num_dns_busy = num_dns_idle = 0;
    num_urls_active = 0;

    /* Status info: spds is a ring buffer of the most recent speed
       samples; spdi is the next write slot, spdn the number filled */
    spdi = spdn = 0;
    if (show_stat) {
	if (!(spds = mmalloc(sizeof(int) * AVERAGING_TIMEFRAME)))
	    show_stat = 0;
	else {
	    memset(spds, 0, sizeof(int) * AVERAGING_TIMEFRAME);
	    printf("\n URLs Connections Bytes Time Kbyte/s\n"
		   " done+ fail/ total errs cur/max done/total pass left cur/avg\n");
	}
    } else
	spds = 0;

    signal(SIGINT, sigint);
    signal(SIGTERM, sigterm);
    signal(SIGALRM, sigalrm);
    signal(SIGPIPE, SIG_IGN);
    alarm(max_time);

    /* Megaloop: */
    for (;;) {
	gettimeofday(&cur_tv, NULL);
	checken("fetch_all (top)");

	FD_ZERO(&rfds);
	FD_ZERO(&wfds);
	mxfd = -1;	/* highest descriptor put into rfds/wfds */
	nfds = 0;	/* number of things still pending; 0 ends the loop */

	dbg(QUE, ("---\n"));

	/* urls waiting for initiation */
	if (timercmp(&cur_tv, &next_conn_tv, <))
	    nfds++; /* so we don't cancel out */
	else
	    cq_consume(queue_urls_connect, wurl_t, wu, {
		int rt;

		dbg(QUE, ("consume http://%s/%s\n", wu->url->host->name, wu->url->local_part));
		nfds++;
		if (num_urls_active >= max_urls_active)
		    break;
		/* NOTE(review): '>' lets one url more than max_urls finish
		   before the quota trips -- confirm this is intended */
		if (max_urls && num_urls_done > max_urls)
		    byebye("URL count quota exceeded!"); /* XXX this is "somewhat" harsh */
		u = wu->url;
		rt = activate_url(u, &au);
		if (rt == RT_AGAIN) { /* transient server problem */
		    queue_urls_connect = wu;
		    continue;
		} else if (rt == RT_RETRY) /* transient error */
		    break;
		else if (rt == RT_SKIP) /* already exists */
		    num_urls--;
		else if (rt == RT_OK) {
		    struct in_addr addr;
		    struct in_addr baddr;
		    u_short port;

		    /* connect to the proxy if there is one */
		    if (au->proxy) {
			addr = au->proxy->host->info->ips[au->pipidx].addr;
			port = au->proxy->port;
		    } else {
			addr = u->host->info->ips[au->ipidx].addr;
			port = u->port;
		    }
		    /* pick a random local address if any are configured */
		    if (au->url->parm->opt->bind_addrs.nents) {
			baddr =
			    *((struct in_addr **)au->url->parm->opt->bind_addrs.ents)
				[RND(au->url->parm->opt->bind_addrs.nents)];
			dbg(CON, ("connecting to '%s' %s:%i from %d.%d.%d.%d ... ",
				  au->proxy ? au->proxy->host->name : u->host->name,
				  inet_ntoa(addr), port,
				  baddr.s_addr & 255, (baddr.s_addr >> 8) & 255,
				  (baddr.s_addr >> 16) & 255, baddr.s_addr >> 24));
		    } else {
			baddr.s_addr = 0;
			dbg(CON, ("connecting to '%s' %s:%i ... ",
				  au->proxy ? au->proxy->host->name : u->host->name,
				  inet_ntoa(addr), port));
		    }
		    if ((au->socket = tcp_connect(addr, port, baddr)) < 0) {
			dbge(CON, ("failed!\n"));
			free(au);
			/* with nothing else in flight there is no reason
			   to expect a later attempt to fare better */
			if (!num_urls_active)
			    die(1, "tcp_connect() keeps failing.");
			prx(ERR, "tcp_connect() failed!\n");
			break;
		    } else {
			dbge(CON, ("ok\n"));
			if (au->proxy)
			    au->proxy->score++;
			au->timeout = cur_tv.tv_sec +
				      au->url->parm->opt->timeout_connect;
			lnq_append(queue_urls_request, au);
			num_urls_active++;
			/* throttling: start at most one connect per pass
			   and remember when the next one may begin */
			if (timerisset(&throttle)) {
			    timeradd(&cur_tv, &throttle, &next_conn_tv);
			    cq_rm1st(queue_urls_connect);
			    free(wu);
			    break;
			}
		    }
		} else if (rt != RT_GIVEUP) /* permanent error */
		    dbg(CON, ("unknown return code %d from activate_url\n", rt));
		cq_rm1st(queue_urls_connect);
		free(wu);
	    });
	checken("fetch_all (after consume_urls)");

	/* dns lookups waiting for initiation */
	lq_consume (queue_dns_lookup, whost_t, wh, {
	    dnsproc_t *pr;

	    dbg(QUE, ("consume host %s\n", wh->host->name));
	    nfds++;
	    if (num_dns_busy >= max_dnss_active)
		break;
	  whrt:
	    /* reuse an idle helper if possible; otherwise fork a new one
	       but after a failed fork wait until the next second */
	    if ((pr = queue_dns_idle)) {
		queue_dns_idle = pr->next;
		num_dns_idle--;
	    } else {
		if (cur_tv.tv_sec < next_fork_time)
		    break;
		if (!(pr = fork_dnsproc())) {
		    next_fork_time = cur_tv.tv_sec + 1;
		    break;
		}
	    }
	    pr->whost = wh;
	    if (!start_lookup(pr)) {
		/* this helper is unusable; drop it and try another */
		reap_dnsproc(pr);
		goto whrt;
	    }
	    lq_rm1st(queue_dns_lookup);
	    lnq_append(queue_dns_busy, pr);
	    num_dns_busy++;
	});
	checken("fetch_all (after consume_hosts)");

	/* idle dns helpers */
	lnq_iterate_rm(queue_dns_idle, dnsproc_t, pr, {
	    dbg(QUE, ("iterate_rm idle dns helper\n"));
	    if (pr->timeout < cur_tv.tv_sec) {
		dbg(QUE, (" timeout\n"));
		lnq_remove(queue_dns_idle, pr);
		num_dns_idle--;
		reap_dnsproc(pr);
		continue;
	    }
	}, c_id);
	checken("fetch_all (after iterating over idle dns helpers)");

	/* urls waiting for reply */
	lnq_iterate(queue_urls_reply, aurl_t, au, {
	    nfds++;
	    FD_SET(au->socket, &rfds);
	    if (au->socket > mxfd)
		mxfd = au->socket;
	});

	/* urls waiting for connection establishment */
	lnq_iterate(queue_urls_request, aurl_t, au, {
	    nfds++;
	    FD_SET(au->socket, &wfds);
	    if (au->socket > mxfd)
		mxfd = au->socket;
	});

	/* dns lookups waiting for completion */
	lnq_iterate(queue_dns_busy, dnsproc_t, pr, {
	    nfds++;
	    FD_SET(pr->fd, &rfds);
	    if (pr->fd > mxfd)
		mxfd = pr->fd;
	});

	if (show_stat) {
	    int esttimeleft, cur_speed, mid_speed;
	    char estts[10], totts[10];

	    /* time since the last status update, in 1/100 seconds */
	    timediff = (cur_tv.tv_sec - last_tv.tv_sec) * 100
		       + (cur_tv.tv_usec - last_tv.tv_usec) / 10000;
	    /* update once per second, and once more when finishing */
	    if (timediff >= 100 || !nfds) {
		off_t spd_sum, bytes_left, est;

		cur_speed = timediff ?
		    (int)((fetched_bytes - last_fetched_bytes) * 100 /
			  timediff) : 0;
		spds[spdi] = cur_speed;
		if (++spdi >= AVERAGING_TIMEFRAME)
		    spdi = 0;
		if (spdn < AVERAGING_TIMEFRAME)
		    spdn++;
		/* sum in off_t: adding up several int samples in an int
		   can overflow on fast links */
		for (spd_sum = 0, i = 0; i < spdn; i++)
		    spd_sum += spds[i];
		mid_speed = (int)(spd_sum / spdn);
		/* the estimate is only meaningful with a positive speed and
		   a non-negative remainder; it is capped at the value from
		   which fmt_time() prints "> 99d" so that the conversion
		   to int cannot truncate */
		bytes_left = total_bytes - fetched_bytes;
		if (mid_speed > 0 && bytes_left >= 0) {
		    est = bytes_left / mid_speed;
		    esttimeleft = est >= 8640000 ? 8640000 : (int)est;
		} else
		    esttimeleft = -1;
		if (cur_speed > top_speed)
		    top_speed = cur_speed;
		/* total run time in seconds */
		tottime = ((cur_tv.tv_sec - start_tv.tv_sec) * 100
			   + (cur_tv.tv_usec - start_tv.tv_usec) / 10000) / 100;
		avg_speed = tottime ? (int)(fetched_bytes / tottime) : 0;
		fmt_time(totts, tottime);
		fmt_time(estts, esttimeleft);
		printf("\r%6d+%5d/%6d %6d %3d/%-3d %10"SSOFFT"/%-10"SSOFFT
		       " %5s %5s %5d/%-4d",
		       num_urls_done, num_urls_fail, num_urls, num_errors,
		       num_urls_active, max_urls_active, fetched_bytes,
		       total_bytes, totts, estts,
		       cur_speed / 1024, avg_speed / 1024);
		fflush(stdout);
		last_fetched_bytes = fetched_bytes;
		last_tv = cur_tv;
		next_dpy_tv = cur_tv;
		next_dpy_tv.tv_sec++;
	    }
	}
	checken("fetch_all (after stats)");

	/* nothing queued, nothing in flight: we are done */
	if (!nfds) {
	    if (show_stat) {
		printf("\n\nTop speed: %9i bytes/second\n"
		       "Average speed: %9i bytes/second\n",
		       top_speed, avg_speed);
		free(spds);
	    }
	    cleanup();
	    return;
	}

	/* sleep until the earliest of: one second from now, the next
	   permitted connect, the next status line update */
	next_tv = cur_tv;
	next_tv.tv_sec++; /* needed for other timeouts */
	if (timerisset(&next_conn_tv) && timercmp(&next_conn_tv, &next_tv, <))
	    next_tv = next_conn_tv;
	if (timerisset(&next_dpy_tv) && timercmp(&next_dpy_tv, &next_tv, <))
	    next_tv = next_dpy_tv;
	timersub( &next_tv, &cur_tv, &to);
	if (select(mxfd + 1, &rfds, &wfds, 0, &to) < 0)
	    die(1, "select() failed!");

	/* urls waiting for reply */
	lnq_iterate_rm(queue_urls_reply, aurl_t, au, {
	    dbg(QUE, ("iterate_rm reply http://%s/%s\n",
		      au->url->host->name, au->url->local_part));
	    if (FD_ISSET(au->socket, &rfds)) {
		dbg(QUE, (" has data\n"));
		switch (handle_reply(au)) {
		case RT_OK:
		    /* more to come: re-arm the data timeout and keep
		       the url in the queue */
		    au->timeout = cur_tv.tv_sec +
				  au->url->parm->opt->timeout_data;
		    goto c_ur;
		case RT_SKIP:
		    num_urls--;
		    goto gofrnu;
		case RT_DONE:
		    num_urls_done++;
		    break;
		case RT_AGAIN:
		    queue_url(au->url);
		    break;
		case RT_RETRY:
		    goto gorequ;
		case RT_GIVEUP:
		    break;
		case RT_TIMEOUT:
		    conn_err (au, 1, 1, au->url->parm->opt->timeout_connect,
			      "connect to '%s' timed out\n",
			      "connect for $u timed out");
		    break;
		case RT_REFUSED:
		    conn_err (au, 1, 2, 3,
			      "connect to '%s' failed\n",
			      "connect for $u failed");
		    break;
		}
	    } else {
		if (cur_tv.tv_sec < au->timeout)
		    goto c_ur;
		if (errm(au->url, "data fetch for $u timed out") == RT_RETRY) {
		  gorequ:
		    queue_url(au->url);
		}
	      gofrnu:
		if (au->url->parm->opt->delete_broken)
		    unlink(au->disposition);
	    }
	    /* everything that reaches this point is finished with this
	       connection: release it */
	    /* fake -- err: correct our statistics. ;-) */
	    if (au->http_done_header && au->size_total)
		total_bytes += -au->size_total + au->size_fetched;
	    lnq_remove(queue_urls_reply, au);
	    num_urls_active--;
	    close(au->socket);
	    if (au->buffer)
		free(au->buffer);
	    if (au->headers)
		free(au->headers);
	    if (au->f != -1)
		close(au->f);
	    free(au);
	    dbg(QUE, (" removed\n"));
	    continue;
	}, c_ur);
	checken("fetch_all (after iterating over replies)");

	/* urls waiting for connection establishment */
	lnq_iterate_rm(queue_urls_request, aurl_t, au, {
	    dbg(QUE, ("iterate_rm request http://%s/%s\n",
		      au->url->host->name, au->url->local_part));
	    if (FD_ISSET(au->socket, &wfds)) {
		/* getsockopt() takes a socklen_t length; err starts at 0
		   so it is never read uninitialised.  A failing
		   getsockopt() is treated like a failed connect. */
		int err = 0;
		socklen_t errl = sizeof(err);

		dbg(QUE, (" connect event\n"));
		if (getsockopt(au->socket, SOL_SOCKET, SO_ERROR,
			       (void *)&err, &errl) < 0 || err)
		    conn_err (au, 0, 2, 3, "connect to '%s' failed\n",
			      "connect for $u failed");
		else if (send_http_get(au) <= 0)
		    conn_err (au, 0, 2, 3, "HTTP request send to '%s' failed\n",
			      "HTTP request send for $u failed");
		else {
		    haddr_t *ip;

		    /* request is out: move over to the reply queue and
		       clear the error state of the address used */
		    lnq_remove(queue_urls_request, au);
		    lnq_append(queue_urls_reply, au);
		    au->timeout = cur_tv.tv_sec +
				  au->url->parm->opt->timeout_data;
		    if (au->proxy)
			ip = au->proxy->host->info->ips + au->pipidx;
		    else
			ip = au->url->host->info->ips + au->ipidx;
		    ip->last_errt = 0;
		    ip->attempt = 0;
		    continue;
		}
	    } else {
		if (cur_tv.tv_sec < au->timeout)
		    goto c_uq;
		conn_err (au, 0, 1, au->url->parm->opt->timeout_connect,
			  "connect to '%s' timed out\n",
			  "connect for $u timed out");
	    }
	    /* failed or timed out: drop this connection attempt */
	    lnq_remove(queue_urls_request, au);
	    num_urls_active--;
	    close(au->socket);
	    free(au);
	    continue;
	}, c_uq);
	checken("fetch_all (after iterating over connects)");

	/* dns lookups waiting for completion */
	lnq_iterate_rm(queue_dns_busy, dnsproc_t, pr, {
	    dbg(QUE, ("iterate_rm host %s\n", pr->whost->host->name));
	    if (FD_ISSET(pr->fd, &rfds)) {
		whost_t *wh = pr->whost;

		dbg(QUE, (" has data\n"));
		if (!finish_lookup(pr)) {
		    /* the helper failed: reap it and put the host back
		       at the front of the lookup queue */
		    lnq_remove(queue_dns_busy, pr);
		    num_dns_busy--;
		    reap_dnsproc(pr);
		    lq_prepend(queue_dns_lookup, wh);
		    continue;
		}
		checken("fetch_all (after finish_lookup)");
		if (wh->host->info) {
		    /* lookup succeeded: release everything that was
		       waiting for this host */
		    nwo = wh->objq;
		    while ((wo = nwo) != 0) {
			nwo = wo->next;
			if (wo->url) {
			    if (wo->url->referer &&
				(wo->url->host->info !=
				 wo->url->referer->host->info) &&
				(wo->url->is_requisite ?
				 wo->url->parm->opt->follow_src :
				 wo->url->parm->opt->follow_href) <=
				HOST_RECURSIVE)
			    {
				prx(NFO, "not adding 'http://%s/%s' (different host)\n",
				    wo->url->host->name,
				    wo->url->local_part);
				free_url(wo->url);
				checken("fetch_all (after free_url (1))");
			    } else {
				int hash;

				/* drop the url if it is already known */
				if (find_url(wo->url->local_part,
					     strlen(wo->url->local_part),
					     wo->url->host->info,
					     wo->url->port, &hash))
				{
				    free_url(wo->url);
				    checken("fetch_all (after free_url (2))");
				} else {
				    wo->url->url_hash = hash;
				    add_url(wo->url);
				    checken("fetch_all (after add_url)");
				}
			    }
			}
			if (wo->proxy) {
			    wo->proxy->ready = 1;
			    waiting_proxies--;
			}
			free(wo);
		    }
		} else {
		    /* lookup produced no host info: discard the waiters */
		    nwo = wh->objq;
		    while ((wo = nwo) != 0) {
			nwo = wo->next;
			if (wo->url) {
			    if ((wo->url->is_requisite ?
				 wo->url->parm->opt->follow_src :
				 wo->url->parm->opt->follow_href) <= HOST_RECURSIVE)
			    {
				num_urls++;
				num_urls_fail++;
			    }
			    /* NOTE(review): plain free() here, while the
			       success branch uses free_url() -- confirm
			       nothing owned by the url is leaked */
			    free(wo->url);
			}
			if (wo->proxy) {
			    wo->proxy->ready = 1;
			    waiting_proxies--;
			}
			free(wo);
		    }
		}
		free(wh);
		/* park the helper; it is reaped if unused for 60 seconds */
		lnq_remove(queue_dns_busy, pr);
		num_dns_busy--;
		lnq_append(queue_dns_idle, pr);
		num_dns_idle++;
		pr->timeout = cur_tv.tv_sec + 60;
		continue;
	    }
	}, c_dr);
	checken("fetch_all (after iterating over dns lookups)");
    }
}
/* syntax highlighted by Code2HTML, v. 0.9.1 */