/* * Copyright (c) 2002, Stefan Farfeleder * $Id: network.cc,v 1.5 2002/09/10 22:29:38 stefan Exp $ */ #include #include #include #include #include #include #include #include "config.h" #define _POSIX_C_SOURCE 199506L #define _XOPEN_SOURCE_EXTENDED #if defined(HAVE_SYS_TYPES_H) #include #endif #if defined(HAVE_SYS_SOCKET_H) #include #endif #if defined(HAVE_SYS_SELECT_H) #include #endif #if defined(HAVE_SYS_TIME_H) #include #endif #if defined(HAVE_ARPA_INET_H) #include #endif #if defined(HAVE_FCNTL_H) #include #endif #if defined(HAVE_NETDB_H) #include #endif #if defined(HAVE_NETINET_IN_H) #include #endif #if defined(HAVE_NETINET_TCP_H) #include #endif #if defined(HAVE_POLL_H) #include #endif #if defined(HAVE_UNISTD_H) #include #endif #if defined(_WIN32) #include #define close closesocket #define EWOULDBLOCK WSAEWOULDBLOCK #define ECONNABORTED WSAECONNABORTED #define SET_ERRNO do { errno = WSAGetLastError(); } while (0) #else #define SET_ERRNO #endif #include "addrinfo.h" #include "exception.h" #include "missing.h" #include "network.h" using std::string; using std::vector; using namespace JFK; const int JFK::TO_ALL = -1; #if defined(INET6) const int MAXSOCKADDR = sizeof(sockaddr_in6); #else const int MAXSOCKADDR = sizeof(sockaddr_in); #endif /* * We really need non-blocking sockets, so bomb out if we can't have them. */ static void set_socket_nonblocking(int sock) { int val; #if defined(HAVE_FCNTL) val = fcntl(sock, F_GETFL); if (val == -1) throw JFK::exception_e("fcntl"); if (fcntl(sock, F_SETFL, val | O_NONBLOCK) == -1) throw exception_e("fcntl"); #elif defined(_WIN32) unsigned long ul_on = 1; val = ioctlsocket(sock, FIONBIO, &ul_on); if (val == -1) throw JFK::exception("ioctlsocket"); #else #error "can't set the socket to non-blocking" #endif } static int setsockopt_wrapper(int sock, int level, int optname, int value) { #if defined(HAVE_SETSOCKOPT) const int v = value; return setsockopt(sock, level, optname, &v, sizeof v); #elif defined(_WIN32) const char v = value; return setsockopt(sock, level, optname, &v, sizeof v); #else return -1; #endif } /* * Disabling Nagle's algorithm should reduce network lag, but we don't require * it. */ static void disable_nagle_algorithm(int sock) { #if defined(IPPROTO_TCP) && defined(TCP_NODELAY) (void)setsockopt_wrapper(sock, IPPROTO_TCP, TCP_NODELAY, 1); #endif } static void network_init() { #if defined(_WIN32) unsigned long version = MAKEWORD(2, 2); WSADATA wsadata; if (WSAStartup(version, &wsadata) != 0) throw JFK::exception("WSAStartup"); #endif #ifdef SIGPIPE /* ignore SIGPIPE */ if (std::signal(SIGPIPE, SIG_IGN) == SIG_ERR) throw JFK::exception_e("signal"); #endif } /* * Connect to the server with the specified service, then set the socket to * non-blocking mode. */ network_client::network_client(const string& servername, const string& service) { addrinfo hints; addrinfo* res; addrinfo* lres; int val; network_init(); std::memset(&hints, 0, sizeof hints); hints.ai_family = AF_UNSPEC; hints.ai_socktype = SOCK_STREAM; if ((val = getaddrinfo(servername.c_str(), service.c_str(), &hints, &res)) != 0) throw exception("getaddrinfo: " + string(gai_strerror(val))); /* loop through each entry and try to connect */ for (lres = res; lres != NULL; lres = lres->ai_next) { server.fd = socket(lres->ai_family, lres->ai_socktype, lres->ai_protocol); if (server.fd < 0) continue; if (connect(server.fd, lres->ai_addr, lres->ai_addrlen) == 0) break; /* found */ if (close(server.fd) != 0) throw exception_e("close"); } freeaddrinfo(res); if (lres == NULL) /* errno could be wrong, so probably this should only be * exception(...) */ throw exception_e("cannot connect to " + servername + '.' + service); set_socket_nonblocking(server.fd); disable_nagle_algorithm(server.fd); } network_client::~network_client() { #if defined(_WIN32) WSACleanup(); #endif } /* * Send 'msg' to the server. */ void network_client::send(const string& msg) { server.writeq.push(msg + '\n'); } /* * Get a line from the server and return true; if no line is ready false is * returned. */ bool network_client::receive(string* msg) { if (server.readq.empty()) return false; *msg = server.readq.front(); server.readq.pop(); return true; } /* * Try to write lines stored in writeq to the server and to read lines from * the server into readq. */ void network_client::dispatch() { server.flush_writeq(); server.fill_readq(); if (server.lost) throw exception("lost server"); } /* * Create a server listening on service 'service' and accepting connections. */ network_server::network_server(const string& service) #if defined(HAVE_POLL) : fds(NULL), fd_alloc(0) #endif { addrinfo hints; addrinfo* res; addrinfo* lres; int val; network_init(); std::memset(&hints, 0, sizeof hints); hints.ai_family = AF_UNSPEC; hints.ai_flags = AI_PASSIVE; hints.ai_socktype = SOCK_STREAM; if ((val = getaddrinfo(NULL, service.c_str(), &hints, &res)) != 0) throw exception("getaddrinfo: " + string(gai_strerror(val))); /* loop through each entry and try to bind */ for (lres = res; lres != NULL; lres = lres->ai_next) { listenfd = socket(lres->ai_family, lres->ai_socktype, lres->ai_protocol); if (listenfd < 0) continue; #if defined(SOL_SOCKET) && defined(SO_REUSEADDR) (void)setsockopt_wrapper(listenfd, SOL_SOCKET, SO_REUSEADDR, 1); #endif if (bind(listenfd, lres->ai_addr, lres->ai_addrlen) == 0) break; /* found */ if (close(listenfd) != 0) throw exception_e("close"); } freeaddrinfo(res); if (lres == NULL) throw exception_e("cannot listen on " + service); if (listen(listenfd, SOMAXCONN) != 0) throw exception_e("listen"); set_socket_nonblocking(listenfd); sa = (sockaddr*)std::malloc(MAXSOCKADDR); if (sa == NULL) throw std::bad_alloc(); } /* * Free allocated memory. */ network_server::~network_server() { #if defined(HAVE_POLL) std::free(fds); #endif std::free(sa); #if defined(_WIN32) WSACleanup(); #endif } /* * Return true if a new client has connected to the server. The ip number is * stored into 'host'. An id number for the client is assigned with which it * can be referred to in further requests. */ bool network_server::new_client(string* hostname, int* id) { char buf[NI_MAXHOST]; socklen_t len = MAXSOCKADDR; int clientfd; int offs; errno = 0; #if !defined(HAVE_POLL) /* only FD_SETSIZE fds in select(), can't accept() more */ if (client.size() == FD_SETSIZE) return false; #endif do { clientfd = accept(listenfd, sa, &len); SET_ERRNO; } while (clientfd == -1 && errno == EINTR); if (clientfd == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ECONNABORTED) { return false; /* no new client yet */ } throw exception_e("accept"); } if (sa->sa_family == AF_INET) offs = offsetof(sockaddr_in, sin_addr); #if defined(INET6) else if (sa->sa_family == AF_INET6) offs = offsetof(sockaddr_in6, sin6_addr); #endif else throw exception("unknown sa_family"); if (inet_ntop(sa->sa_family, (char*)sa + offs, buf, sizeof buf) == NULL) throw exception_e("inet_ntop"); set_socket_nonblocking(clientfd); disable_nagle_algorithm(clientfd); *hostname = string(buf); *id = clientfd; client.push_back(clientfd); #if defined(HAVE_POLL) if (client.size() > fd_alloc) { const size_t MIN_POLLFD = 10; /* allocate more memory for the pollfd array */ fd_alloc = std::max(MIN_POLLFD, 3 * fd_alloc / 2); pollfd* tmp = (pollfd*)std::realloc(fds, fd_alloc * sizeof *fds); if (tmp == NULL) throw std::bad_alloc(); fds = tmp; } #endif return true; } /* * Return true if a client has disconnected and store its id into 'id'. */ bool network_server::lost_client(int* id) { for (size_t i = 0; i < client.size(); i++) { if (client[i].lost) { *id = client[i].fd; remove_client(*id); return true; } } return false; } /* * Cut the connection to the client. */ void network_server::remove_client(int id) { client_iter i = find_id(client, id); if (i == client.end()) throw exception("id nonexistent"); if (close(i->fd) != 0) throw exception_e("close"); *i = client.back(); client.resize(client.size() - 1); } void network_server::enable_client(int id) { client_iter i = find_id(client, id); if (i == client.end()) throw exception("id nonexistent"); i->enabled = true; } void network_server::disable_client(int id) { client_iter i = find_id(client, id); if (i == client.end()) throw exception("id nonexistent"); i->enabled = false; } /* * Push 'msg' onto the stack of the client with the id 'to' (or to all's if * to == TO_ALL). */ void network_server::send(const string& msg, int to) { for (size_t i = 0; i < client.size(); i++) { if (client[i].fd == to || (to == TO_ALL && client[i].enabled)) { client[i].writeq.push(msg + '\n'); if (to != TO_ALL) return; } } if (to != TO_ALL) throw exception("id nonexistent"); } /* * If a message arrived it is stored to 'msg' and true is returned. The * sender-id is stored into 'from'. */ bool network_server::receive(string* msg, int from) { client_iter i = find_id(client, from); if (i == client.end()) throw exception("id nonexistent"); if (i->readq.empty()) return false; else { *msg = i->readq.front(); i->readq.pop(); return true; } } /* * Try to send each writeq to its client and fill the readq with incoming * data. */ void network_server::dispatch() { int nfds = 0; /* number of elements in fds */ int nused; /* number of fds read-/writable */ #if !defined(HAVE_POLL) && !defined(HAVE_SELECT) && !defined(_WIN32) #error "neither poll() nor select() available!" #endif #if !defined(HAVE_POLL) fd_set readset; fd_set writeset; FD_ZERO(&readset); FD_ZERO(&writeset); #endif /* fill the fds array/fd_set */ for (size_t i = 0; i < client.size(); i++) { host& c = client[i]; if (c.lost) continue; #if defined(HAVE_POLL) fds[nfds].fd = c.fd; fds[nfds].events = POLLIN; if (!c.writeq.empty()) fds[nfds].events |= POLLOUT; nfds++; #else FD_SET(c.fd, &readset); if (!c.writeq.empty()) FD_SET(c.fd, &writeset); if (c.fd > nfds) nfds = c.fd; #endif } #if defined(HAVE_POLL) nused = poll(fds, nfds, 0); if (nused == -1) throw exception_e("poll"); /* i is the index in the fds array, j in client */ for (size_t i = 0, j = 0; nused > 0; i++) { /* skip over clients not in the fds array */ while (j < client.size() && client[j].fd != fds[i].fd) j++; assert(j < client.size()); if (fds[i].revents == 0) continue; nused--; host& c = client[j]; if (fds[i].revents & POLLERR || fds[i].revents & POLLHUP) { c.lost = true; continue; } if (fds[i].revents & POLLOUT) { c.flush_writeq(); } if (fds[i].revents & POLLIN) { c.fill_readq(); } } #else timeval tv; tv.tv_sec = tv.tv_usec = 0; nused = 0; if (nfds > 0) { nused = select(nfds + 1, &readset, &writeset, NULL, &tv); SET_ERRNO; } if (nused == -1) throw exception_e("select"); for (int i = 0; i <= nfds && nused > 0; i++) { client_iter it; if (FD_ISSET(i, &readset)) { it = find_id(client, i); assert(it != client.end()); it->fill_readq(); nused--; } if (FD_ISSET(i, &writeset)) { it = find_id(client, i); assert(it != client.end()); it->flush_writeq(); nused--; } } assert(nused == 0); #endif } /* * Search for host with id 'id'. Returns vh.end() if not found. */ network_server::client_iter network_server::find_id(vector& vh, int id) { client_iter ret; for (ret = vh.begin(); ret != vh.end(); ret++) if (ret->fd == id) break; return ret; } /* * Send as much data as possible from writeq. */ void host::flush_writeq() { errno = 0; while (!writeq.empty()) { ssize_t n = ::send(fd, writeq.front().data() + written_chars, writeq.front().length() - written_chars, 0); SET_ERRNO; if (n <= 0) { if (errno == EINTR) continue; /* try again */ if (errno != EWOULDBLOCK && errno != EAGAIN) { lost = true; } break; } written_chars += n; if (written_chars == writeq.front().length()) { /* All of the line has been written, remove it from writeq. */ writeq.pop(); written_chars = 0; } } } /* * Read as much data as possible into readq. */ void host::fill_readq() { char buf[2048]; errno = 0; for (;;) { ssize_t n = recv(fd, buf, sizeof buf, 0); SET_ERRNO; if (n <= 0) { if (errno == EINTR) continue; if (errno != EWOULDBLOCK && errno != EAGAIN) { lost = true; } break; } char* p; /* Points to the next \n. */ char* r; /* Points to the beginning of a line. */ for (r = buf; r < buf + n && (p = (char*)std::memchr(r, '\n', buf + n - r)) != NULL; r = p + 1) { if (incomplete.length() > 0) { /* incomplete + everything to the next \n go into readq. */ readq.push(incomplete); /* Dont store the '\n'. */ readq.back().append(r, p - r); incomplete = ""; } else { /* Don't store the '\n'. */ readq.push(string(r, p - r)); } } /* Everything after the last \n goes into incomplete. */ incomplete.append(r, buf + n - r); } }