/* copyright 2001-2002 Alexander Malmberg */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include "NNTPServer.h" /* TODO: owner specifies modes? */ #include //#define DEBUG_SCHEDULER #ifdef DEBUG_SCHEDULER @interface NNTPServer (debug_scheduler) -(void) _updateDebugScheduler; @end #endif /* TODO: better error handling? */ static NSTimeInterval Now(void) { return [NSDate timeIntervalSinceReferenceDate]; } //#define DEBUG_SOCKET /* based on a sample of 29314 messages from a few different (non-binary) groups */ #define ESTIMATED_MSG_SIZE 2183 /* TODO: for temporary xover stat collection */ //#define XOVER_STATS #ifdef XOVER_STATS static int stat_num_headers,stat_num_expected_headers,stat_header_bytes; #endif #define MAX_CONNECTIONS 2 typedef struct { int time_limit; /* open if work is above work_limit, close after */ int work_limit; /* workqe_size=8; q->qid=1; q->qe=malloc(sizeof(queue_entry_t)*q->qe_size); if (!q->qe) return NULL; q->qe[0].priority=10000; return q; } static void queue_free(queue_t *q) { free(q->qe); free(q); } /* return an estimated number of bytes returned */ static int queue_work(queue_entry_t *qe) { switch (qe->what) { case WHAT_NONE: return 0; case WHAT_DATE: return 10; case WHAT_GROUP_INFO: return 10; case WHAT_HEADER_ID: return 500; case WHAT_HEADER_RANGE: /* testing suggets 380 bytes/message */ return (qe->d.range.h-qe->d.range.l+1)*380; case WHAT_ARTICLE_ID: return qe->d.msg.bytes; case WHAT_ARTICLE_RANGE: return (qe->d.range.h-qe->d.range.l+1)*ESTIMATED_MSG_SIZE; case WHAT_GROUP_LIST: return 10000; /* will probably be a lot more, but could be a lot less */ case WHAT_POST_ARTICLE: return qe->d.post.length; } fprintf(stderr,"unhandled q->what %i in queue_work()\n",qe->what); abort(); return 1; /* shouldn't happen */ } static unsigned int queue_add(queue_t *q,signed short priority,queue_entry_t *qadd) { queue_entry_t *qe; int i; if (q->qe_num==q->qe_size-1) { queue_entry_t 
*e; int nsize; if (q->qe_size>1024) nsize=q->qe_size+1024; else nsize=q->qe_size*2; e=realloc(q->qe,nsize*sizeof(queue_entry_t)); if (!e) { /* TODO: very difficult to handle gracefully, but should never happen */ fprintf(stderr,"queue_add: out of memory, dropping\n"); abort(); // return NULL; } q->qe=e; q->qe_size=nsize; } q->qe_num++; for (i=q->qe_num;q->qe[i/2].priorityqe[i]=q->qe[i/2]; qe=&q->qe[i]; *qe=*qadd; qe->priority=priority; qe->qid=q->qid++; q->total_work+=queue_work(qe); return qe->qid; } #define QE_LESS(q1,q2) ( ((q2)->priority>(q1)->priority) || ( ((q1)->priority==(q2)->priority) && ((q2)->qid<(q1)->qid) ) ) static void queue_remove(queue_t *q,queue_entry_t *qe) { queue_entry_t *ins; int pri; int i,j; pri=qe->priority; i=qe-q->qe; if (i<1 || i>q->qe_num) return; q->total_work-=queue_work(qe); ins=&q->qe[q->qe_num]; q->qe_num--; qe=ins; if (qe->priority==pri) { q->qe[i]=*qe; return; } if (qe->priorityqe_num;i=j) { j=i*2; if (j+1<=q->qe_num && QE_LESS(&q->qe[j],&q->qe[j+1])) j++; if (QE_LESS(qe,&q->qe[j])) q->qe[i]=q->qe[j]; else break; } q->qe[i]=*qe; } else { for (;q->qe[i/2].prioritypriority;i/=2) q->qe[i]=q->qe[i/2]; q->qe[i]=*qe; } } static queue_entry_t *queue_get(queue_t *q) { if (!q->qe_num) return NULL; return &q->qe[1]; } static void queue_delete_max(queue_t *q) { int i,j; queue_entry_t *qe; q->total_work-=queue_work(&q->qe[1]); qe=&q->qe[q->qe_num--]; for (i=1;i*2<=q->qe_num;i=j) { j=i*2; if (j+1<=q->qe_num && QE_LESS(&q->qe[j],&q->qe[j+1])) j++; if (QE_LESS(qe,&q->qe[j])) q->qe[i]=q->qe[j]; else break; } q->qe[i]=*qe; } #undef QE_LESS static void queue_entry_clear(queue_entry_t *q) { if (q->what==WHAT_HEADER_ID || q->what==WHAT_ARTICLE_ID ) if (q->d.msg.id) { free(q->d.msg.id); q->d.msg.id=NULL; } if (q->what==WHAT_POST_ARTICLE) if (q->d.post.data) { free(q->d.post.data); q->d.post.data=NULL; } q->what=WHAT_NONE; } typedef enum { S_NONE, S_IDLE, S_CONNECT1, /* connected, waiting for hello message */ /* send 'mode reader' */ S_CONNECT2, /* 
waiting for 'mode reader' response */ S_QUIT, /* waiting for 205 response */ S_DATE, /* waiting for DATE response */ S_GROUP, /* waiting for GROUP response for group info */ S_SWITCH_GROUP, /* waiting for GROUP response while switching groups */ /* for some other command */ S_HEADER_RANGE1, S_HEADER_RANGE2, S_HEADER_ID1, S_HEADER_ID2, S_ARTICLE_RANGE1, S_ARTICLE_RANGE2, S_ARTICLE_ID1, S_ARTICLE_ID2, S_GROUP_LIST1, S_GROUP_LIST2, S_POST_ARTICLE1, S_POST_ARTICLE2, } state_t; typedef struct nntp_connection_s { int valid; int sock; char *write_pending; int write_pending_ofs; int write_len; /* 0=no response yet 1=have status 2=have status, waiting for data 3=have status and data */ int resp; int resp_code; /* numeric code in status */ char *resp_msg; /* status message (if any) */ unsigned char *resp_buf; /* read buffer */ int resp_buf_len; /* length of buffer */ int resp_data_len; /* data has been processed up to this point */ int resp_data_nline; int resp_data_len_last; /* used to keep user updated on long responses */ state_t state; char should_close,reg_write; queue_entry_t cur; int cur_group; #ifdef DEBUG_SOCKET FILE *debug_write; #endif } con_t; static char **group_names; static int num_group_names; @interface NNTPServer (private) -(void) updateQueue; -setup; -(int) nc_connect: (con_t *)c; -(void) nc_update: (con_t *)c; -(void) nc_setState: (con_t *)c : (state_t)nstate; -(void) nc_close: (con_t *)c; -(void) nc_free: (con_t *)c; -(void) nc_parseResponse: (con_t *)c; -(void) nc_updateRead: (con_t *)c; -(void) nc_updateWrite: (con_t *)c; -(void) nc_writeCommand: (con_t *)c : (const char *)cmd, ...; -(void) nc_writeData: (con_t *)c : (unsigned char *)data : (int)length; -(void) nc_responseClear: (con_t *)c; -(void) nc_responseWantData: (con_t *)c; -(void) nc_updateState: (con_t *)c; -(void) nc_handleWhat: (con_t *)c; -(void) nc_fail: (con_t *)c; -(void) nc_timeout; @end @implementation NNTPServer (private) -setup { should_connect=0; runloop=[[NSRunLoop currentRunLoop] 
retain];
	/* socket events are registered once for each of these run loop modes */
	runmodes=[[NSArray alloc] initWithObjects:
		NSDefaultRunLoopMode,
		NSModalPanelRunLoopMode,
		NSEventTrackingRunLoopMode,
		nil];

	queue=queue_new();
	if (!queue)
		[NSException raise: @"NNTPServer_Error" format: @"out of memory"];

	num_cons=MAX_CONNECTIONS;
	cons=malloc(sizeof(con_t)*num_cons);
	if (!cons)
		[NSException raise: @"NNTPServer_Error" format: @"out of memory"];
	memset(cons,0,sizeof(con_t)*num_cons);

	lca_delta=-1.0;
	last_connect_attempt=Now();
//	last_activity=Now();

	return self;
}


/*
Fills in whatever is still missing from the server address:

 - resolves 'host' into addr.sin_addr unless have_addr is already set
 - looks up the nntp service port when port is -1
 - looks up the protocol number when protocol is -1

Returns 1 on success. Returns 0, with last_error set, if the host can't
be resolved to an IPv4 address. Failed service/protocol lookups are only
reported through nntp_message: and fall back to port 119 / protocol 0.
*/
-(int) fixAddr
{
	if (!have_addr)
	{
		struct hostent *h;
		unsigned char *a;

		h=gethostbyname(host);
		if (!h)
		{
			/* the resolver reports failures through h_errno; errno is
			   not set by gethostbyname() */
			DESTROY(last_error);
			last_error=[[NSString stringWithFormat: _(@"lookup of %s failed: %s"),host,hstrerror(h_errno)] retain];
			return 0;
		}
		/* addr is a sockaddr_in, so only an IPv4 result can be stored */
		if (h->h_addrtype!=AF_INET
		    || h->h_length<(int)sizeof(addr.sin_addr)
		    || !h->h_addr_list || !h->h_addr_list[0])
		{
			DESTROY(last_error);
			last_error=[[NSString stringWithFormat: _(@"lookup of %s failed: %s"),host,"no IPv4 address found"] retain];
			return 0;
		}
		/* copy instead of casting; h_addr_list entries carry no
		   alignment guarantee */
		memcpy(&addr.sin_addr,h->h_addr_list[0],sizeof(addr.sin_addr));
		a=(unsigned char *)&addr.sin_addr;
		[rec nntp_message: [NSString stringWithFormat: _(@"Resolved '%s' to %i.%i.%i.%i"),
			host,a[0],a[1],a[2],a[3]]];
		have_addr=1;
	}

	if (port==-1)
	{
		struct servent *s;
		struct protoent *p;

		s=getservbyname("nntp","tcp");
		if (!s)
		{
			[rec nntp_message: [NSString stringWithFormat: _(@"warning: can't find service 'nntp', assuming port 119")]];
			port=119;
			p=getprotobyname("tcp");
		}
		else
		{
			port=ntohs(s->s_port);
			p=getprotobyname(s->s_proto);
		}
		if (!p)
		{
			[rec nntp_message: [NSString stringWithFormat: _(@"warning: can't find protocol, assuming 0")]];
			protocol=0;
		}
		else
			protocol=p->p_proto;
	}

	/* an explicit port was given, so the protocol hasn't been looked up yet */
	if (protocol==-1)
	{
		struct protoent *p;

		p=getprotobyname("tcp");
		if (!p)
		{
			[rec nntp_message: [NSString stringWithFormat: _(@"warning: can't find protocol, assuming 0")]];
			protocol=0;
		}
		else
			protocol=p->p_proto;
	}

	return 1;
}


-(int) nc_connect: (con_t *)c
{
	if (![self fixAddr])
		return 0;

	addr.sin_family=AF_INET;
	addr.sin_port=htons(port);

#ifdef DEBUG_SOCKET
	{
		char buf[128];
		sprintf(buf,"ncon%i.txt",c-cons);
		c->debug_write=fopen(buf,"at");
	}
#endif

	{
		unsigned char *a=(unsigned char *)&addr.sin_addr;
		[rec nntp_message: [NSString stringWithFormat: _(@"Connecting to %i.%i.%i.%i:%i..."),
a[0],a[1],a[2],a[3],port]]; #ifdef DEBUG_SOCKET fprintf(c->debug_write,"-=- Connecting to %i.%i.%i.%i:%i\n",a[0],a[1],a[2],a[3],port);fflush(c->debug_write); #endif } c->sock=socket(PF_INET,SOCK_STREAM,protocol); if (c->sock<0) { DESTROY(last_error); last_error=[[NSString stringWithFormat: _(@"can't create socket: %s"),strerror(errno)] retain]; return 0; } if (connect(c->sock,(struct sockaddr *)&addr,sizeof(addr))<0) { DESTROY(last_error); last_error=[[NSString stringWithFormat: _(@"Can't open connection: %s"),strerror(errno)] retain]; close(c->sock); c->sock=-1; return 0; } { int flags; flags=fcntl(c->sock,F_GETFL); flags|=O_NONBLOCK; if (fcntl(c->sock,F_SETFL,flags)) { DESTROY(last_error); last_error=[[NSString stringWithFormat: _(@"fcntl failed to set non-blocking mode: %s"),strerror(errno)] retain]; close(c->sock); c->sock=-1; return 0; } } c->valid=1; { int i,count=[runmodes count]; for (i=0;isock type: ET_RDESC watcher: self forMode: [runmodes objectAtIndex: i]]; [runloop addEvent: (void *)c->sock type: ET_EDESC watcher: self forMode: [runmodes objectAtIndex: i]]; } } [self nc_setState: c : S_CONNECT1]; return 1; } -(void) nc_update: (con_t *)c { int i,count; if (c->state==S_NONE) return; count=[runmodes count]; if (c->reg_write && !c->write_pending && c->sock>=0) { for (i=0;isock type: ET_WDESC forMode: [runmodes objectAtIndex: i] all: YES]; c->reg_write=0; } else if (!c->reg_write && c->write_pending && c->sock>=0) { for (i=0;isock type: ET_WDESC watcher: self forMode: [runmodes objectAtIndex: i]]; c->reg_write=1; } } -(void) nc_setState: (con_t *)c : (state_t)nstate { if (c->state==nstate) return; if (c->state==S_CONNECT1 || c->state==S_CONNECT2) num_starting--; if (c->state==S_IDLE) num_idle--; if (c->state==S_NONE) num_active++; if (nstate==S_CONNECT1 || nstate==S_CONNECT2) num_starting++; if (nstate==S_IDLE) num_idle++; if (nstate==S_NONE) num_active--; c->state=nstate; } -(void) nc_close: (con_t *)c { if (c->state==S_NONE) return; c->should_close=1; if 
(set_timeout==1) { set_timeout=0; [NSObject cancelPreviousPerformRequestsWithTarget: self selector: @selector(nc_timeout) object: [NSObject class]]; } } -(void) nc_free: (con_t *)c { #ifdef DEBUG_SOCKET if (c->debug_write) { fclose(c->debug_write); c->debug_write=NULL; } #endif if (set_timeout==1) { set_timeout=0; [NSObject cancelPreviousPerformRequestsWithTarget: self selector: @selector(nc_timeout) object: [NSObject class]]; } queue_entry_clear(&c->cur); { int i,count=[runmodes count]; for (i=0;isock type: ET_RDESC forMode: [runmodes objectAtIndex: i] all: YES]; [runloop removeEvent: (void *)c->sock type: ET_WDESC forMode: [runmodes objectAtIndex: i] all: YES]; [runloop removeEvent: (void *)c->sock type: ET_EDESC forMode: [runmodes objectAtIndex: i] all: YES]; } } if (c->write_pending) { free(c->write_pending); c->write_pending=NULL; } if (c->resp_msg) { free(c->resp_msg); c->resp_msg=NULL; } if (c->resp_buf) { free(c->resp_buf); c->resp_buf=NULL; c->resp_buf_len=0; } if (c->sock>=0 && c->valid) { close(c->sock); } [self nc_setState: c : S_NONE]; memset(c,0,sizeof(con_t)); c->sock=-1; } -(void) nc_parseResponse: (con_t *)cn { if (!cn->resp_buf_len) return; if (cn->resp==0) { /* look for status line */ unsigned char *b,*c,*e; e=cn->resp_buf+cn->resp_buf_len; for (c=cn->resp_buf;cresp_buf+3; if (*b==' ') b++; if (c>b) { cn->resp_msg=malloc(c-b+1); if (!cn->resp_msg) [NSException raise: @"NNTPServer_Error" format: @"out of memory"]; memcpy(cn->resp_msg,b,c-b); cn->resp_msg[c-b]=0; } cn->resp_code=atoi(cn->resp_buf); c+=2; if (c==e) cn->resp_buf_len=0; else { cn->resp_buf_len-=(c-cn->resp_buf); memmove(cn->resp_buf,c,cn->resp_buf_len); } /* status lines are probably short, so don't bother reallocating the buffer (we'd just end up allocating just as much on the next status line) */ cn->resp=1; } else if (cn->resp==2) { /* look for data ended by ".\r\n" alone on a line */ unsigned char *c,*d,*e; c=d=cn->resp_buf+cn->resp_data_len; e=cn->resp_buf+cn->resp_buf_len; for 
(;cresp_data_nline && *c=='.') { if (c>e-3) break; if (c[1]=='\r' && c[2]=='\n') { /* we have all the data */ cn->resp=3; break; } cn->resp_data_nline=0; continue; /* skip the first period */ } cn->resp_data_nline=0; if (cn->resp_data_len>0 && *c=='\n' && d[-1]=='\r') { d[-1]='\n'; cn->resp_data_nline=1; continue; } *d++=*c; cn->resp_data_len++; } if (c==e) { /* we ran out of data before finding the end resp_buf: | resp_data_len | */ cn->resp_buf_len=cn->resp_data_len; } else if (cn->resp==3) { /* we found the end, remove the end marker and fix the c/d gap resp_buf: | resp_data_len stuff(resp_buf_len-resp_data_len) | */ c+=3; if (c!=e) memmove(d,c,e-c); cn->resp_buf_len-=c-d; cn->resp_data_len_last=0; } else { /* we've found a period but can't tell if it's end of data yet (yuck) resp_buf: | resp_data_len stuff(resp_buf_len-resp_data_len) */ memmove(d,c,e-c); cn->resp_buf_len-=c-d; } } if (cn->state==S_HEADER_RANGE2) { if (cn->resp_data_len>48*1024) { char **list; int num; unsigned char *b,*d,*e; num=0; /* TODO: this will be excessively large almost all the time */ list=malloc(sizeof(char *)*(cn->cur.d.range.h-cn->cur.d.range.l+1)); d=b=cn->resp_buf; /* always leave at least one byte so there will be a final non-partial call to nntp_headerRange:... 
*/ e=b+cn->resp_data_len-1; for (;bresp_buf; #endif [rec nntp_headerRange: cn->cur.d.range.group : cn->cur.d.range.l : cn->cur.d.range.h : num : list partial: YES qid: cn->cur.qid]; free(list); num=d-cn->resp_buf; memmove(cn->resp_buf,d,cn->resp_buf_len-num); cn->resp_buf_len-=num; cn->resp_data_len-=num; } } else { int i,j; i=cn->resp_data_len/(20*1024); j=cn->resp_data_len_last/(20*1024); if (i>j) { [rec nntp_progress: cn->resp_data_len : cn->cur.qid]; cn->resp_data_len_last=cn->resp_data_len; } } } -(void) nc_responseClear: (con_t *)c { if (c->resp==0 || c->resp==2) [NSException raise: @"NNTPServer_Error" format: @"responseClear called when resp=%i",c->resp]; if (c->resp==3) { unsigned char *b; c->resp_buf_len-=c->resp_data_len; /* data might have been large, so shrink the buffer again */ if (c->resp_buf_len) { memmove(c->resp_buf,c->resp_buf+c->resp_data_len,c->resp_buf_len); b=realloc(c->resp_buf,c->resp_buf_len); if (b) c->resp_buf=b; } else { free(c->resp_buf); c->resp_buf=NULL; } c->resp_data_len=0; } c->resp=0; if (c->resp_msg) free(c->resp_msg); c->resp_msg=NULL; [self nc_parseResponse: c]; } -(void) nc_responseWantData: (con_t *)c { if (c->resp!=1) [NSException raise: @"NNTPServer_Error" format: @"responseWantData called when resp=%i",c->resp]; c->resp=2; c->resp_data_len=0; c->resp_data_nline=0; [self nc_parseResponse: c]; } -(void) nc_updateRead: (con_t *)c { unsigned char buf[512]; int len,ilen; unsigned char *b; /* if the socket has already been closed we're just parsing data we've already recieved */ ilen=0; if (c->sock!=-1) { while ((len=read(c->sock,buf,sizeof(buf)))>0) { b=realloc(c->resp_buf,c->resp_buf_len+len); if (!b) { c->valid=0; /* TODO */ [NSException raise: @"NNTPServer_Error" format: @"out of memory"]; } c->resp_buf=b; b+=c->resp_buf_len; memcpy(b,buf,len); c->resp_buf_len+=len; ilen+=len; #ifdef DEBUG_SOCKET { unsigned char b2[513]; memcpy(b2,buf,512); b2[len]=0; fprintf(c->debug_write,"%s",b2);fflush(c->debug_write); } #endif } if 
(len<0 && errno!=EAGAIN) { fprintf(stderr,"error reading from socket %i: %m\n",c->sock); c->valid=0; return; } if (len==0) { int i,count=[runmodes count]; close(c->sock); for (i=0;isock type: ET_RDESC forMode: [runmodes objectAtIndex: i] all: YES]; [runloop removeEvent: (void *)c->sock type: ET_EDESC forMode: [runmodes objectAtIndex: i] all: YES]; [runloop removeEvent: (void *)c->sock type: ET_WDESC forMode: [runmodes objectAtIndex: i] all: YES]; } c->sock=-1; } if (ilen!=0) [self nc_parseResponse: c]; } } -(void) nc_updateWrite: (con_t *)cn { const char *c; int len,wlen; if (!cn->write_pending) return; if (cn->sock==-1) { /* TODO */ free(cn->write_pending); cn->write_pending=NULL; cn->write_len=0; return; } c=cn->write_pending+cn->write_pending_ofs; len=cn->write_len-cn->write_pending_ofs; wlen=write(cn->sock,c,len); if (wlen<0) { if (errno==EAGAIN) return; fprintf(stderr,"error writing to socket %i: %m\n",cn->sock); cn->valid=0; return; } cn->write_pending_ofs+=wlen; if (wlen==len) { free(cn->write_pending); cn->write_pending=NULL; cn->write_len=0; } } -(void) nc_writeCommand: (con_t *)c : (const char *)cmd, ... { extern int vasprintf(char **c,const char *format,va_list args); /* TODO? */ va_list va; char *d; if (c->write_pending) { abort(); /* TODO? */ } if (!cmd) return; c->write_pending=NULL; va_start(va,cmd); vasprintf(&d,cmd,va); va_end(va); if (!d) [NSException raise: @"NNTPServer_Error" format: @"out of memory"]; c->write_len=strlen(d)+2; d=realloc(d,c->write_len+1); if (!d) [NSException raise: @"NNTPServer_Error" format: @"out of memory"]; c->write_pending=d; strcat(c->write_pending,"\r\n"); c->write_pending_ofs=0; #ifdef DEBUG_SOCKET fprintf(c->debug_write,"=C: %s",c->write_pending);fflush(c->debug_write); #endif [self nc_updateWrite: c]; } -(void) nc_writeData: (con_t *)c : (unsigned char *)data : (int)length { if (c->write_pending) { abort(); /* TODO? 
*/ } c->write_pending=NULL; c->write_len=length; c->write_pending=data; c->write_pending_ofs=0; #ifdef DEBUG_SOCKET fprintf(c->debug_write,"=C DATA||\n"); fwrite(c->write_pending,1,c->write_len,c->debug_write); fprintf(c->debug_write,"||\n");fflush(c->debug_write); #endif [self nc_updateWrite: c]; } -(void) receivedEvent: (void *)data type: (RunLoopEventType) type extra: (void *)extra forMode: (NSString *)mode { CREATE_AUTORELEASE_POOL(arp); con_t *c; int i,j; // printf("--- got event %i type %i extra %p\n",(int)data,type,extra); j=(int)data; for (i=0,c=cons;isock==j) break; if (i==num_cons) { fprintf(stderr,"[NNTPServer receivedEvent] can't find connection for socket\n"); DESTROY(arp); return; } if (type==ET_EDESC) { /* probably eof */ [self nc_updateRead: c]; [self nc_updateState: c]; } else if (type==ET_WDESC) { [self nc_updateWrite: c]; } else if (type==ET_RDESC) { [self nc_updateRead: c]; [self nc_updateState: c]; } [self updateQueue]; [self nc_update: c]; // printf("--- done received\n"); #ifdef DEBUG_AUTORELEASE_COUNT fprintf(stderr,"receivedEvent arp contains %i objects\n",[arp autoreleaseCount]); #endif DESTROY(arp); } -(NSDate *) timedOutEvent: (void *)data type: (RunLoopEventType) type forMode: (NSString *)mode { /* shouldn't happen */ fprintf(stderr,"got timedOutEvent\n"); return nil; } -(void) nc_fail: (con_t *)c { [rec nntp_fail: [NSString stringWithFormat: @"Unexpected reply: %i %s",c->resp_code,c->resp_msg] qid: c->cur.qid]; } -(int) nc_switchGroup: (con_t *)c : (int)group { if (group==c->cur_group) return 0; [self nc_setState: c : S_SWITCH_GROUP]; [self nc_writeCommand: c : "GROUP %s",group_names[group]]; return 1; } -(void) nc_updateState: (con_t *)c { int code; queue_entry_t *q; while (c->resp==1 || c->resp==3) { code=c->resp_code; q=&c->cur; switch (c->state) { case S_NONE: [rec nntp_message: @"Warning: got response in state S_NONE"]; [self nc_responseClear: c]; break; case S_IDLE: fprintf(stderr,"TODO handle S_IDLE %i '%s'\n",code,c->resp_msg); 
[self nc_close: c];
			/* nc_close: only flags the connection. The unsolicited
			   response must still be consumed, or c->resp stays at 1/3
			   and the enclosing while loop never terminates. */
			[self nc_responseClear: c];
			break;

		case S_QUIT:
			[rec nntp_message: [NSString stringWithFormat: _(@"Closing connection: %i %s"),
				code,c->resp_msg]];
			[self nc_free: c];
			break;

		case S_CONNECT1:
			[rec nntp_message: [NSString stringWithFormat: _(@"New connection (%i total): %i %s"),num_active,code,c->resp_msg]];
			[self nc_responseClear: c];
			[self nc_writeCommand: c : "MODE READER"];
			[self nc_setState: c : S_CONNECT2];
			break;

		case S_CONNECT2:
			if (/*code!=500 && */code!=200 && code!=201)
			{
				if (num_active==1)
				{
					should_connect=0; /* TODO: think hard about how this should be handled */
					[rec nntp_fail_connect: [NSString stringWithFormat: _(@"Unexpected response when connecting: %i %s"),code,c->resp_msg]];
				}
				else
				{
					[rec nntp_message: [NSString stringWithFormat: _(@"Unexpected response when connecting: %i %s"),code,c->resp_msg]];
				}
				[self nc_close: c];
				/* Consume the rejected response and park the connection
				   in S_IDLE; updateQueue sends QUIT to idle connections
				   that have should_close set. Without this the loop
				   would see the same response forever. The receiver
				   callbacks above may already have torn the connection
				   down (resp reset to 0), hence the check. */
				if (c->resp==1 || c->resp==3)
				{
					[self nc_responseClear: c];
					[self nc_setState: c : S_IDLE];
				}
				break;
			}
			[self nc_responseClear: c];
			[self nc_setState: c : S_IDLE];
			lca_delta=-1.0;
			break;

		case S_DATE:
			if (code==111)
				[rec nntp_serverDate: c->resp_msg qid: q->qid];
			else
				[self nc_fail: c];
			[self nc_responseClear: c];
			[self nc_setState: c : S_IDLE];
			queue_entry_clear(q);
			break;

		case S_GROUP:
			if (code==211)
			{ /* 211 n f l s group selected */
				int num,first,last;
				c->cur_group=q->d.group;
				/* resp_msg is left NULL when the status line carries no
				   text after the code */
				if (c->resp_msg && sscanf(c->resp_msg,"%i %i %i",&num,&first,&last)==3)
					[rec nntp_groupInfo: q->d.group : num : first : last qid: q->qid];
				else
					[self nc_fail: c];
			}
			else if (code==411)
			{ /* 411 no such news group */
				[rec nntp_groupInfo: q->d.group : -1 : -1 : -1 qid: q->qid];
			}
			else
				[self nc_fail: c];
			[self nc_responseClear: c];
			[self nc_setState: c : S_IDLE];
			queue_entry_clear(q);
			break;

		case S_SWITCH_GROUP:
			if (code==211)
			{
				/* group selected; now issue the command that needed it */
				c->cur_group=q->d.group;
				[self nc_responseClear: c];
				[self nc_handleWhat: c];
			}
			else
			{
				[self nc_fail: c];
				[self nc_responseClear: c];
				[self nc_setState: c : S_IDLE];
				queue_entry_clear(q);
			}
			break;

		case S_HEADER_RANGE1:
			if (code==224)
			{
				[self nc_responseWantData: c];
				[self nc_setState: c : S_HEADER_RANGE2];
			}
			else
			{
				[self nc_fail: c];
				[self
nc_responseClear: c]; [self nc_setState: c : S_IDLE]; queue_entry_clear(q); } break; case S_HEADER_RANGE2: { char **list; int num; char *b,*d,*e; num=0; list=malloc(sizeof(char *)*(q->d.range.h-q->d.range.l+1)); d=b=c->resp_buf; e=b+c->resp_data_len; for (;bd.range.h-q->d.range.l+1; stat_header_bytes+=c->resp_data_len; #endif if (num) { [rec nntp_headerRange: q->d.range.group : q->d.range.l : q->d.range.h : num : list partial: NO qid: q->qid]; } free(list); [self nc_responseClear: c]; [self nc_setState: c : S_IDLE]; queue_entry_clear(q); break; } case S_HEADER_ID1: if (code==221) { [self nc_responseWantData: c]; [self nc_setState: c : S_HEADER_ID2]; } else { [self nc_fail: c]; [self nc_responseClear: c]; [self nc_setState: c : S_IDLE]; queue_entry_clear(q); } break; case S_HEADER_ID2: [rec nntp_headerId: q->d.msg.id data: c->resp_buf : c->resp_data_len qid: q->qid]; [self nc_responseClear: c]; [self nc_setState: c : S_IDLE]; queue_entry_clear(q); break; case S_ARTICLE_RANGE1: if (code==220) { [self nc_responseWantData: c]; [self nc_setState: c : S_ARTICLE_RANGE2]; } else if (code==423) { [rec nntp_articleRange: q->d.range.group : q->d.range.l data: NULL:-1 qid: q->qid]; [self nc_responseClear: c]; [self nc_setState: c : S_IDLE]; queue_entry_clear(q); } else { [self nc_fail: c]; [self nc_responseClear: c]; [self nc_setState: c : S_IDLE]; queue_entry_clear(q); } break; case S_ARTICLE_RANGE2: [rec nntp_articleRange: q->d.range.group : q->d.range.l data: c->resp_buf : c->resp_data_len qid: q->qid]; [self nc_responseClear: c]; [self nc_setState: c : S_IDLE]; queue_entry_clear(q); break; case S_ARTICLE_ID1: if (code==220) { [self nc_responseWantData: c]; [self nc_setState: c : S_ARTICLE_ID2]; } else if (code==430) { [rec nntp_articleId: q->d.msg.id data: NULL:-1 qid: q->qid]; [self nc_responseClear: c]; [self nc_setState: c : S_IDLE]; queue_entry_clear(q); } else { [self nc_fail: c]; [self nc_responseClear: c]; [self nc_setState: c : S_IDLE]; queue_entry_clear(q); } 
break; case S_ARTICLE_ID2: [rec nntp_articleId: q->d.msg.id data: c->resp_buf : c->resp_data_len qid: q->qid]; [self nc_responseClear: c]; [self nc_setState: c : S_IDLE]; queue_entry_clear(q); break; case S_GROUP_LIST1: if (code==215) { [self nc_responseWantData: c]; [self nc_setState: c : S_GROUP_LIST2]; } else { [self nc_fail: c]; [self nc_responseClear: c]; [self nc_setState: c : S_IDLE]; queue_entry_clear(q); } break; case S_GROUP_LIST2: [rec nntp_groupList: c->resp_buf : c->resp_data_len qid: q->qid]; [self nc_responseClear: c]; [self nc_setState: c : S_IDLE]; queue_entry_clear(q); break; case S_POST_ARTICLE1: // fprintf(stderr,"POST_ARTICLE1: %3i %s\n",code,c->resp_msg); if (code!=340) { [rec nntp_postArticle: NO qid: q->qid]; [self nc_fail: c]; [self nc_responseClear: c]; [self nc_setState: c : S_IDLE]; queue_entry_clear(q); } else { [self nc_responseClear: c]; [self nc_writeData: c : q->d.post.data : q->d.post.length]; q->d.post.data=NULL; q->d.post.length=0; [self nc_setState: c : S_POST_ARTICLE2]; } break; case S_POST_ARTICLE2: // fprintf(stderr,"POST_ARTICLE2: %3i %s\n",code,c->resp_msg); if (code==240) { [rec nntp_postArticle: YES qid: q->qid]; [self nc_responseClear: c]; [self nc_setState: c : S_IDLE]; queue_entry_clear(q); } else { [rec nntp_postArticle: NO qid: q->qid]; /* TODO: really do this here? 
*/ [self nc_fail: c]; [self nc_responseClear: c]; [self nc_setState: c : S_IDLE]; queue_entry_clear(q); } break; default: fprintf(stderr,"unknown state %i\n",c->state); abort(); break; } } } -(void) nc_handleWhat: (con_t *)c { queue_entry_t *q=&c->cur; switch (q->what) { case WHAT_DATE: [self nc_setState: c : S_DATE]; [self nc_writeCommand: c : "DATE"]; break; case WHAT_GROUP_INFO: [self nc_setState: c : S_GROUP]; [self nc_writeCommand: c : "GROUP %s", group_names[q->d.group]]; break; case WHAT_HEADER_RANGE: if ([self nc_switchGroup: c : q->d.range.group]) break; [self nc_setState: c : S_HEADER_RANGE1]; [self nc_writeCommand: c : "XOVER %i-%i",q->d.range.l,q->d.range.h]; break; case WHAT_HEADER_ID: [self nc_setState: c : S_HEADER_ID1]; [self nc_writeCommand: c : "HEAD %s",q->d.msg.id]; break; case WHAT_ARTICLE_RANGE: if ([self nc_switchGroup: c : q->d.range.group]) break; [self nc_setState: c : S_ARTICLE_RANGE1]; [self nc_writeCommand: c : "ARTICLE %i",q->d.range.l]; break; case WHAT_ARTICLE_ID: [self nc_setState: c : S_ARTICLE_ID1]; [self nc_writeCommand: c : "ARTICLE %s",q->d.msg.id]; break; case WHAT_GROUP_LIST: [self nc_setState: c : S_GROUP_LIST1]; [self nc_writeCommand: c : "LIST"]; break; case WHAT_POST_ARTICLE: [self nc_setState: c : S_POST_ARTICLE1]; [self nc_writeCommand: c : "POST"]; break; default: fprintf(stderr,"unhandled q->what %i\n",q->what); break; } } -(void) nc_timeout { int i; con_t *c; // printf("timing out, closing one at %30.15g\n",Now()); for (i=0,c=cons;istate==S_IDLE) break; if (i==num_cons) { for (i=0,c=cons;istate!=S_NONE) break; } set_timeout=0; if (i!=num_cons) [self nc_close: c]; [self updateQueue]; } -(void) updateQueue { queue_entry_t *q; int i; int num_closing; int cur_work; int active; con_t *c; // fprintf(stderr," --- update: %i/%i/%i ---\n",num_idle,num_active,num_cons); for (c=cons,i=0;istate!=S_IDLE) continue; if (c->should_close) { [self nc_writeCommand: c : "QUIT"]; [self nc_setState: c : S_QUIT]; [self nc_update: c]; 
continue; } q=queue_get(queue); if (!q) continue; /* printf("%i in queue\n",queue->qe_num); printf("got pri %i what %i on %i/%i (p %i %i %i)\n", q->priority,q->what,i,c->sock, q->d.range.group,q->d.range.l,q->d.range.h);*/ c->cur=*q; if (q->what==WHAT_ARTICLE_RANGE) { c->cur.d.range.h=c->cur.d.range.l; q->d.range.l++; queue->total_work-=ESTIMATED_MSG_SIZE; /* TODO: yuck */ if (q->d.range.l>q->d.range.h) queue_delete_max(queue); } else { queue_delete_max(queue); } [self nc_handleWhat: c]; [self nc_update: c]; } num_closing=0; cur_work=0; for (i=0,c=cons;istate!=S_NONE && (c->should_close || c->state==S_QUIT)) num_closing++; if (c->state!=S_NONE && c->state!=S_IDLE && c->state!=S_QUIT) cur_work+=queue_work(&c->cur); } cur_work+=queue->total_work; if (num_active-num_closing>0) active=cur_work>work_limits[num_active-1-num_closing].work_limit; else active=!!cur_work; /* fprintf(stderr,"work=%i %i active=%i num_closing=%i\n",queue->total_work,cur_work,active,num_closing); fprintf(stderr,"%i %i %i %i\n",active,should_connect, Now()>lca_delta+last_connect_attempt,queue->total_work>work_limits[num_active].work_limit); fprintf(stderr,"update queue: approx. 
%i bytes left\n",queue->total_work);*/

	/* NOTE(review): part of this condition (between "num_active" and
	   "lca_delta") appears to have been lost before this file reached
	   review; it is left exactly as found -- restore from the original. */
	if (
		active
		&& should_connect
		&& num_activelca_delta+last_connect_attempt
		&& cur_work>work_limits[num_active].work_limit
		&& queue->qe_num>num_active
	)
	{
		/* NOTE(review): loop header damaged in the same way; left as found */
		for (i=0,c=cons;istate==S_NONE) break;
//		printf("opening new\n");
		last_connect_attempt=Now();
		if ([self nc_connect: c])
		{
			lca_delta=15.0;
		}
		else
		{
			if (!num_active)
			{
				should_connect=0;
				[rec nntp_fail_connect: last_error];
			}
			else
			{
				if (!lca_delta)
					lca_delta=15.0;
				else if (lca_delta>90.0)
					lca_delta=90.0;
				else
					lca_delta+=15.0;
			}
		}
	}

	if (!active && (num_active-num_closing) && set_timeout==0)
	{
//		last_activity=Now();
		[self performSelector: @selector(nc_timeout)
			withObject: [NSObject class]
			afterDelay: work_limits[num_active-1-num_closing].time_limit];
		set_timeout=1;
	}
	else if (active && set_timeout==1)
	{
		set_timeout=0;
		[NSObject cancelPreviousPerformRequestsWithTarget: self
			selector: @selector(nc_timeout)
			object: [NSObject class]];
	}
//	printf("done update\n");

#ifdef DEBUG_SCHEDULER
	[self _updateDebugScheduler];
#endif
}

@end


@implementation NNTPServer

+(int) getGroupNum: (const char *)group;
{
	int i;
	char *c,**l;

	if (!group_names)
	{
		group_names=malloc(sizeof(char *));
		if (!group_names) return -1;
		num_group_names=1;
		group_names[0]=NULL;
	}

	/* NOTE(review): everything from this loop header up to the range
	   check of the following method appears to be missing; left as found */
	for (i=1;i=num_group_names) return NULL;
	return group_names[group];
}


/* Connects to "localhost" on the default nntp port. */
-init
{
	return [self initWithHost: "localhost"];
}

/* Like initWithHost:port: with port -1, i.e. the port is looked up later. */
-initWithHost: (const char *)ahost
{
	return [self initWithHost: ahost port: -1];
}

/* Keeps a private copy of ahost; the address is resolved lazily (have_addr=0). */
-initWithHost: (const char *)ahost port: (int)aport
{
	self=[super init];
	if (!self) return nil;

	host=strdup(ahost);
	if (!host)
		[NSException raise: @"NNTPServer_Error" format: @"out of memory"];
	port=aport;
	protocol=-1;
	have_addr=0;

	return [self setup];
}

/*
Initializes the server with an already resolved address. ahost is only
kept as a display name and may be NULL, in which case an empty string is
stored instead.
*/
-initWithAddr: (struct sockaddr_in *)aaddr port: (int)aport host: (const char *)ahost
{
	self=[super init];
	if (!self) return nil;

	/* test the argument, not the (still unset) instance variable; the old
	   check on 'host' always failed, so the name was silently dropped */
	if (ahost)
		host=strdup(ahost);
	else
		host=strdup("");
	if (!host)
		[NSException raise: @"NNTPServer_Error" format: @"out of memory"];
	port=aport;
	protocol=-1;
	have_addr=1;
	addr=*aaddr;

	return [self setup];
}


-(void) dealloc
{
#ifdef XOVER_STATS
	printf("got %i/%i headers, %i bytes, %g bytes/msg\n",
		stat_num_headers,stat_num_expected_headers,stat_header_bytes,
		stat_header_bytes/(double)stat_num_headers);
#endif
	[self killAllConnections];
	DESTROY(runloop);
	DESTROY(runmodes);
	DESTROY(last_error);
	free(host);
	free(cons);
	queue_free(queue);
	[super dealloc];
}


-(void) closeAllConnections
{
	int i;
	con_t *c;
	/* NOTE(review): the rest of this method and the head of the following
	   one(s) appear to be missing; left as found */
	for (c=cons,i=0;i)arec;
{
	rec=arec;
}


/* Allows (non-zero) or forbids (0) opening new connections; enabling also
   resets the reconnect back-off before the queue is re-evaluated. */
-(void) enableConnect: (int)c
{
	should_connect=c;
	if (c)
		lca_delta=-1.0;
	[self updateQueue];
}

/* set_timeout: 0 = no idle timeout pending, 1 = pending, -1 = disabled */
-(void) enableTimeout: (int)t
{
	if (t)
	{
		set_timeout=0;
	}
	else
	{
		if (set_timeout==1)
		{
			[NSObject cancelPreviousPerformRequestsWithTarget: self
				selector: @selector(nc_timeout)
				object: [NSObject class]];
		}
		set_timeout=-1;
	}
}


/* Copies *q into the queue with the given priority, kicks the scheduler
   and returns the id assigned to the new entry. */
-(unsigned int) queueAdd: (queue_entry_t *)q : (int)priority
{
	unsigned int qid;
	qid=queue_add(queue,priority,q);
	[self updateQueue];
	return qid;
}

/* Removes a still-queued request. Requests already being handled by a
   connection can't be cancelled yet (kill: is not implemented). */
-(BOOL) cancelQid: (unsigned int)qid kill: (BOOL)kill
{
	int i;
	queue_entry_t *qe;
	/* NOTE(review): loop condition damaged; left as found */
	for (qe=queue->qe+1,i=0;iqe_num;i++,qe++)
	{
		if (qe->qid==qid)
		{
			queue_remove(queue,qe);
			[self updateQueue];
			return YES;
		}
	}
	if (kill)
	{
		fprintf(stderr,"[NNTPServer -cancelQid: kill: YES] not implemented!\n");
	}
	return NO;
}


/* The methods below each queue one request and return its qid. */

-(unsigned int) getServerDate
{
	queue_entry_t q;
	q.what=WHAT_DATE;
	return [self queueAdd: &q : 0];
}

-(unsigned int) getGroupInfo: (int)group
{
	queue_entry_t q;
	q.what=WHAT_GROUP_INFO;
	q.d.group=group;
	return [self queueAdd: &q : 0];
}

-(unsigned int) getHeaderRange: (int)low : (int)high group: (int)group priority: (int)pri
{
	queue_entry_t q;
	q.what=WHAT_HEADER_RANGE;
	q.d.range.group=group;
	q.d.range.l=low;
	q.d.range.h=high;
	return [self queueAdd: &q : pri];
}

-(unsigned int) getHeaderById: (const char *)msg_id priority: (int)pri
{
	queue_entry_t q;
	q.what=WHAT_HEADER_ID;
	q.d.msg.id=strdup(msg_id); /* TODO: handle error */
	return [self queueAdd: &q : pri];
}

-(unsigned int) getArticleRange: (int)low : (int)high group: (int)group
priority: (int)pri
{
	queue_entry_t q;
	q.what=WHAT_ARTICLE_RANGE;
	q.d.range.group=group;
	q.d.range.l=low;
	q.d.range.h=high;
	return [self queueAdd: &q : pri];
}

/* Size is unknown here, so 1000 bytes is used as the work estimate. */
-(unsigned int) getArticleById: (const char *)msg_id priority: (int)pri
{
	queue_entry_t q;
	q.what=WHAT_ARTICLE_ID;
	q.d.msg.id=strdup(msg_id); /* TODO: handle error */
	q.d.msg.bytes=1000;
	return [self queueAdd: &q : pri];
}

/* 'bytes' is the expected article size, used as the work estimate. */
-(unsigned int) getArticleById: (const char *)msg_id size: (int)bytes priority: (int)pri
{
	queue_entry_t q;
	q.what=WHAT_ARTICLE_ID;
	q.d.msg.id=strdup(msg_id); /* TODO: handle error */
	q.d.msg.bytes=bytes;
	return [self queueAdd: &q : pri];
}

-(unsigned int) getGroupList: (int)priority
{
	queue_entry_t q;
	q.what=WHAT_GROUP_LIST;
	return [self queueAdd: &q : priority];
}


/*
Queues an article for posting and returns the qid of the request.

'data' holds 'length' bytes using bare '\n' line ends. A private copy is
made in wire format: every '\n' becomes "\r\n", every line starting with
'.' gets an extra leading '.', and "\r\n.\r\n" is appended. The caller
keeps ownership of 'data'; the copy is owned by the queue entry.
*/
-(unsigned int) postArticle: (unsigned char *)data length: (int)length
	priority: (int)priority
{
	queue_entry_t q;
	unsigned char *b,*c,*dst;
	int i,j;

	q.what=WHAT_POST_ARTICLE;

	/* Sizing pass: one extra byte per '\n' and per stuffed '.'.
	   i is the number of bytes left including b[0], so b[1] may only be
	   inspected while i>1 (the old i>0 test read past the buffer). */
	j=0;
	if (length>0 && data[0]=='.')
		j++;
	for (b=data,i=length;i>0;b++,i--)
		if (b[0]=='\n')
		{
			j++;
			if (i>1 && b[1]=='.')
				j++;
		}
	if (length>0)
		j+=length;
	j+=5; /* terminating "\r\n.\r\n" */

	dst=malloc(j);
	if (!dst)
		abort(); /* TODO: handle error */

	c=dst;
	/* the very first line needs dot-stuffing too */
	if (length>0 && data[0]=='.')
		*c++='.';
	for (b=data,i=length;i>0;i--)
	{
		if (b[0]=='\n')
			*c++='\r';
		*c++=*b++;
		if (i>1 && b[-1]=='\n' && b[0]=='.')
			*c++='.';
	}
	*c++='\r';
	*c++='\n';
	*c++='.';
	*c++='\r';
	*c++='\n';

	q.d.post.data=dst;
	/* use what was actually written, so no stray byte is ever sent */
	q.d.post.length=(int)(c-dst);
	return [self queueAdd: &q : priority];
}


-(NSString *)qidDescription: (unsigned int)qid
{
	int i;
	queue_entry_t *q=NULL;

	/* NOTE(review): the search loops below are damaged (text between '<'
	   and the next '>' appears to be missing); left as found */
	for (i=0;iqe+1,i=0;iqe_num;i++,q++)
			if (q->qid==qid) break;
		if (i==queue->qe_num) q=NULL;
	}
	if (!q)
		return [NSString stringWithFormat: @"qid %i",qid];

	switch (q->what)
	{
	default:
		return [NSString stringWithFormat: @"qid %i %i",qid,q->what];
	case WHAT_NONE:
		return @"NONE";
	case WHAT_DATE:
		return @"DATE";
	case WHAT_GROUP_INFO:
		return [NSString stringWithFormat: @"GROUP_INFO %s",group_names[q->d.group]];
	case WHAT_HEADER_RANGE:
		return [NSString stringWithFormat: @"HEADER_RANGE %s %i-%i",
			group_names[q->d.range.group],q->d.range.l,q->d.range.h];
	case WHAT_HEADER_ID:
return [NSString stringWithFormat: @"HEADER_ID %s",q->d.msg.id]; case WHAT_ARTICLE_RANGE: return [NSString stringWithFormat: @"ARTICLE_RANGE %s %i-%i", group_names[q->d.range.group],q->d.range.l,q->d.range.h]; case WHAT_ARTICLE_ID: return [NSString stringWithFormat: @"ARTICLE_ID %s",q->d.msg.id]; case WHAT_GROUP_LIST: return @"GROUP_LIST"; case WHAT_POST_ARTICLE: return @"POST_ARTICLE"; } } @end #ifdef DEBUG_SCHEDULER #include #include #include #include static NSTextView *ds_tv; static NNTPServer *ds_server; static FILE *f; static NSString *last_string; @implementation NNTPServer (debug_scheduler) -(void) _updateDebugScheduler { NSMutableString *s; int i; con_t *c; queue_entry_t *qe; if (!ds_tv) { NSWindow *win; if (!queue->qe_num && !num_active) return; win=[[NSWindow alloc] initWithContentRect: NSMakeRect(10,64,500,500) styleMask: NSClosableWindowMask|NSTitledWindowMask|NSResizableWindowMask|NSMiniaturizableWindowMask backing: NSBackingStoreRetained defer: YES]; ds_tv=[[NSTextView alloc] init]; [ds_tv setFont: [NSFont userFixedPitchFontOfSize: 0]]; [ds_tv setEditable: NO]; [[ds_tv textContainer] setWidthTracksTextView: YES]; [[ds_tv textContainer] setHeightTracksTextView: NO]; [[ds_tv textContainer] setContainerSize: NSMakeSize(1e6,1e6)]; [win setContentView: ds_tv]; RELEASE(ds_tv); ds_server=self; [win setTitle: @"NNTPServer debug"]; [win orderFront: self]; printf("created window: %p\n",win); f=fopen("debug_sched.txt","at"); } if (ds_server!=self) return; s=[[NSMutableString alloc] init]; [s appendString: [NSString stringWithFormat: @"num=%i idle=%i active=%i starting=%i\n\n", num_cons,num_idle,num_active,num_starting]]; for (i=0,c=cons;ivalid,c->sock, c->write_len,c->write_pending_ofs, c->resp,c->resp_code,c->resp_msg, c->resp_buf_len,c->resp_data_len, c->state,state_name[c->state], c->cur_group,c->should_close,c->reg_write, c->cur.priority,[self qidDescription: c->cur.qid]]]; } for (i=1,qe=queue->qe+1;i<=queue->qe_num;i++,qe++) { [s appendString: [NSString 
stringWithFormat: @"%3i: %3i %@\n",qe->qid,qe->priority,[self qidDescription: qe->qid]]]; } [ds_tv setString: s]; if ([s isEqual: last_string]) { DESTROY(s); } else { fputs("----\n",f); fputs([s cString],f); fflush(f); DESTROY(last_string); last_string=s; } } @end #endif