/* $Id: sync.c,v 1.66.2.5 2006/11/07 05:12:12 manu Exp $ */

/*
 * Copyright (c) 2004 Emmanuel Dreyfus
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. All advertising materials mentioning features or use of this software
 *    must display the following acknowledgement:
 *        This product includes software developed by Emmanuel Dreyfus
 *
 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESS OR IMPLIED
 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT,
 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
 * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
 * STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED
 * OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "config.h"

/*
 * NOTE(review): the header names of the system #include directives below
 * are missing in this copy of the file. They are kept as found; restore
 * them from the upstream source before building.
 */
#ifdef HAVE_SYS_CDEFS_H
#include
#ifdef __RCSID
__RCSID("$Id: sync.c,v 1.66.2.5 2006/11/07 05:12:12 manu Exp $");
#endif
#endif

#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

#include "pending.h"
#include "sync.h"
#include "conf.h"
#include "autowhite.h"
#include "milter-greylist.h"

/* Highest MX sync protocol version accepted by sync_vers() */
#define SYNC_PROTO_CURRENT 3

/* State of one listening MX sync socket (one per address family) */
struct sync_master_sock {
	int runs;	/* non-zero once the listener has been set up */
	int sock;	/* listening socket, -1 when closed */
};

struct sync_master_sock sync_master4 = { 0, -1 };
struct sync_master_sock sync_master6 = { 0, -1 };

struct peerlist peer_head;
pthread_rwlock_t peer_lock;	/* For the peer list */
pthread_rwlock_t sync_lock;	/* For all peer's sync queue */
pthread_cond_t sync_sleepflag;

static void sync_listen(char *, char *, struct sync_master_sock *);
static int local_addr(const struct sockaddr *sa, const socklen_t salen);
static int sync_queue_poke(struct peer *, struct sync *);
static struct sync * sync_queue_peek(struct peer *);
static int select_protocol(struct peer *, int, FILE *);
static void sync_vers(FILE *, int);

/*
 * Initialize the peer list, the peer and sync rwlocks and the
 * sync_sleepflag condition variable. Exits with EX_OSERR on failure.
 */
void
peer_init(void) {
	int error;

	LIST_INIT(&peer_head);

	if ((error = pthread_rwlock_init(&peer_lock, NULL)) != 0) {
		mg_log(LOG_ERR, "pthread_rwlock_init failed: %s",
		    strerror(error));
		exit(EX_OSERR);
	}

	if ((error = pthread_rwlock_init(&sync_lock, NULL)) != 0) {
		mg_log(LOG_ERR, "pthread_rwlock_init failed: %s",
		    strerror(error));
		exit(EX_OSERR);
	}

	if ((error = pthread_cond_init(&sync_sleepflag, NULL)) != 0) {
		mg_log(LOG_ERR, "pthread_cond_init failed: %s",
		    strerror(error));
		exit(EX_OSERR);
	}

	return;
}

/*
 * Empty the peer list: for each peer, drain and free its deferred sync
 * queue, close its stream if one is open, then free the peer itself.
 * Runs with the peer list write-locked.
 */
void
peer_clear(void) {
	struct peer *peer;
	struct sync *sync;

	PEER_WRLOCK;

	while(!LIST_EMPTY(&peer_head)) {
		peer = LIST_FIRST(&peer_head);

		while((sync = sync_queue_peek(peer)) != NULL)
			sync_free(sync);

		if (peer->p_stream != NULL)
			fclose(peer->p_stream);

		LIST_REMOVE(peer, p_list);
		free(peer->p_name);
		free(peer);
	}

	PEER_UNLOCK;

	return;
}

/*
 * Allocate a peer named peername (the string is duplicated) and insert it
 * at the head of the peer list. Exits with EX_OSERR if allocation fails.
 */
void
peer_add(peername)
	char *peername;
{
	struct peer *peer;

	if ((peer = malloc(sizeof(*peer))) == NULL ||
	    (peer->p_name = strdup(peername)) == NULL) {
		mg_log(LOG_ERR, "cannot add peer: %s", strerror(errno));
		exit(EX_OSERR);
	}

	peer->p_qlen = 0;
	peer->p_stream = NULL;
	peer->p_flags = 0;
	TAILQ_INIT(&peer->p_deferred);

	PEER_WRLOCK;
	LIST_INSERT_HEAD(&peer_head, peer, p_list);
	PEER_UNLOCK;

	if (conf.c_debug)
		mg_log(LOG_DEBUG, "load peer %s", peer->p_name);

	return;
}

/*
 * Queue a PS_CREATE sync of the given pending entry for every peer.
 */
void
peer_create(pending)
	struct pending *pending;
{
	struct peer *peer;

	PEER_RDLOCK;
	if (LIST_EMPTY(&peer_head))
		goto out;

	LIST_FOREACH(peer, &peer_head, p_list)
		sync_queue(peer, PS_CREATE, pending, -1); /* -1: unused */

out:
	PEER_UNLOCK;

	return;
}

/*
 * Queue a PS_DELETE sync of the given pending entry for every peer,
 * passing the autowhite date along.
 */
void
peer_delete(pending, autowhite)
	struct pending *pending;
	time_t autowhite;
{
	struct peer *peer;

	PEER_RDLOCK;
	if (LIST_EMPTY(&peer_head))
		goto out;

	LIST_FOREACH(peer, &peer_head, p_list)
		sync_queue(peer, PS_DELETE, pending, autowhite);

out:
	PEER_UNLOCK;

	return;
}

/*
 * Put sync at the head of the peer's deferred queue.
 * Returns 1 on success, 0 if the queue already holds SYNC_MAXQLEN entries
 * (in which case sync is not inserted).
 */
static int
sync_queue_poke(peer, sync)
	struct peer *peer;
	struct sync *sync;
{
	SYNC_WRLOCK;
	if (peer->p_qlen < SYNC_MAXQLEN) {
		TAILQ_INSERT_HEAD(&peer->p_deferred, sync, s_list);
		peer->p_qlen++;
		SYNC_UNLOCK;
		return 1;
	} else {
		SYNC_UNLOCK;
		return 0;
	}
}

/*
 * Remove the first entry of the peer's deferred queue and return it.
 * The queue is left untouched when it is empty; the result is then
 * whatever TAILQ_FIRST yields for an empty queue.
 */
static struct sync *
sync_queue_peek(peer)
	struct peer *peer;
{
	struct sync *sync;

	SYNC_WRLOCK;
	sync = TAILQ_FIRST(&peer->p_deferred);
	if (!TAILQ_EMPTY(&peer->p_deferred)) {
		TAILQ_REMOVE(&peer->p_deferred, sync, s_list);
		peer->p_qlen--;
	}
	SYNC_UNLOCK;

	return sync;
}

/*
 * Send one sync command (flush / add / del / del2) for pending to peer
 * and wait for its reply, connecting first if no stream is open.
 *
 * Returns 0 when the peer answers with code 201. Returns -1 on any
 * failure; except when peer_connect() itself fails, the stream is then
 * closed and peer->p_stream reset to NULL.
 */
int
sync_send(peer, type, pending, autowhite) /* peer list is read-locked */
	struct peer *peer;
	peer_sync_t type;
	struct pending *pending;
	time_t autowhite;
{
	char sep[] = " \n\t\r";
	char *replystr;
	int replycode;
	char line[LINELEN + 1];
	char *cookie = NULL;
	char *keyw;
	char awstr[LINELEN + 1];
	int bw;

	if ((peer->p_stream == NULL) && (peer_connect(peer) != 0))
		return -1;

	*line = '\0';

	switch(type) {
	case PS_FLUSH:
		bw = snprintf(line, LINELEN, "flush addr %s\r\n",
		    pending->p_addr);
		break;
	case PS_CREATE:
		bw = snprintf(line, LINELEN, "add addr %s from %s "
		    "rcpt %s date %ld\r\n",
		    pending->p_addr, pending->p_from,
		    pending->p_rcpt, (long)pending->p_tv.tv_sec);
		break;
	default:
		/* Protocol version 2 and up also carries the autowhite date */
		if (peer->p_vers >= 2) {
			keyw = "del2";
			snprintf(awstr, LINELEN, " aw %ld", (long)autowhite);
		} else {
			keyw = "del";
			awstr[0] = '\0';
		}

		bw = snprintf(line, LINELEN, "%s addr %s from %s "
		    "rcpt %s date %ld%s\r\n", keyw,
		    pending->p_addr, pending->p_from,
		    pending->p_rcpt, (long)pending->p_tv.tv_sec, awstr);
		break;
	}

	/*
	 * snprintf() returns the length the full string would have had:
	 * any value >= the size we passed means the line was truncated.
	 */
	if (bw < 0 || bw >= LINELEN) {
		mg_log(LOG_ERR, "closing connexion with peer %s: "
		    "send buffer would overflow (%d entries queued)",
		    peer->p_name, peer->p_qlen);
		fclose(peer->p_stream);
		peer->p_stream = NULL;
		return -1;
	}

	bw = fprintf(peer->p_stream, "%s", line);
	if (bw < 0 || (size_t)bw != strlen(line)) {
		mg_log(LOG_ERR, "closing connexion with peer %s: "
		    "%s (%d entries queued) - I was unable to send "
		    "complete line \"%s\" - bytes written: %i",
		    peer->p_name, strerror(errno), peer->p_qlen, line, bw);
		fclose(peer->p_stream);
		peer->p_stream = NULL;
		return -1;
	}

	fflush(peer->p_stream);

	/*
	 * Check the return code
	 */
get_more:
	sync_waitdata(peer->p_socket);
	if (fgets(line, LINELEN, peer->p_stream) == NULL) {
		if (errno == EAGAIN) {
			/* Nothing to read yet: retry unless we hit EOF */
			if ( feof(peer->p_stream) ) {
				mg_log(LOG_ERR, "lost connexion with peer %s: "
				    "%s (%d entries queued)",
				    peer->p_name, strerror(errno),
				    peer->p_qlen);
				fclose(peer->p_stream);
				peer->p_stream = NULL;
				return -1;
			}
			goto get_more;
		}

		mg_log(LOG_ERR, "lost connexion with peer %s: "
		    "%s (%d entries queued)",
		    peer->p_name, strerror(errno), peer->p_qlen);
		fclose(peer->p_stream);
		peer->p_stream = NULL;
		return -1;
	}

	/*
	 * On some systems, opening a stream on a socket introduce
	 * weird behavior: the in and out buffers get mixed up.
	 * By calling fflush() after each read operation, we fix that
	 */
	fflush(peer->p_stream);

	if ((replystr = strtok_r(line, sep, &cookie)) == NULL) {
		mg_log(LOG_ERR, "Unexpected reply \"%s\" from %s, "
		    "closing connexion (%d entries queued)",
		    line, peer->p_name, peer->p_qlen);
		fclose(peer->p_stream);
		peer->p_stream = NULL;
		return -1;
	}

	replycode = atoi(replystr);
	if (replycode != 201) {
		mg_log(LOG_ERR, "Unexpected reply \"%s\" from %s, "
		    "closing connexion (%d entries queued)",
		    line, peer->p_name, peer->p_qlen);
		fclose(peer->p_stream);
		peer->p_stream = NULL;
		return -1;
	}

	if (conf.c_debug)
		mg_log(LOG_DEBUG, "sync one entry with %s", peer->p_name);

	return 0;
}

/*
 * Open a TCP connection to peer, wait for its "200" greeting and
 * negotiate the protocol version with select_protocol().
 *
 * On success stores the stream, socket and version in the peer and
 * returns 0. Returns -1 on failure. A peer whose address is local is
 * flagged P_LOCAL and never connected to.
 */
int
peer_connect(peer)	/* peer list is read-locked */
	struct peer *peer;
{
	struct servent *se;
#ifdef HAVE_GETADDRINFO
	struct addrinfo hints, *res0, *res;
	int err;
#else
	struct protoent *pe;
	int proto;
	sockaddr_t raddr;
	socklen_t raddrlen;
#endif
	sockaddr_t laddr;
	socklen_t laddrlen;
	char *laddrstr;
	int service;
	int s = -1;
	char *replystr;
	int replycode;
	FILE *stream;
	char sep[] = " \n\t\r";
	char line[LINELEN + 1];
	int param;
	char *cookie = NULL;

	if (peer->p_stream != NULL)
		mg_log(LOG_ERR, "peer_connect called and peer->p_stream != 0");

	/* Port: configured value, else services database, else default */
	if (conf.c_syncport != NULL) {
		service = htons(atoi(conf.c_syncport));
	} else {
		if ((se = getservbyname(MXGLSYNC_NAME, "tcp")) == NULL)
			service = htons(atoi(MXGLSYNC_PORT));
		else
			service = se->s_port;
	}

#ifdef HAVE_GETADDRINFO
	bzero(&hints, sizeof(hints));
	hints.ai_family = AF_UNSPEC;
	hints.ai_socktype = SOCK_STREAM;
	if ((err = getaddrinfo(peer->p_name, "0", &hints, &res0)) != 0) {
		mg_log(LOG_ERR, "cannot sync with peer %s, "
		    "getaddrinfo failed: %s (%d entries queued)",
		    peer->p_name, gai_strerror(err), peer->p_qlen);
		return -1;
	}

	/* First pass: refuse to sync with ourselves */
	for (res = res0; res; res = res->ai_next) {
		/*We only test an address family which kernel supports. */
		s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
		if (s == -1)
			continue;
		close(s);

		if (local_addr(res->ai_addr, res->ai_addrlen)) {
			peer->p_flags |= P_LOCAL;
			freeaddrinfo(res0);
			return -1;
		}
	}

	/* Second pass: connect to the first address that works */
	for (res = res0; res; res = res->ai_next) {
		s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
		if (s == -1)
			continue;

		switch (res->ai_family) {
		case AF_INET:
			SA4(res->ai_addr)->sin_port = service;
			if (conf.c_syncsrcaddr != NULL) {
				laddrstr = conf.c_syncsrcaddr;
			} else {
				laddrstr = "0.0.0.0";
			}
			break;
#ifdef AF_INET6
		case AF_INET6:
			SA6(res->ai_addr)->sin6_port = service;
			laddrstr = "::";
			break;
#endif
		default:
			mg_log(LOG_ERR, "cannot sync, unknown address family");
			close(s);
			s = -1;
			continue;
		}

		laddrlen = sizeof(laddr);
		if (ipfromstring(laddrstr, SA(&laddr), &laddrlen,
		    res->ai_family) == 1 &&
		    bind(s, SA(&laddr), laddrlen) == 0 &&
		    connect(s, res->ai_addr, res->ai_addrlen) == 0)
			break;

		close(s);
		s = -1;
	}
	freeaddrinfo(res0);

	if (s < 0) {
		mg_log(LOG_ERR,
		    "cannot sync with peer %s: %s (%d entries queued)",
		    peer->p_name, strerror(errno), peer->p_qlen);
		return -1;
	}
#else
	raddrlen = sizeof(raddr);
	if (ipfromstring(peer->p_name, SA(&raddr), &raddrlen,
	    AF_UNSPEC) != 1) {
		mg_log(LOG_ERR, "cannot sync, invalid address");
		return -1;
	}

	/* Refuse to sync with ourselves */
	if (local_addr(SA(&raddr), raddrlen)) {
		peer->p_flags |= P_LOCAL;
		return -1;
	}

	switch (SA(&raddr)->sa_family) {
	case AF_INET:
		SA4(&raddr)->sin_port = service;
		if (conf.c_syncsrcaddr != NULL) {
			laddrstr = conf.c_syncsrcaddr;
		} else {
			laddrstr = "0.0.0.0";
		}
		break;
#ifdef AF_INET6
	case AF_INET6:
		SA6(&raddr)->sin6_port = service;
		laddrstr = "::";
		break;
#endif
	default:
		mg_log(LOG_ERR, "cannot sync, unknown address family");
		return -1;
	}

	if ((pe = getprotobyname("tcp")) == NULL)
		proto = 6;
	else
		proto = pe->p_proto;

	if ((s = socket(SA(&raddr)->sa_family, SOCK_STREAM, proto)) == -1) {
		mg_log(LOG_ERR, "cannot sync with peer %s, "
		    "socket failed: %s (%d entries queued)",
		    peer->p_name, strerror(errno), peer->p_qlen);
		return -1;
	}

	laddrlen = sizeof(laddr);
	if (ipfromstring(laddrstr, SA(&laddr), &laddrlen,
	    SA(&raddr)->sa_family) != 1) {
		mg_log(LOG_ERR, "cannot sync, invalid address");
		close(s);
		return -1;
	}

	if (bind(s, SA(&laddr), laddrlen) != 0) {
		mg_log(LOG_ERR, "cannot sync with peer %s, "
		    "bind failed: %s (%d entries queued)",
		    peer->p_name, strerror(errno), peer->p_qlen);
		close(s);
		return -1;
	}

	if (connect(s, SA(&raddr), raddrlen) != 0) {
		mg_log(LOG_ERR, "cannot sync with peer %s, "
		    "connect failed: %s (%d entries queued)",
		    peer->p_name, strerror(errno), peer->p_qlen);
		close(s);
		return -1;
	}
#endif

	/*
	 * Add O_NONBLOCK to the current file status flags instead of
	 * overwriting them. Failure is logged but not fatal.
	 */
	if ((param = fcntl(s, F_GETFL, 0)) == -1 ||
	    fcntl(s, F_SETFL, param | O_NONBLOCK) != 0) {
		mg_log(LOG_ERR, "cannot set non blocking I/O with %s: %s",
		    peer->p_name, strerror(errno));
	}

	if ((stream = fdopen(s, "w+")) == NULL) {
		mg_log(LOG_ERR, "cannot sync with peer %s, "
		    "fdopen failed: %s (%d entries queued)",
		    peer->p_name, strerror(errno), peer->p_qlen);
		close(s);
		return -1;
	}

	if (setvbuf(stream, NULL, _IOLBF, 0) != 0)
		mg_log(LOG_ERR, "cannot set line buffering with peer %s: %s",
		    peer->p_name, strerror(errno));

	sync_waitdata(s);
	if (fgets(line, LINELEN, stream) == NULL) {
		mg_log(LOG_ERR, "Lost connexion with peer %s: "
		    "%s (%d entries queued)",
		    peer->p_name, strerror(errno), peer->p_qlen);
		goto bad;
	}

	/*
	 * On some systems, opening a stream on a socket introduce
	 * weird behavior: the in and out buffers get mixed up.
	 * By calling fflush() after each read operation, we fix that
	 */
	fflush(stream);

	if ((replystr = strtok_r(line, sep, &cookie)) == NULL) {
		mg_log(LOG_ERR, "Unexpected reply \"%s\" from peer %s "
		    "closing connexion (%d entries queued)",
		    line, peer->p_name, peer->p_qlen);
		goto bad;
	}

	replycode = atoi(replystr);
	if (replycode != 200) {
		mg_log(LOG_ERR, "Unexpected reply \"%s\" from peer %s "
		    "closing connexion (%d entries queued)",
		    line, peer->p_name, peer->p_qlen);
		goto bad;
	}

	if ((peer->p_vers = select_protocol(peer, s, stream)) == 0)
		goto bad;

	mg_log(LOG_INFO, "Connection to %s established, protocol version %d",
	    peer->p_name, peer->p_vers);

	peer->p_stream = stream;
	peer->p_socket = s;
	return 0;

bad:
	/* fclose() also closes the underlying socket */
	fclose(stream);
	peer->p_stream = NULL;

	return -1;
}

/*
 * Start the MX sync listeners and their sync_master threads.
 * Does nothing when there is no peer or when a listener already runs.
 * Exits with EX_OSERR when no listener could be set up or a thread
 * could not be created or detached.
 */
void
sync_master_restart(void) {
	pthread_t tid;
	int empty;
	int error;

	PEER_RDLOCK;
	empty = LIST_EMPTY(&peer_head);
	PEER_UNLOCK;

	if (empty || sync_master4.runs || sync_master6.runs)
		return;

	if (conf.c_syncaddr != NULL) {
		/* A ':' in the configured address means IPv6 */
		if (strchr(conf.c_syncaddr, ':'))
			sync_listen(conf.c_syncaddr, conf.c_syncport,
			    &sync_master6);
		else
			sync_listen(conf.c_syncaddr, conf.c_syncport,
			    &sync_master4);
	} else {
#ifdef AF_INET6
		sync_listen("::", conf.c_syncport, &sync_master6);
#endif
		sync_listen("0.0.0.0", conf.c_syncport, &sync_master4);
	}

	if (!sync_master4.runs && !sync_master6.runs) {
		mg_log(LOG_ERR, "cannot start MX sync, socket failed: %s",
		    strerror(errno));
		exit(EX_OSERR);
	}

	if (sync_master6.runs) {
		if ((error = pthread_create(&tid, NULL, sync_master,
		    (void *)&sync_master6)) != 0) {
			mg_log(LOG_ERR,
			    "Cannot run MX sync thread for IPv6: %s",
			    strerror(error));
			exit(EX_OSERR);
		}
		if ((error = pthread_detach(tid)) != 0) {
			mg_log(LOG_ERR,
			    "pthread_detach failed for IPv6 MX sync: %s",
			    strerror(error));
			exit(EX_OSERR);
		}
	}

	if (sync_master4.runs) {
		if ((error = pthread_create(&tid, NULL, sync_master,
		    (void *)&sync_master4)) != 0) {
			mg_log(LOG_ERR,
			    "Cannot run MX sync thread for IPv4: %s",
			    strerror(error));
			exit(EX_OSERR);
		}
		if ((error = pthread_detach(tid)) != 0) {
			mg_log(LOG_ERR,
			    "pthread_detach failed for IPv4 MX sync: %s",
			    strerror(error));
			exit(EX_OSERR);
		}
	}
}

/*
 * Listener thread: accept connections on the socket described by arg
 * (a struct sync_master_sock *), check that the remote address matches
 * one of the peers and hand the stream to a detached sync_server thread.
 * Returns NULL after closing the listening socket once the peer list is
 * found empty.
 */
void *
sync_master(arg)
	void *arg;
{
	struct sync_master_sock *sms = arg;

	for (;;) {
		sockaddr_t raddr;
		socklen_t raddrlen;
		int fd;
		FILE *stream;
		pthread_t tid;
		struct peer *peer;
		char peerstr[IPADDRSTRLEN];
		int error;

		bzero((void *)&raddr, sizeof(raddr));
		raddrlen = sizeof(raddr);
		if ((fd = accept(sms->sock, SA(&raddr), &raddrlen)) == -1) {
			mg_log(LOG_ERR, "incoming connexion "
			    "failed: %s", strerror(errno));

			if (errno != ECONNABORTED)
				exit(EX_OSERR);
			continue;
		}
		unmappedaddr(SA(&raddr), &raddrlen);

		iptostring(SA(&raddr), raddrlen, peerstr, sizeof(peerstr));
		mg_log(LOG_INFO, "Incoming MX sync connexion from %s",
		    peerstr);

		if ((stream = fdopen(fd, "w+")) == NULL) {
			mg_log(LOG_ERR,
			    "incoming connexion from %s failed, "
			    "fdopen fail: %s", peerstr, strerror(errno));
			close(fd);
			exit(EX_OSERR);
		}

		if (setvbuf(stream, NULL, _IOLBF, 0) != 0)
			mg_log(LOG_ERR, "cannot set line buffering: %s",
			    strerror(errno));

		/*
		 * Check that the originator IP is one of our peers
		 */
		PEER_RDLOCK;

		if (LIST_EMPTY(&peer_head)) {
			fprintf(stream, "105 No more peers, shutting down!\n");

			PEER_UNLOCK;
			fclose(stream);
			close(sms->sock);
			sms->sock = -1;
			sms->runs = 0;
			return NULL;
		}

		/* peer is left NULL when the loop ends without a match */
		LIST_FOREACH(peer, &peer_head, p_list) {
#ifdef HAVE_GETADDRINFO
			struct addrinfo hints, *res0, *res;
			int err;
			int match = 0;

			bzero(&hints, sizeof(hints));
			hints.ai_flags = AI_PASSIVE;
			hints.ai_family = AF_UNSPEC;
			hints.ai_socktype = SOCK_STREAM;
			err = getaddrinfo(peer->p_name, "0", &hints, &res0);
			if (err != 0) {
				mg_log(LOG_ERR, "cannot resolve %s: %s",
				    peer->p_name, gai_strerror(err));
				continue;
			}
			for (res = res0; res; res = res->ai_next) {
				if (ip_equal(SA(&raddr), res->ai_addr)) {
					match = 1;
					break;
				}
			}
			freeaddrinfo(res0);
			if (match)
				break;
#else
			sockaddr_t addr;
			socklen_t addrlen;

			addrlen = sizeof(addr);
			if (ipfromstring(peer->p_name, SA(&addr), &addrlen,
			    AF_UNSPEC) != 1) {
				mg_log(LOG_ERR, "cannot resolve %s",
				    peer->p_name);
				continue;
			}
			if (ip_equal(SA(&raddr), SA(&addr)))
				break;
#endif
		}

		PEER_UNLOCK;

		if (peer == NULL) {
			mg_log(LOG_INFO, "Remote host %s is not a peer MX",
			    peerstr);
			fprintf(stream,
			    "106 You have no permission to talk, go away!\n");
			fclose(stream);
			continue;
		}

		if ((error = pthread_create(&tid, NULL,
		    (void *(*)(void *))sync_server, (void *)stream)) != 0) {
			mg_log(LOG_ERR,
			    "incoming connexion from %s failed, "
			    "pthread_create failed: %s",
			    peerstr, strerror(error));
			fclose(stream);
			continue;
		}

		if ((error = pthread_detach(tid)) != 0) {
			mg_log(LOG_ERR,
			    "incoming connexion from %s failed, "
			    "pthread_detach failed: %s",
			    peerstr, strerror(error));
			exit(EX_OSERR);
		}
	}

	/* NOTREACHED */
	mg_log(LOG_ERR, "sync_master quitted unexpectedly");
	return NULL;
}

/*
 * Create, bind and listen on a TCP socket for address addr and port
 * (port may be NULL: the services database, then MXGLSYNC_PORT, is used).
 * On success sms->runs is 1 and sms->sock holds the socket; on any
 * failure sms->runs is reset to 0.
 */
static void
sync_listen(addr, port, sms)
	char *addr, *port;
	struct sync_master_sock *sms;
{
	struct protoent *pe;
	struct servent *se;
	int proto;
	sockaddr_t laddr;
	socklen_t laddrlen;
	int service;
	int optval;
	int s;

	sms->runs = 1;

	laddrlen = sizeof(laddr);
	if (ipfromstring(addr, SA(&laddr), &laddrlen, AF_UNSPEC) != 1) {
		sms->runs = 0;
		return;
	}

	if ((pe = getprotobyname("tcp")) == NULL)
		proto = 6;
	else
		proto = pe->p_proto;

	if (port != NULL)
		service = htons(atoi(port));
	else {
		if ((se = getservbyname(MXGLSYNC_NAME, "tcp")) == NULL)
			service = htons(atoi(MXGLSYNC_PORT));
		else
			service = se->s_port;
	}

	switch (SA(&laddr)->sa_family) {
	case AF_INET:
		SA4(&laddr)->sin_port = service;
		break;
#ifdef AF_INET6
	case AF_INET6:
		SA6(&laddr)->sin6_port = service;
		break;
#endif
	default:
		sms->runs = 0;
		return;
	}

	if ((s = socket(SA(&laddr)->sa_family, SOCK_STREAM, proto)) == -1) {
		sms->runs = 0;
		return;
	}

	/* Socket option failures are logged but not fatal */
	optval = 1;
	if ((setsockopt(s, SOL_SOCKET, SO_REUSEADDR,
	    &optval, sizeof(optval))) != 0) {
		mg_log(LOG_ERR, "cannot set SO_REUSEADDR: %s",
		    strerror(errno));
	}

	optval = 1;
	if ((setsockopt(s, SOL_SOCKET, SO_KEEPALIVE,
	    &optval, sizeof(optval))) != 0) {
		mg_log(LOG_ERR, "cannot set SO_KEEPALIVE: %s",
		    strerror(errno));
	}

#ifdef IPV6_V6ONLY
	if (SA(&laddr)->sa_family == AF_INET6) {
		optval = 1;
		if ((setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY,
		    &optval, sizeof(optval))) != 0) {
			mg_log(LOG_ERR, "cannot set IPV6_V6ONLY: %s",
			    strerror(errno));
		}
	}
#endif

	if (bind(s, SA(&laddr), laddrlen) != 0) {
		mg_log(LOG_ERR, "cannot start MX sync, bind failed: %s",
		    strerror(errno));
		sms->runs = 0;
		close(s);
		return;
	}

	if (listen(s, MXGLSYNC_BACKLOG) != 0) {
		mg_log(LOG_ERR, "cannot start MX sync, listen failed: %s",
		    strerror(errno));
		sms->runs = 0;
		close(s);
		return;
	}

	sms->sock = s;
	return;
}

/*
 * Serve one incoming MX sync connection. arg is the FILE * stream of the
 * connection. Reads commands line by line, answers each with a numeric
 * reply, applies add / del / del2 / flush to the pending and autowhite
 * lists, and closes the stream on "quit" or when reading fails.
 */
void
sync_server(arg)
	void *arg;
{
	FILE *stream = arg;
	char sep[] = " \n\t\r";
	char *cmd;
	char *keyword;
	char *addrstr;
	char *from;
	char *rcpt;
	char from_clean[ADDRLEN + 1];
	char rcpt_clean[ADDRLEN + 1];
	char *datestr;
	char *awstr;
	char *cookie;
	char line[LINELEN + 1];
	peer_sync_t action;
	sockaddr_t addr;
	socklen_t addrlen;
	time_t date;
	time_t aw;

	fprintf(stream, "200 Yeah, what do you want?\n");
	fflush(stream);

	for (;;) {
		if ((fgets(line, LINELEN, stream)) == NULL)
			break;

		/*
		 * On some systems, opening a stream on a socket introduce
		 * weird behavior: the in and out buffers get mixed up.
		 * By calling fflush() after each read operation, we fix that
		 */
		fflush(stream);

		/*
		 * Start every command with a fresh default autowhite date.
		 * Otherwise the -1 marker or the date supplied by an earlier
		 * del2 command would leak into the commands that follow.
		 */
		aw = time(NULL) + conf.c_autowhite_validity;

		/*
		 * Get the command { quit | help | add | del | del2 | flush }
		 */
		cookie = NULL;
		if ((cmd = strtok_r(line, sep, &cookie)) == NULL) {
			fprintf(stream, "101 No command\n");
			fflush(stream);
			continue;
		}

		if (strncmp(cmd, "quit", CMDLEN) == 0) {
			break;
		} else if ((strncmp(cmd, "help", CMDLEN)) == 0) {
			sync_help(stream);
			continue;
		} else if ((strncmp(cmd, "vers3", CMDLEN)) == 0) {
			sync_vers(stream, 3);
			continue;
		} else if ((strncmp(cmd, "vers2", CMDLEN)) == 0) {
			sync_vers(stream, 2);
			continue;
		} else if ((strncmp(cmd, "add", CMDLEN)) == 0) {
			action = PS_CREATE;
		} else if ((strncmp(cmd, "del2", CMDLEN)) == 0) {
			action = PS_DELETE;
			aw = -1;	/* marker: an "aw" field must follow */
		} else if ((strncmp(cmd, "del", CMDLEN)) == 0) {
			action = PS_DELETE;
		} else if ((strncmp(cmd, "flush", CMDLEN)) == 0) {
			action = PS_FLUSH;
		} else {
			fprintf(stream, "102 Invalid command \"%s\"\n", cmd);
			fflush(stream);
			continue;
		}

		/*
		 * get { "addr" ip_address }
		 */
		if ((keyword = strtok_r(NULL, sep, &cookie)) == NULL) {
			fprintf(stream, "103 Incomplete command\n");
			fflush(stream);
			continue;
		}

		if (strncmp(keyword, "addr", CMDLEN) != 0) {
			fprintf(stream,
			    "104 Unexpected keyword \"%s\"\n", keyword);
			fflush(stream);
			continue;
		}

		if ((addrstr = strtok_r(NULL, sep, &cookie)) == NULL) {
			fprintf(stream, "103 Incomplete command\n");
			fflush(stream);
			continue;
		}

		addrlen = sizeof(addr);
		if (ipfromstring(addrstr, SA(&addr), &addrlen,
		    AF_UNSPEC) != 1) {
			fprintf(stream, "107 Invalid IP address\n");
			fflush(stream);
			continue;
		}

		/* flush only takes an address */
		if (action == PS_FLUSH) {
			from = NULL;
			rcpt = NULL;
			date = 0;
			goto eol;
		}

		/*
		 * get { "from" email_address }
		 */
		if ((keyword = strtok_r(NULL, sep, &cookie)) == NULL) {
			fprintf(stream, "103 Incomplete command\n");
			fflush(stream);
			continue;
		}

		if (strncmp(keyword, "from", CMDLEN) != 0) {
			fprintf(stream,
			    "104 Unexpected keyword \"%s\"\n", keyword);
			fflush(stream);
			continue;
		}

		if ((from = strtok_r(NULL, sep, &cookie)) == NULL) {
			fprintf(stream, "103 Incomplete command\n");
			fflush(stream);
			continue;
		}

		(void)strncpy_rmsp(from_clean, from, ADDRLEN);
		from = from_clean;

		/*
		 * get { "rcpt" email_address }
		 */
		if ((keyword = strtok_r(NULL, sep, &cookie)) == NULL) {
			fprintf(stream, "103 Incomplete command\n");
			fflush(stream);
			continue;
		}

		if (strncmp(keyword, "rcpt", CMDLEN) != 0) {
			fprintf(stream,
			    "104 Unexpected keyword \"%s\"\n", keyword);
			fflush(stream);
			continue;
		}

		if ((rcpt = strtok_r(NULL, sep, &cookie)) == NULL) {
			fprintf(stream, "103 Incomplete command\n");
			fflush(stream);
			continue;
		}

		(void)strncpy_rmsp(rcpt_clean, rcpt, ADDRLEN);
		rcpt = rcpt_clean;

		/*
		 * get { "date" valid_date }
		 */
		if ((keyword = strtok_r(NULL, sep, &cookie)) == NULL) {
			fprintf(stream, "103 Incomplete command\n");
			fflush(stream);
			continue;
		}

		if (strncmp(keyword, "date", CMDLEN) != 0) {
			fprintf(stream,
			    "104 Unexpected keyword \"%s\"\n", keyword);
			fflush(stream);
			continue;
		}

		if ((datestr = strtok_r(NULL, sep, &cookie)) == NULL) {
			fprintf(stream, "103 Incomplete command\n");
			fflush(stream);
			continue;
		}

		date = atoi(datestr);

		if (aw == -1) {
			/*
			 * get { "aw" valid_date }
			 */
			if ((keyword = strtok_r(NULL, sep, &cookie)) == NULL) {
				fprintf(stream, "103 Incomplete command\n");
				fflush(stream);
				continue;
			}

			if (strncmp(keyword, "aw", CMDLEN) != 0) {
				fprintf(stream,
				    "104 Unexpected keyword \"%s\"\n",
				    keyword);
				fflush(stream);
				continue;
			}

			if ((awstr = strtok_r(NULL, sep, &cookie)) == NULL) {
				fprintf(stream, "103 Incomplete command\n");
				fflush(stream);
				continue;
			}

			aw = atoi(awstr);
		}

		/*
		 * Check nothing remains
		 */
eol:
		if ((keyword = strtok_r(NULL, sep, &cookie)) != NULL) {
			fprintf(stream,
			    "104 Unexpected keyword \"%s\"\n", keyword);
			fflush(stream);
			continue;
		}

		fprintf(stream, "201 All right, I'll do that\n");
		fflush(stream);

		if (action == PS_CREATE) {
			PENDING_WRLOCK;
			/* delay = -1 means unused: we supply the date */
			pending_get(SA(&addr), addrlen, from, rcpt, date);
			PENDING_UNLOCK;
		}
		if (action == PS_DELETE) {
			pending_del(SA(&addr), addrlen, from, rcpt, date);
			autowhite_add(SA(&addr), addrlen, from, rcpt, &aw,
			    "(mxsync)");
		}
		if (action == PS_FLUSH) {
			pending_del_addr(SA(&addr), addrlen, NULL, 0);
			autowhite_del_addr(SA(&addr), addrlen);
		}

		/* Flush modifications to disk */
		dump_flush();
	}

	fprintf(stream, "202 Good bye\n");
	fclose(stream);

	return;
}

/*
 * Answer a versN command: reply 800 + vers when vers does not exceed
 * SYNC_PROTO_CURRENT, reply 108 otherwise.
 */
static void
sync_vers(stream, vers)
	FILE *stream;
	int vers;
{
	if (vers <= SYNC_PROTO_CURRENT) {
		fprintf(stream,
		    "%d Yes, I speak version %d, what do you think?\n",
		    800 + vers, vers);
	} else {
		fprintf(stream, "108 Invalid vers%d command\n", vers);
	}

	fflush(stream);
	return;
}

/* Print the help text on stream, one "203" reply line per fprintf() */
void
sync_help(stream)
	FILE *stream;
{
	fprintf(stream, "203 Help? Sure, we have help here:\n");
	fprintf(stream, "203 \n");
	fprintf(stream, "203 Available commands are:\n");
	fprintf(stream, "203 help -- displays this message\n");
	fprintf(stream, "203 quit -- terminate connexion\n");
	fprintf(stream, "203 vers2 -- speak version 2 protocol\n");
	fprintf(stream, "203 vers3 -- speak version 3 protocol\n");
	fprintf(stream, "203 add addr from rcpt date