/* $Id: fetch.c,v 1.13 2004/02/13 16:09:29 ossi Exp $ *
 *
 * puf 0.9  Copyright (C) 2000-2004 by Oswald Buddenhagen <puf@ossi.cjb.net>
 * based on puf 0.1.x (C) 1999,2000 by Anders Gavare <gavare@hotmail.com>
 *
 * You may modify and distribute this code under the terms of the GPL.
 * There is NO WARRANTY of any kind. See COPYING for details.
 *
 * fetch.c - url fetch loop
 *
 */

#include "puf.h"

#include <errno.h>


/*  Byte counters; fetched_bytes and total_bytes feed the status line
    printed by fetch_all() (max_bytes is not read in this file).  */
off_t max_bytes, fetched_bytes, total_bytes;
/*  Limits on concurrently busy dns helpers and on concurrently active
    urls (connecting, sending the request, or awaiting the reply).  */
int max_dnss_active = DEFAULT_MAX_DNS_FORKS;
int max_urls_active = DEFAULT_MAX_ACTIVE;
int timeout_dns = DEFAULT_TIMEOUT_DNS;	/* presumably seconds - confirm */
int max_time;		/* handed to alarm(); SIGALRM aborts the run */
int max_urls;		/* if non-zero, abort once num_urls_done exceeds it */
/*  Url and error statistics shown in the status line.  */
int num_urls;
int num_urls_done;
int num_urls_fail;
int num_errors;
int show_stat = 1;	/* non-zero: print the periodic status line */
int waiting_proxies;	/* decremented as proxy host lookups complete */
int all_proxy_wait = 1;		/* unused (immutable) */
/*  cur_tv: wall clock sampled at the top of each fetch loop iteration;
    throttle: when set, delay enforced between starting two connects.  */
struct timeval cur_tv, throttle;
/*  Local address template for binding outgoing sockets; tcp_connect()
    fills in sin_addr before each bind.  */
struct sockaddr_in bind_addr;
circular_queue(queue_urls_connect, wurl_t);	/*  ready to connect  */
linear_na_queue(queue_urls_request, aurl_t);	/*  started connect, waiting for write  */
linear_na_queue(queue_urls_reply, aurl_t);	/*  request sent, waiting for reply  */

/*  Start a non-blocking TCP connect to addr:port.

    If baddr is non-zero, the socket is first bound to that local address
    using the global bind_addr template.  Binding is best effort: a bind
    failure is ignored and the kernel chooses the source address.

    The connect() is issued on a non-blocking socket, so it normally has
    not completed when this returns; fetch_all() later picks up the
    outcome (writability plus SO_ERROR, then the request send).

    Returns the socket descriptor, or -1 if no socket could be created
    (even after free_fd() was asked to release one) or the socket could
    not be switched to non-blocking mode.  */
static int 
tcp_connect(struct in_addr addr, u_short port, struct in_addr baddr)
{
    int s, fl;
    struct sockaddr_in server_in;

    if ((s = socket(PF_INET, SOCK_STREAM, 0)) < 0 && 
	    (!free_fd() || (s = socket(PF_INET, SOCK_STREAM, 0)) < 0))
	return -1;

    if (baddr.s_addr) {
	bind_addr.sin_addr = baddr;
	/*  best effort - on failure the kernel picks the source address  */
	bind(s, (struct sockaddr *)&bind_addr, sizeof(bind_addr));
    }

    /*  a blocking socket would stall the whole fetch loop in connect(),
	so refuse to hand one out  */
    if ((fl = fcntl(s, F_GETFL)) < 0 ||
	    fcntl(s, F_SETFL, fl | O_NONBLOCK) < 0) {
	close(s);
	return -1;
    }

    /*  zero everything first: sin_zero (and sin_len where it exists)
	must not carry stack garbage  */
    memset(&server_in, 0, sizeof(server_in));
    server_in.sin_family = AF_INET;
    server_in.sin_addr = addr;
    server_in.sin_port = htons(port);

    /*  EINPROGRESS is the normal result here; real failures show up
	later through SO_ERROR or the failing request send  */
    connect(s, (struct sockaddr *)&server_in, sizeof(server_in));

    return s;
}


/*  Handle a connection-level failure of an active url.

    au    - the failed connection; its url is requeued as decided below
    dr    - non-zero: charge the failure to the origin host's address
            even when the connection went through a proxy
    errt  - error class remembered in haddr_t.last_errt (the value 3 is
            reserved here for "address given up")
    errw  - initial back-off in seconds when the error class changes
    etww  - printf format with one %s (host name), warned about when the
            url is requeued unconditionally
    etnw  - message passed to errm() when fail_no_wait is set; the url is
            then requeued only if errm() returns RT_RETRY

    Once the address's retry time has been reached, its attempt counter
    is bumped: hitting max_attempts gives the address up, otherwise the
    next retry is scheduled err_wait seconds ahead and err_wait doubles.  */
static void 
conn_err (aurl_t *au, int dr, int errt, int errw, char *etww, char *etnw)
{
    int via_proxy = !dr && au->proxy;
    hinfo_t *hinfo;
    haddr_t *ha;

    /*  the address this failure is charged to  */
    if (via_proxy) {
	hinfo = au->proxy->host->info;
	ha = &hinfo->ips[au->pipidx];
    } else {
	hinfo = au->url->host->info;
	ha = &hinfo->ips[au->ipidx];
    }

    /*  requeue: unconditionally with a warning, or only when errm()
	considers the url worth another try  */
    if (!au->url->parm->opt->fail_no_wait) {
	prx(WRN, etww, hinfo->name);
	queue_url(au->url);
    } else if (errm(au->url, etnw) == RT_RETRY) {
	queue_url(au->url);
    }

    /*  no back-off bookkeeping for addresses already given up or still
	inside their retry window  */
    if (ha->last_errt == 3 || cur_tv.tv_sec < ha->retry_time)
	return;

    ha->attempt++;
    if (ha->attempt >= (unsigned)au->url->parm->opt->max_attempts) {
	ha->last_errt = 3;
	prx(WRN, "giving up address '%s' for host '%s'.\n",
	    inet_ntoa(ha->addr), hinfo->name);
	return;
    }

    /*  a new kind of error restarts the exponential back-off  */
    if (ha->last_errt != errt) {
	dbg(CON, (" setting new error type %d\n", errt));
	ha->last_errt = errt;
	ha->err_wait = errw;
    }
    dbg(CON, (" retrying in %d seconds\n", ha->err_wait));
    ha->retry_time = cur_tv.tv_sec + ha->err_wait;
    ha->err_wait *= 2;
}


/*  Render a duration of ti seconds into d as a string of at most five
    characters (d must hold at least 6 bytes):
	ti < 0           "??:??"   (unknown; -1 by convention)
	< 100 minutes    "MM:SS"
	< 100 hours      "HHhMM"
	< 100 days       "DdHH"
	otherwise        "> 99d"
    Any negative value is treated as unknown.  Formatting an arbitrary
    negative number with "%02d:%02d" could need 12+ bytes and overflow
    the callers' small buffers.  */
static void 
fmt_time(char *d, int ti)
{
    if (ti < 0)		/* unknown, or bogus (e.g. clock stepped back) */
	strcpy(d, "??:??");
    else if (ti < 6000)
	sprintf(d, "%02d:%02d", ti / 60, ti % 60);
    else if (ti < 360000)
	sprintf(d, "%02dh%02d", ti / 3600, ti / 60 % 60);
    else if (ti < 8640000)
	sprintf(d, "%dd%02d", ti / 86400, ti / 3600 % 24);
    else
	strcpy(d, "> 99d");
}

/*  Give the downloaded file the remote modification time.

    Does nothing (returns 0) when no remote time is known or the url's
    options request no_touch; otherwise returns utime()'s result for
    au->disposition.  */
int 
touch(aurl_t *au)
{
    struct utimbuf stamp;

    if (au->file_time && !au->url->parm->opt->no_touch) {
	stamp.actime = au->file_time;
	stamp.modtime = au->file_time;
	return utime(au->disposition, &stamp);
    }
    return 0;
}

/*  Release the dns helper processes parked on queue_dns_idle: each one is
    unlinked from the queue and passed to reap_dnsproc().  Called when
    fetch_all() finishes and from byebye() on abort.
    NOTE(review): helpers still on queue_dns_busy are not touched here -
    confirm that is intended.  */
static void 
cleanup(void)
{
    /*  kill off still running (idle) dns helpers  */
    lnq_iterate_rm(queue_dns_idle, dnsproc_t, pr, {
	dbg(QUE, ("iterate_rm idle dns helper\n"));
	lnq_remove(queue_dns_idle, pr);
	reap_dnsproc(pr);
	continue;
    }, c_fid);
}

/*  Abort the run with msg: apply touch() to every url still awaiting a
    reply, release idle dns helpers via cleanup(), then call die(1, msg)
    (presumably does not return).
    NOTE(review): also reached from the SIGINT/SIGTERM/SIGALRM handlers;
    touch()/die() are presumably not async-signal-safe - confirm.  */
void 
byebye(char *msg)
{
    lnq_iterate(queue_urls_reply, aurl_t, au, {
	touch(au);
    });
    cleanup();
    
    die(1, msg);
}

/*  Signal handlers: abort the run via byebye() with a reason.  They take
    the int signal number that handlers installed with signal() must
    accept.  An empty parameter list is not a prototype, and since C23
    means (void), which no longer matches void (*)(int).  */
static void sigint(int sig) { (void)sig; byebye("interrupted!"); }
static void sigterm(int sig) { (void)sig; byebye("terminated!"); }
static void sigalrm(int sig) { (void)sig; byebye("time quota exceeded!"); }

/*  Fetch all urls in parallel.

    Single-threaded select() loop that drives every queue:
      - queue_urls_connect: urls are activated and connects started, at
	most max_urls_active at a time (spaced by `throttle' when set);
      - queue_dns_lookup:   host lookups are handed to dns helper
	processes (at most max_dnss_active busy; after a failed fork no
	new helper is forked for one second);
      - queue_dns_idle:     idle helpers are reaped after their timeout;
      - queue_urls_request: once a connect completes, the HTTP request is
	sent and the url moves on to queue_urls_reply;
      - queue_urls_reply:   incoming data is processed by handle_reply().
    With show_stat set, a status line is printed roughly once a second.
    Returns once no queue has any work left.  */
void 
fetch_all()
{
    fd_set rfds, wfds;

    struct timeval last_tv, start_tv, next_tv, next_conn_tv, next_dpy_tv, to;
    time_t next_fork_time;
    long timediff, tottime;
    
    off_t last_fetched_bytes;
    int top_speed, avg_speed;

    int num_dns_busy, num_dns_idle;
    int num_urls_active;
    
    wobj_t *wo, *nwo;
    aurl_t *au;
    url_t *u;
    int i, mxfd, nfds;
    int *spds, spdi, spdn;

    /*  Initialize some data:  */
    gettimeofday(&last_tv, NULL);
    start_tv = last_tv;
    timerclear(&next_conn_tv);
    timerclear(&next_dpy_tv);
    next_fork_time = 0;

    last_fetched_bytes = 0;
    top_speed = avg_speed = 0;
    
    num_dns_busy = num_dns_idle = 0;
    num_urls_active = 0;

    /*  Status info: ring buffer of per-second speeds for the average  */
    spdi = spdn = 0;
    if (show_stat) {
	if (!(spds = mmalloc(sizeof(int) * AVERAGING_TIMEFRAME)))
	    show_stat = 0;
	else {
	    memset(spds, 0, sizeof(int) * AVERAGING_TIMEFRAME);
	    printf("\n      URLs             Connections         Bytes            Time       Kbyte/s\n"
		     "  done+ fail/ total   errs cur/max       done/total       pass left    cur/avg\n");
	}
    } else
	spds = 0;

    signal(SIGINT, sigint);
    signal(SIGTERM, sigterm);
    signal(SIGALRM, sigalrm);
    signal(SIGPIPE, SIG_IGN);

    alarm(max_time);

    /*  Megaloop:  */
    for (;;) {
	gettimeofday(&cur_tv, NULL);
	
	checken("fetch_all (top)");
	FD_ZERO(&rfds);
	FD_ZERO(&wfds);
	mxfd = -1;
	nfds = 0;

	dbg(QUE, ("---\n"));

	/*  urls waiting for initiation  */
	if (timercmp(&cur_tv, &next_conn_tv, <))
	    nfds++;	/* throttled: keep the loop from terminating */
	else
	cq_consume(queue_urls_connect, wurl_t, wu, {
	    int rt;

	    dbg(QUE, ("consume http://%s/%s\n", wu->url->host->name, wu->url->local_part));
	    nfds++;
	    if (num_urls_active >= max_urls_active)
		break;
	    if (max_urls && num_urls_done > max_urls)
		byebye("URL count quota exceeded!");	/* XXX this is "somewhat" harsh */
	    u = wu->url;
	    rt = activate_url(u, &au);
	    if (rt == RT_AGAIN) {	/*  transient server problem  */
		queue_urls_connect = wu;
		continue;
	    } else if (rt == RT_RETRY)	/*  transient error  */
		break;
	    else if (rt == RT_SKIP)	/*  already exists  */
		num_urls--;
	    else if (rt == RT_OK) {
		struct in_addr addr;
		struct in_addr baddr;
		u_short port;

		if (au->proxy) {
		    addr = au->proxy->host->info->ips[au->pipidx].addr;
		    port = au->proxy->port;
		} else {
		    addr = u->host->info->ips[au->ipidx].addr;
		    port = u->port;
		}
		if (au->url->parm->opt->bind_addrs.nents) {
		    baddr = 
		      *((struct in_addr **)au->url->parm->opt->bind_addrs.ents)
			[RND(au->url->parm->opt->bind_addrs.nents)];
		    dbg(CON, ("connecting to '%s' %s:%i from %d.%d.%d.%d ... ",
			au->proxy ? au->proxy->host->name : u->host->name, 
			inet_ntoa(addr), port, 
			baddr.s_addr & 255, (baddr.s_addr >> 8) & 255, 
			(baddr.s_addr >> 16) & 255, baddr.s_addr >> 24));
		} else {
		    baddr.s_addr = 0;
		    dbg(CON, ("connecting to '%s' %s:%i ... ",
			au->proxy ? au->proxy->host->name : u->host->name, 
			inet_ntoa(addr), port));
		}
		if ((au->socket = tcp_connect(addr, port, baddr)) < 0) {
		    dbge(CON, ("failed!\n"));
		    free(au);
		    if (!num_urls_active)
			die(1, "tcp_connect() keeps failing.");
		    prx(ERR, "tcp_connect() failed!\n");
		    break;
		} else {
		    dbge(CON, ("ok\n"));
		    if (au->proxy)
			au->proxy->score++;
		    au->timeout = cur_tv.tv_sec + 
				  au->url->parm->opt->timeout_connect;
		    lnq_append(queue_urls_request, au);
		    num_urls_active++;
		    if (timerisset(&throttle)) {
			timeradd(&cur_tv, &throttle, &next_conn_tv);
			cq_rm1st(queue_urls_connect);
			free(wu);
			break;
		    }
		}
	    } else if (rt != RT_GIVEUP)	/*  permanent error  */
		dbg(CON, ("unknown return code %d from activate_url\n", rt));
	    cq_rm1st(queue_urls_connect);
	    free(wu);
	});
	checken("fetch_all (after consume_urls)");

	/*  dns lookups waiting for initiation  */
	lq_consume (queue_dns_lookup, whost_t, wh, {
	    dnsproc_t *pr;
	    dbg(QUE, ("consume host %s\n", wh->host->name));
	    nfds++;
	    if (num_dns_busy >= max_dnss_active)
		break;
	  whrt:
	    if ((pr = queue_dns_idle)) {
		queue_dns_idle = pr->next;
		num_dns_idle--;
	    } else {
		if (cur_tv.tv_sec < next_fork_time)
		    break;
		if (!(pr = fork_dnsproc())) {
		    next_fork_time = cur_tv.tv_sec + 1;
		    break;
		}
	    }
	    pr->whost = wh;
	    if (!start_lookup(pr)) {
		reap_dnsproc(pr);
		goto whrt;
	    }
	    lq_rm1st(queue_dns_lookup);
	    lnq_append(queue_dns_busy, pr);
	    num_dns_busy++;
	});
	checken("fetch_all (after consume_hosts)");

	/*  idle dns helpers  */
	lnq_iterate_rm(queue_dns_idle, dnsproc_t, pr, {
	    dbg(QUE, ("iterate_rm idle dns helper\n"));
	    if (pr->timeout < cur_tv.tv_sec) {
		dbg(QUE, ("  timeout\n"));
		lnq_remove(queue_dns_idle, pr);
		num_dns_idle--;
		reap_dnsproc(pr);
		continue;
	    }
	}, c_id);
	checken("fetch_all (after iterating over idle dns helpers)");

	/*  urls waiting for reply */
	lnq_iterate(queue_urls_reply, aurl_t, au, {
	    nfds++;
	    FD_SET(au->socket, &rfds);
	    if (au->socket > mxfd)
		mxfd = au->socket;
	});

	/*  urls waiting for connection establishment */
	lnq_iterate(queue_urls_request, aurl_t, au, {
	    nfds++;
	    FD_SET(au->socket, &wfds);
	    if (au->socket > mxfd)
		mxfd = au->socket;
	});

	/*  dns lookups waiting for completion  */
	lnq_iterate(queue_dns_busy, dnsproc_t, pr, {
	    nfds++;
	    FD_SET(pr->fd, &rfds);
	    if (pr->fd > mxfd)
		mxfd = pr->fd;
	});

	if (show_stat) {
	    int esttimeleft, cur_speed, mid_speed;
	    char estts[10], totts[10];

	    /*  elapsed time since the last status line, in 1/100 s  */
	    timediff = (cur_tv.tv_sec - last_tv.tv_sec) * 100
		       + (cur_tv.tv_usec - last_tv.tv_usec) / 10000;

	    if (timediff >= 100 || !nfds) {
		cur_speed = timediff ?
			    (int)((fetched_bytes - last_fetched_bytes) * 100 /
			    timediff) : 0;

		spds[spdi] = cur_speed;
		if (++spdi >= AVERAGING_TIMEFRAME)
		    spdi = 0;
		if (spdn < AVERAGING_TIMEFRAME)
		    spdn++;

		for (mid_speed = 0, i = 0; i < spdn; i++)
		    mid_speed += spds[i];
		mid_speed /= spdn;
		if (mid_speed) {
		    esttimeleft = (total_bytes - fetched_bytes) / mid_speed;
		} else
		    esttimeleft = -1;

		if (cur_speed > top_speed)
		    top_speed = cur_speed;

		tottime = ((cur_tv.tv_sec - start_tv.tv_sec) * 100
		    + (cur_tv.tv_usec - start_tv.tv_usec) / 10000) / 100;
		avg_speed = tottime ? (int)(fetched_bytes / tottime) : 0;

		fmt_time(totts, tottime);
		fmt_time(estts, esttimeleft);

		printf("\r%6d+%5d/%6d %6d %3d/%-3d %10"SSOFFT"/%-10"SSOFFT
		       " %5s %5s %5d/%-4d",
		     num_urls_done, num_urls_fail, num_urls, num_errors,
		     num_urls_active, max_urls_active, fetched_bytes,
		     total_bytes, totts, estts,
		     cur_speed / 1024, avg_speed / 1024);
		fflush(stdout);

		last_fetched_bytes = fetched_bytes;
		last_tv = cur_tv;
		next_dpy_tv = cur_tv;
		next_dpy_tv.tv_sec++;
	    }
	}
	checken("fetch_all (after stats)");

	if (!nfds) {
	    if (show_stat) {
		printf("\n\nTop speed:     %9i bytes/second\n"
		           "Average speed: %9i bytes/second\n", 
		       top_speed, avg_speed);
		free(spds);
	    }
	    cleanup();
	    return;
	}

	next_tv = cur_tv;
	next_tv.tv_sec++; /* needed for other timeouts */
	/*  Only a connect deadline that still lies ahead may shorten the
	    wait.  Once it has passed (e.g. the throttle delay expired while
	    the connect queue was empty) it is stale, and using it would give
	    select() a negative timeout, which fails with EINVAL.  */
	if (timerisset(&next_conn_tv) && timercmp(&cur_tv, &next_conn_tv, <) &&
	    timercmp(&next_conn_tv, &next_tv, <))
	    next_tv = next_conn_tv;
	if (timerisset(&next_dpy_tv) && timercmp(&next_dpy_tv, &next_tv, <))
	    next_tv = next_dpy_tv;
	/*  never hand select() a negative timeout  */
	if (timercmp(&next_tv, &cur_tv, <))
	    timerclear(&to);
	else
	    timersub(&next_tv, &cur_tv, &to);
	if (select(mxfd + 1, &rfds, &wfds, 0, &to) < 0) {
	    /*  interrupted by a signal: the fd sets are undefined now,
		so rebuild everything on the next iteration  */
	    if (errno == EINTR)
		continue;
	    die(1, "select() failed!");
	}

	/*  urls waiting for reply */
	lnq_iterate_rm(queue_urls_reply, aurl_t, au, {
	    dbg(QUE, ("iterate_rm reply http://%s/%s\n", 
		      au->url->host->name, au->url->local_part));
	    if (FD_ISSET(au->socket, &rfds)) {
		dbg(QUE, ("  has data\n"));
		switch (handle_reply(au)) {
		    case RT_OK:
			au->timeout = cur_tv.tv_sec + 
				      au->url->parm->opt->timeout_data;
			goto c_ur;
		    case RT_SKIP:
			num_urls--;
			goto gofrnu;
		    case RT_DONE:
			num_urls_done++;
			break;
		    case RT_AGAIN:
			queue_url(au->url);
			break;
		    case RT_RETRY:
			goto gorequ;
		    case RT_GIVEUP:
			break;
		    case RT_TIMEOUT:
			conn_err (au, 1, 1, au->url->parm->opt->timeout_connect, 
				  "connect to '%s' timed out\n",
				  "connect for $u timed out");
			break;
		    case RT_REFUSED:
			conn_err (au, 1, 2, 3, 
				  "connect to '%s' failed\n", 
				  "connect for $u failed");
			break;
		}
	    } else {
		if (cur_tv.tv_sec < au->timeout)
		    goto c_ur;
		if (errm(au->url, "data fetch for $u timed out") == RT_RETRY) {
		  gorequ:
		    queue_url(au->url);
		}
	      gofrnu:
		if (au->url->parm->opt->delete_broken)
		    unlink(au->disposition);
	    }

	    /*  fake -- err: correct our statistics. ;-) */
	    if (au->http_done_header && au->size_total)
		total_bytes += -au->size_total + au->size_fetched;

	    lnq_remove(queue_urls_reply, au);
	    num_urls_active--;
	    close(au->socket);
	    if (au->buffer)
		free(au->buffer);
	    if (au->headers)
		free(au->headers);
	    if (au->f != -1)
		close(au->f);
	    free(au);
	    dbg(QUE, ("  removed\n"));
	    continue;
	}, c_ur);
	checken("fetch_all (after iterating over replies)");

	/*  urls waiting for connection establishment */
	lnq_iterate_rm(queue_urls_request, aurl_t, au, {
	    dbg(QUE, ("iterate_rm request http://%s/%s\n", 
		      au->url->host->name, au->url->local_part));
	    if (FD_ISSET(au->socket, &wfds)) {
		int err = 0;
		socklen_t errl = sizeof(err);

		dbg(QUE, ("  connect event\n"));
		/*  SO_ERROR holds the outcome of the non-blocking connect();
		    if getsockopt() itself fails, err is meaningless, so treat
		    that as a failed connect as well  */
		if (getsockopt(au->socket, SOL_SOCKET, SO_ERROR, 
			       (void *)&err, &errl) < 0 || err)
		    conn_err (au, 0, 2, 3, "connect to '%s' failed\n", 
			      "connect for $u failed");
		else if (send_http_get(au) <= 0)
		    conn_err (au, 0, 2, 3, "HTTP request send to '%s' failed\n",
			      "HTTP request send for $u failed");
		else {
		    haddr_t *ip;
		    lnq_remove(queue_urls_request, au);
		    lnq_append(queue_urls_reply, au);
		    au->timeout = cur_tv.tv_sec + 
				  au->url->parm->opt->timeout_data;
		    if (au->proxy)
			ip = au->proxy->host->info->ips + au->pipidx;
		    else
			ip = au->url->host->info->ips + au->ipidx;
		    /*  the address works again: reset its back-off state  */
		    ip->last_errt = 0;
		    ip->attempt = 0;
		    continue;
	    	}
	    } else {
		if (cur_tv.tv_sec < au->timeout)
	            goto c_uq;
		conn_err (au, 0, 1, au->url->parm->opt->timeout_connect, 
			  "connect to '%s' timed out\n",
			  "connect for $u timed out");
	    }

	    lnq_remove(queue_urls_request, au);
	    num_urls_active--;
	    close(au->socket);
	    free(au);
	    continue;
	}, c_uq);
	checken("fetch_all (after iterating over connects)");

	/*  dns lookups waiting for completion  */
	lnq_iterate_rm(queue_dns_busy, dnsproc_t, pr, {
	    dbg(QUE, ("iterate_rm host %s\n", pr->whost->host->name));
	    if (FD_ISSET(pr->fd, &rfds)) {
		whost_t *wh = pr->whost;
		dbg(QUE, ("  has data\n"));
		if (!finish_lookup(pr)) {
		    /*  helper failed: drop it and retry the lookup first  */
		    lnq_remove(queue_dns_busy, pr);
		    num_dns_busy--;
		    reap_dnsproc(pr);
		    lq_prepend(queue_dns_lookup, wh);
		    continue;
		}
		checken("fetch_all (after finish_lookup)");
		if (wh->host->info) {
		    nwo = wh->objq;
		    while ((wo = nwo) != 0) {
			nwo = wo->next;
			if (wo->url) {
			    if (wo->url->referer &&
				(wo->url->host->info !=
				  wo->url->referer->host->info) &&
				(wo->url->is_requisite ?
				 wo->url->parm->opt->follow_src :
				 wo->url->parm->opt->follow_href) <= 
				    HOST_RECURSIVE)
			    {
				prx(NFO, "not adding 'http://%s/%s' (different host)\n",
				    wo->url->host->name,
				    wo->url->local_part);
				free_url(wo->url);
				checken("fetch_all (after free_url (1))");
			    } else {
				int hash;
				if (find_url(wo->url->local_part, 
					     strlen(wo->url->local_part), 
					     wo->url->host->info, 
					     wo->url->port, &hash))
				{
				    free_url(wo->url);
				    checken("fetch_all (after free_url (2))");
				} else {
				    wo->url->url_hash = hash;
				    add_url(wo->url);
				    checken("fetch_all (after add_url)");
				}
			    }
			}
			if (wo->proxy) {
			    wo->proxy->ready = 1;
			    waiting_proxies--;
			}
		    	free(wo);
		    }
		} else {
		    /*  lookup failed: drop every object waiting on this host  */
		    nwo = wh->objq;
		    while ((wo = nwo) != 0) {
			nwo = wo->next;
			if (wo->url) {
			    if ((wo->url->is_requisite ?
				 wo->url->parm->opt->follow_src :
				 wo->url->parm->opt->follow_href) <= HOST_RECURSIVE)
			    {
				num_urls++;
				num_urls_fail++;
			    }
			    /*  NOTE(review): the resolved-host branch releases
				urls with free_url(), this one with plain
				free() - confirm nothing else is owned here  */
			    free(wo->url);
			}
			if (wo->proxy) {
			    wo->proxy->ready = 1;
			    waiting_proxies--;
			}
			free(wo);
		    }
		}
		free(wh);
		lnq_remove(queue_dns_busy, pr);
		num_dns_busy--;
		lnq_append(queue_dns_idle, pr);
		num_dns_idle++;
		pr->timeout = cur_tv.tv_sec + 60;
		continue;
	    }
	}, c_dr);
	checken("fetch_all (after iterating over dns lookups)");
    }
}


/* syntax highlighted by Code2HTML, v. 0.9.1 */