/* $Id: batcher.c 6762 2004-05-17 04:24:53Z rra $ ** ** Read batchfiles on standard input and spew out batches. */ #include "config.h" #include "clibrary.h" #include #include #include #include #include #include #include "inn/innconf.h" #include "inn/messages.h" #include "inn/timer.h" #include "libinn.h" #include "paths.h" #include "storage.h" /* ** Global variables. */ static bool BATCHopen; static bool STATprint; static double STATbegin; static double STATend; static char *Host; static char *InitialString; static char *Input; static char *Processor; static int ArtsInBatch; static int ArtsWritten; static int BATCHcount; static int MaxBatches; static int BATCHstatus; static long BytesInBatch = 60 * 1024; static long BytesWritten; static long MaxArts; static long MaxBytes; static sig_atomic_t GotInterrupt; static const char *Separator = "#! rnews %ld"; static char *ERRLOG; /* ** Start a batch process. */ static FILE * BATCHstart(void) { FILE *F; char buff[SMBUF]; if (Processor && *Processor) { snprintf(buff, sizeof(buff), Processor, Host); F = popen(buff, "w"); if (F == NULL) return NULL; } else F = stdout; BATCHopen = true; BATCHcount++; return F; } /* ** Close a batch, return exit status. */ static int BATCHclose(FILE *F) { BATCHopen = false; if (F == stdout) return fflush(stdout) == EOF ? 1 : 0; return pclose(F); } /* ** Update the batch file and exit. */ static void RequeueAndExit(off_t Cookie, char *line, long BytesInArt) { static char LINE1[] = "batcher %s times user %.3f system %.3f elapsed %.3f"; static char LINE2[] ="batcher %s stats batches %d articles %d bytes %ld"; char *spool; char buff[BIG_BUFFER]; int i; FILE *F; double usertime; double systime; /* Do statistics. */ STATend = TMRnow_double(); if (GetResourceUsage(&usertime, &systime) < 0) { usertime = 0; systime = 0; } if (STATprint) { printf(LINE1, Host, usertime, systime, STATend - STATbegin); printf("\n"); printf(LINE2, Host, BATCHcount, ArtsWritten, BytesWritten); printf("\n"); } syslog(L_NOTICE, LINE1, Host, usertime, systime, STATend - STATbegin); syslog(L_NOTICE, LINE2, Host, BATCHcount, ArtsWritten, BytesWritten); /* Last batch exit okay? */ if (BATCHstatus == 0) { if (feof(stdin) && Cookie != -1) { /* Yes, and we're all done -- remove input and exit. */ fclose(stdin); if (Input) unlink(Input); exit(0); } } /* Make an appropriate spool file. */ if (Input == NULL) spool = concatpath(innconf->pathoutgoing, Host); else spool = concat(Input, ".bch", (char *) 0); if ((F = xfopena(spool)) == NULL) sysdie("%s cannot open %s", Host, spool); /* If we can back up to where the batch started, do so. */ i = 0; if (Cookie != -1 && fseeko(stdin, Cookie, SEEK_SET) == -1) { syswarn("%s cannot seek", Host); i = 1; } /* Write the line we had; if the fseeko worked, this will be an * extra line, but that's okay. */ if (line && fprintf(F, "%s %ld\n", line, BytesInArt) == EOF) { syswarn("%s cannot write spool", Host); i = 1; } /* Write rest of stdin to spool. */ while (fgets(buff, sizeof buff, stdin) != NULL) if (fputs(buff, F) == EOF) { syswarn("%s cannot write spool", Host); i = 1; break; } if (fclose(F) == EOF) { syswarn("%s cannot close spool", Host); i = 1; } /* If we had a named input file, try to rename the spool. */ if (Input != NULL && rename(spool, Input) < 0) { syswarn("%s cannot rename spool", Host); i = 1; } exit(i); /* NOTREACHED */ } /* ** Mark that we got interrupted. */ static RETSIGTYPE CATCHinterrupt(int s) { GotInterrupt = true; /* Let two interrupts kill us. */ xsignal(s, SIG_DFL); } int main(int ac, char *av[]) { bool Redirect; FILE *F; const char *AltSpool; char *p; char *data; char line[BIG_BUFFER]; char buff[BIG_BUFFER]; int BytesInArt; long BytesInCB; off_t Cookie; size_t datasize; int i; int ArtsInCB; int length; TOKEN token; ARTHANDLE *art; char *artdata; /* Set defaults. */ openlog("batcher", L_OPENLOG_FLAGS | LOG_PID, LOG_INN_PROG); message_program_name = "batcher"; if (!innconf_read(NULL)) exit(1); AltSpool = NULL; Redirect = true; umask(NEWSUMASK); ERRLOG = concatpath(innconf->pathlog, _PATH_ERRLOG); /* Parse JCL. */ while ((i = getopt(ac, av, "a:A:b:B:i:N:p:rs:S:v")) != EOF) switch (i) { default: die("usage error"); break; case 'a': ArtsInBatch = atoi(optarg); break; case 'A': MaxArts = atol(optarg); break; case 'b': BytesInBatch = atol(optarg); break; case 'B': MaxBytes = atol(optarg); break; case 'i': InitialString = optarg; break; case 'N': MaxBatches = atoi(optarg); break; case 'p': Processor = optarg; break; case 'r': Redirect = false; break; case 's': Separator = optarg; break; case 'S': AltSpool = optarg; break; case 'v': STATprint = true; break; } if (MaxArts && ArtsInBatch == 0) ArtsInBatch = MaxArts; if (MaxBytes && BytesInBatch == 0) BytesInBatch = MaxBytes; /* Parse arguments. */ ac -= optind; av += optind; if (ac != 1 && ac != 2) die("usage error"); Host = av[0]; if ((Input = av[1]) != NULL) { if (Input[0] != '/') Input = concatpath(innconf->pathoutgoing, av[1]); if (freopen(Input, "r", stdin) == NULL) sysdie("%s cannot open %s", Host, Input); } if (Redirect) freopen(ERRLOG, "a", stderr); /* Go to where the articles are. */ if (chdir(innconf->patharticles) < 0) sysdie("%s cannot chdir to %s", Host, innconf->patharticles); /* Set initial counters, etc. */ datasize = 8 * 1024; data = xmalloc(datasize); BytesInCB = 0; ArtsInCB = 0; Cookie = -1; GotInterrupt = false; xsignal(SIGHUP, CATCHinterrupt); xsignal(SIGINT, CATCHinterrupt); xsignal(SIGTERM, CATCHinterrupt); /* xsignal(SIGPIPE, CATCHinterrupt); */ STATbegin = TMRnow_double(); SMinit(); F = NULL; while (fgets(line, sizeof line, stdin) != NULL) { /* Record line length in case we do an ftello. Not portable to * systems with non-Unix file formats. */ length = strlen(line); Cookie = ftello(stdin) - length; /* Get lines like "name size" */ if ((p = strchr(line, '\n')) == NULL) { warn("%s skipping %.40s: too long", Host, line); continue; } *p = '\0'; if (line[0] == '\0' || line[0] == '#') continue; if ((p = strchr(line, ' ')) != NULL) { *p++ = '\0'; /* Try to be forgiving of bad input. */ BytesInArt = CTYPE(isdigit, (int)*p) ? atol(p) : -1; } else BytesInArt = -1; /* Strip of leading spool pathname. */ if (line[0] == '/' && line[strlen(innconf->patharticles)] == '/' && strncmp(line, innconf->patharticles, strlen(innconf->patharticles)) == 0) p = line + strlen(innconf->patharticles) + 1; else p = line; /* Open the file. */ if (IsToken(p)) { token = TextToToken(p); if ((art = SMretrieve(token, RETR_ALL)) == NULL) { if ((SMerrno != SMERR_NOENT) && (SMerrno != SMERR_UNINIT)) warn("%s skipping %.40s: %s", Host, p, SMerrorstr); continue; } BytesInArt = -1; artdata = FromWireFmt(art->data, art->len, (size_t *)&BytesInArt); SMfreearticle(art); } else { warn("%s skipping %.40s: not token", Host, p); continue; } /* Have an open article, do we need to open a batch? This code * is here (rather then up before the while loop) so that we * can avoid sending an empty batch. The goto makes the code * a bit more clear. */ if (F == NULL) { if (GotInterrupt) { RequeueAndExit(Cookie, (char *)NULL, 0L); } if ((F = BATCHstart()) == NULL) { syswarn("%s cannot start batch %d", Host, BATCHcount); break; } if (InitialString && *InitialString) { fprintf(F, "%s\n", InitialString); BytesInCB += strlen(InitialString) + 1; BytesWritten += strlen(InitialString) + 1; } goto SendIt; } /* We're writing a batch, see if adding the current article * would exceed the limits. */ if ((ArtsInBatch > 0 && ArtsInCB + 1 >= ArtsInBatch) || (BytesInBatch > 0 && BytesInCB + BytesInArt >= BytesInBatch)) { if ((BATCHstatus = BATCHclose(F)) != 0) { if (BATCHstatus == -1) syswarn("%s cannot close batch %d", Host, BATCHcount); else syswarn("%s batch %d exit status %d", Host, BATCHcount, BATCHstatus); break; } ArtsInCB = 0; BytesInCB = 0; /* See if we can start a new batch. */ if ((MaxBatches > 0 && BATCHcount >= MaxBatches) || (MaxBytes > 0 && BytesWritten + BytesInArt >= MaxBytes) || (MaxArts > 0 && ArtsWritten + 1 >= MaxArts)) { break; } if (GotInterrupt) { RequeueAndExit(Cookie, (char *)NULL, 0L); } if ((F = BATCHstart()) == NULL) { syswarn("%s cannot start batch %d", Host, BATCHcount); break; } } SendIt: /* Now we can start to send the article! */ if (Separator && *Separator) { snprintf(buff, sizeof(buff), Separator, BytesInArt); BytesInCB += strlen(buff) + 1; BytesWritten += strlen(buff) + 1; if (fprintf(F, "%s\n", buff) == EOF || ferror(F)) { syswarn("%s cannot write separator", Host); break; } } /* Write the article. In case of interrupts, retry the read but not the fwrite because we can't check that reliably and portably. */ if ((fwrite(artdata, 1, BytesInArt, F) != BytesInArt) || ferror(F)) break; /* Update the counts. */ BytesInCB += BytesInArt; BytesWritten += BytesInArt; ArtsInCB++; ArtsWritten++; if (GotInterrupt) { Cookie = -1; BATCHstatus = BATCHclose(F); RequeueAndExit(Cookie, line, BytesInArt); } } if (BATCHopen) BATCHstatus = BATCHclose(F); RequeueAndExit(Cookie, NULL, 0); return 0; }