/*
* chattersocket.{cc,hh} -- element echoes chatter to TCP/IP or Unix-domain
* sockets
* Eddie Kohler
*
* Copyright (c) 2000 Massachusetts Institute of Technology
* Copyright (c) 2001 International Computer Science Institute
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, subject to the conditions
* listed in the Click LICENSE file. These conditions include: you must
* preserve this copyright notice, and you cannot mention the copyright
* holders in advertising related to the Software without their permission.
* The Software is provided WITHOUT ANY WARRANTY, EXPRESS OR IMPLIED. This
* notice is a summary of the Click LICENSE file; the license in that file is
* legally binding. */
#include <click/config.h>
#include "chattersocket.hh"
#include <click/confparse.hh>
#include <click/error.hh>
#include <click/router.hh>
#include <click/straccum.hh>
#include <clicknet/tcp.h> /* for SEQ_LT, etc. */
#include <unistd.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <arpa/inet.h>
#include <fcntl.h>
CLICK_DECLS
// Chatter protocol version. selected() sends it to every newly accepted
// connection in the "Click::ChatterSocket/<version>" greeting line.
const char ChatterSocket::protocol_version[] = "1.0";
// ErrorVeneer that passes each message on to the handler it wraps and also
// hands a copy to every ChatterSocket registered with it (see handle_text()).
struct ChatterSocketErrorHandler : public ErrorVeneer {
// Sockets registered on this channel; add_chatter_socket() keeps entries unique.
Vector<ChatterSocket *> _chatter_sockets;
public:
// Wrap errh; the pointer is passed straight to the ErrorVeneer base.
ChatterSocketErrorHandler(ErrorHandler *errh) : ErrorVeneer(errh) { }
// The wrapped handler (_errh is not declared here; presumably an ErrorVeneer member).
ErrorHandler *base_errh() const { return _errh; }
// Number of sockets currently registered.
int nchatter_sockets() const { return _chatter_sockets.size(); }
// Register a socket; a socket that is already present is not added again.
void add_chatter_socket(ChatterSocket *);
// Unregister a socket; unknown sockets are ignored.
void remove_chatter_socket(ChatterSocket *);
// Forward a message to the wrapped handler and to all registered sockets.
void handle_text(Seriousness, const String &);
};
// Register cs with this handler. A socket that is already registered is left
// alone, so the list never contains duplicates.
void
ChatterSocketErrorHandler::add_chatter_socket(ChatterSocket *cs)
{
    const int count = _chatter_sockets.size();
    int idx = 0;
    while (idx < count && _chatter_sockets[idx] != cs)
        ++idx;

    // Reached the end without a match: cs is new.
    if (idx == count)
        _chatter_sockets.push_back(cs);
}
// Unregister cs from this handler. Does nothing if cs is not registered.
// The order of the remaining sockets is not preserved.
void
ChatterSocketErrorHandler::remove_chatter_socket(ChatterSocket *cs)
{
    int idx = 0;
    while (idx < _chatter_sockets.size() && _chatter_sockets[idx] != cs)
        ++idx;
    if (idx >= _chatter_sockets.size())
        return;

    // Fill the hole with the last entry, then drop the last slot.
    _chatter_sockets[idx] = _chatter_sockets.back();
    _chatter_sockets.pop_back();
}
// Deliver message m to the wrapped handler and then to every registered
// ChatterSocket. A non-empty message that lacks a trailing newline gets one
// appended first; an empty message is passed through unchanged.
void
ChatterSocketErrorHandler::handle_text(Seriousness seriousness, const String &m)
{
    String text(m);
    const bool unterminated = m.length() > 0 && m.back() != '\n';
    if (unterminated)
        text += '\n';

    _errh->handle_text(seriousness, text);

    for (int idx = 0; idx < _chatter_sockets.size(); ++idx) {
        ChatterSocket *listener = _chatter_sockets[idx];
        listener->handle_text(seriousness, text);
    }
}
// Handler for the "default" channel. ChatterSocket::configure() creates it
// and installs it as the default ErrorHandler the first time that channel is
// used; remove_chatter_channel() deletes it when its last socket leaves.
static ChatterSocketErrorHandler *chatter_socket_errh;
// The default ErrorHandler that was in place before chatter_socket_errh was
// installed; remove_chatter_channel() reinstates it on teardown.
static ErrorHandler *base_default_errh;
// Start with no listening socket (-1), the "default" channel, and no retry
// timer. The remaining members are set in configure() and initialize().
ChatterSocket::ChatterSocket()
: _socket_fd(-1), _channel("default"), _retry_timer(0)
{
}
// Nothing to release here: sockets, the retry timer and the channel
// registration are all torn down in cleanup().
ChatterSocket::~ChatterSocket()
{
}
// Parse the configuration and join (creating it if necessary) the chatter
// channel this socket echoes.
//
// conf: the first positional argument is the socket type, `TCP' or `UNIX'
//   (case-insensitive); the second is a port number (TCP) or a filename
//   (UNIX). The keywords CHANNEL, QUIET_CHANNEL, GREETING, RETRIES and
//   RETRY_WARNINGS are handled by cp_va_parse_remove_keywords().
// errh: receives parse and validation errors.
//
// Returns 0 on success. On failure returns -1, or whatever errh->error()
// returns for the "filename too long" / "unknown socket type" cases.
int
ChatterSocket::configure(Vector<String> &conf, ErrorHandler *errh)
{
    String socktype;
    if (cp_va_parse(conf, this, errh,
                    cpString, "type of socket (`TCP' or `UNIX')", &socktype,
                    cpIgnoreRest, cpEnd) < 0)
        return -1;

    // remove keyword arguments
    bool quiet_channel = true, greeting = true, retry_warnings = true;
    _retries = 0;
    // QUIET_CHANNEL is stored into a bool, so it has to be parsed as cpBool
    // like the other flags; an element-typed parse would write a pointer
    // through a bool *.
    if (cp_va_parse_remove_keywords(conf, 2, this, errh,
                    "CHANNEL", cpWord, "chatter channel", &_channel,
                    "QUIET_CHANNEL", cpBool, "channel is quiet?", &quiet_channel,
                    "GREETING", cpBool, "greet connectors?", &greeting,
                    "RETRIES", cpInteger, "number of retries", &_retries,
                    "RETRY_WARNINGS", cpBool, "warn on unsuccessful socket attempt?", &retry_warnings,
                    cpEnd) < 0)
        return -1;
    _greeting = greeting;
    _retry_warnings = retry_warnings;

    socktype = socktype.upper();
    if (socktype == "TCP") {
        _tcp_socket = true;
        unsigned short portno;
        if (cp_va_parse(conf, this, errh,
                        cpIgnore, cpUnsignedShort, "port number", &portno, cpEnd) < 0)
            return -1;
        // The port is kept as text in _unix_pathname; initialize_socket()
        // converts it back, and take_state() compares it.
        _unix_pathname = String(portno);

    } else if (socktype == "UNIX") {
        _tcp_socket = false;
        if (cp_va_parse(conf, this, errh,
                        cpIgnore, cpString, "filename", &_unix_pathname, cpEnd) < 0)
            return -1;
        // The name plus its terminating NUL must fit in sockaddr_un::sun_path.
        if (_unix_pathname.length() >= (int)sizeof(((struct sockaddr_un *)0)->sun_path))
            return errh->error("filename too long");

    } else
        return errh->error("unknown socket type `%s'", socktype.c_str());

    // Create channel now, so that other configure() methods will get it.
    ChatterSocketErrorHandler *cserrh;
    if (_channel == "default" && chatter_socket_errh)
        // default channel already exists: share it
        cserrh = chatter_socket_errh;
    else if (_channel == "default") {
        // first user of the default channel: wrap and replace the default
        // error handler, remembering the old one so it can be restored
        base_default_errh = ErrorHandler::default_handler();
        chatter_socket_errh = new ChatterSocketErrorHandler(base_default_errh);
        ErrorHandler::set_default_handler(chatter_socket_errh);
        cserrh = chatter_socket_errh;
    } else if (void *v = router()->attachment("ChatterChannel." + _channel))
        // named channel already attached to the router: share it
        cserrh = (ChatterSocketErrorHandler *)v;
    else {
        // new named channel: quiet channels wrap the silent handler, others
        // wrap the saved default handler (or the current default if none
        // has been saved yet)
        ErrorHandler *base = (quiet_channel ? ErrorHandler::silent_handler() : base_default_errh);
        if (!base) base = ErrorHandler::default_handler();
        cserrh = new ChatterSocketErrorHandler(base);
        router()->set_attachment("ChatterChannel." + _channel, cserrh);
    }

    // install ChatterSocketErrorHandler
    cserrh->add_chatter_socket(this);

    return 0;
}
// Report the failure of `syscall' during socket setup and close any
// half-initialized listening socket.
//
// While retries remain (_retries >= 0) the failure is only a warning (and
// only if retry warnings are enabled) and -EINVAL is returned. Once retries
// are exhausted it is reported through errh->error(), whose result is
// returned.
int
ChatterSocket::initialize_socket_error(ErrorHandler *errh, const char *syscall)
{
    // Grab errno before anything below can change it.
    const int saved_errno = errno;

    if (_socket_fd >= 0) {
        close(_socket_fd);
        _socket_fd = -1;
    }

    // Out of retries: hard error.
    if (_retries < 0)
        return errh->error("%s: %s", syscall, strerror(saved_errno));

    if (_retry_warnings) {
        const char *noun = (_retries == 0 ? "try" : "tries");
        errh->warning("%s: %s (%d %s left)", syscall, strerror(saved_errno), _retries + 1, noun);
    }
    return -EINVAL;
}
// Create the listening socket, bind it to the configured TCP port or
// Unix-domain path, start listening, and register it for read selection.
//
// Every call uses up one retry (_retries is decremented first).
// Returns 0 on success; on failure returns the result of
// initialize_socket_error(), which also closes the socket.
int
ChatterSocket::initialize_socket(ErrorHandler *errh)
{
    _retries--;

    // open socket, set options, bind to address
    if (_tcp_socket) {
        _socket_fd = socket(PF_INET, SOCK_STREAM, 0);
        if (_socket_fd < 0)
            return initialize_socket_error(errh, "socket");
        int sockopt = 1;
        // failing to set SO_REUSEADDR is only worth a warning
        if (setsockopt(_socket_fd, SOL_SOCKET, SO_REUSEADDR, (void *)&sockopt, sizeof(sockopt)) < 0)
            errh->warning("setsockopt: %s", strerror(errno));

        // bind to port; configure() stored the port number as text in
        // _unix_pathname
        int portno = 0;
        (void) cp_integer(_unix_pathname, &portno);
        struct sockaddr_in sa;
        // zero the whole address so padding fields (sin_zero, and sin_len
        // where it exists) are not passed to bind() uninitialized
        memset(&sa, 0, sizeof(sa));
        sa.sin_family = AF_INET;
        sa.sin_port = htons(portno);
        sa.sin_addr = inet_makeaddr(0, 0);
        if (bind(_socket_fd, (struct sockaddr *)&sa, sizeof(sa)) < 0)
            return initialize_socket_error(errh, "bind");

    } else {
        _socket_fd = socket(PF_UNIX, SOCK_STREAM, 0);
        if (_socket_fd < 0)
            return initialize_socket_error(errh, "socket");

        // bind to filename; configure() already checked that the name and
        // its terminating NUL fit in sun_path
        struct sockaddr_un sa;
        memset(&sa, 0, sizeof(sa));
        sa.sun_family = AF_UNIX;
        memcpy(sa.sun_path, _unix_pathname.c_str(), _unix_pathname.length() + 1);
        if (bind(_socket_fd, (struct sockaddr *)&sa, sizeof(sa)) < 0)
            return initialize_socket_error(errh, "bind");
    }

    // start listening
    if (listen(_socket_fd, 2) < 0)
        return initialize_socket_error(errh, "listen");

    // nonblocking I/O and close-on-exec for the socket
    fcntl(_socket_fd, F_SETFL, O_NONBLOCK);
    fcntl(_socket_fd, F_SETFD, FD_CLOEXEC);

    add_select(_socket_fd, SELECT_READ);
    return 0;
}
// Timer callback used while the listening socket could not be opened.
// thunk is the ChatterSocket that scheduled the timer. Tries to open the
// socket again; on failure either reschedules itself one second later (if
// retries remain) or asks the router to stop the driver.
void
ChatterSocket::retry_hook(Timer *t, void *thunk)
{
    ChatterSocket *self = static_cast<ChatterSocket *>(thunk);

    // Socket already open: nothing to do.
    if (self->_socket_fd >= 0)
        return;

    // This attempt worked: nothing more to do.
    if (self->initialize_socket(ErrorHandler::default_handler()) >= 0)
        return;

    if (self->_retries >= 0)
        t->reschedule_after_sec(1);
    else
        self->router()->please_stop_driver();
}
// Reset the stream bookkeeping and try to open the listening socket.
// If that fails but retries remain, a one-second retry timer is started and
// initialization still counts as successful.
// Returns 0 on success (or when a retry was scheduled), -1 otherwise.
int
ChatterSocket::initialize(ErrorHandler *errh)
{
    _max_pos = 0;
    _live_fds = 0;

    if (initialize_socket(errh) >= 0)
        return 0;

    // No retries left: give up.
    if (_retries < 0)
        return -1;

    _retry_timer = new Timer(retry_hook, this);
    _retry_timer->initialize(this);
    _retry_timer->schedule_after_sec(1);
    return 0;
}
// Take over the listening socket, the open connections and the stored
// messages from element e.
//
// Silently does nothing if e is not a ChatterSocket. Reports an error and
// leaves both elements untouched if this element already has a listening
// socket, or if the two differ in socket type, address or channel.
void
ChatterSocket::take_state(Element *e, ErrorHandler *errh)
{
    ChatterSocket *old = (ChatterSocket *)e->cast("ChatterSocket");
    if (!old)
        return;

    if (_socket_fd >= 0) {
        errh->error("already initialized, can't take state");
        return;
    }

    const bool mismatch = _tcp_socket != old->_tcp_socket
        || _unix_pathname != old->_unix_pathname
        || _channel != old->_channel;
    if (mismatch) {
        errh->error("incompatible ChatterSockets");
        return;
    }

    // Move the listening socket across; the old element gives it up.
    _socket_fd = old->_socket_fd;
    old->_socket_fd = -1;

    // Move the message log and the per-connection bookkeeping.
    _messages.swap(old->_messages);
    _message_pos.swap(old->_message_pos);
    _max_pos = old->_max_pos;
    _fd_alive.swap(old->_fd_alive);
    _fd_pos.swap(old->_fd_pos);
    _live_fds = old->_live_fds;
    old->_live_fds = 0;

    // Register the inherited descriptors under this element.
    if (_socket_fd >= 0)
        add_select(_socket_fd, SELECT_READ);
    for (int fd = 0; fd < _fd_alive.size(); ++fd) {
        if (_fd_alive[fd])
            add_select(fd, SELECT_WRITE);
    }
}
// Detach cs from the channel handler referenced by cserrh. When the handler
// has no sockets left it is deleted and cserrh is reset to null; if it was
// the default-channel handler, the saved default ErrorHandler is reinstated
// first.
static void
remove_chatter_channel(ChatterSocketErrorHandler *&cserrh, ChatterSocket *cs)
{
    if (!cserrh)
        return;

    cserrh->remove_chatter_socket(cs);
    if (cserrh->nchatter_sockets() != 0)
        return;

    // Last socket is gone: retire the handler.
    if (cserrh == chatter_socket_errh)
        ErrorHandler::set_default_handler(base_default_errh);
    delete cserrh;
    cserrh = 0;
}
// Release everything this element owns: the listening socket (and, for
// Unix-domain sockets, its file), every live connection, the retry timer,
// and the registration with the channel's error handler.
void
ChatterSocket::cleanup(CleanupStage)
{
    if (_socket_fd >= 0) {
        // shut down the listening socket in case we forked
#ifdef SHUT_RDWR
        const int how = SHUT_RDWR;
#else
        const int how = 2;
#endif
        shutdown(_socket_fd, how);
        close(_socket_fd);
        if (!_tcp_socket)
            unlink(_unix_pathname.c_str());
        _socket_fd = -1;
    }

    // close every connection that is still open
    for (int fd = 0; fd < _fd_alive.size(); ++fd) {
        if (!_fd_alive[fd])
            continue;
        close(fd);
        _fd_alive[fd] = 0;
    }
    _live_fds = 0;

    if (_retry_timer) {
        delete _retry_timer;
        _retry_timer = 0;
    }

    // unhook from chatter socket error handler
    if (_channel == "default") {
        remove_chatter_channel(chatter_socket_errh, this);
    } else {
        remove_chatter_channel
            ((ChatterSocketErrorHandler *&)(router()->force_attachment("ChatterChannel." + _channel)), this);
    }
}
// Write as much pending chatter as possible to connection fd.
//
// _fd_pos[fd] is the stream position up to which fd has been sent data;
// _message_pos[i] is the stream position at which _messages[i] begins, and a
// connection whose position equals _max_pos has nothing left to send.
// Positions are compared with the SEQ_LT/SEQ_GEQ macros from <clicknet/tcp.h>
// (presumably wraparound-safe sequence comparisons -- confirm in that header).
//
// Returns _messages.size() if fd is not a live connection or is already
// caught up. Otherwise returns the final value of useful_message: the index
// of the first message fd has not fully received (this can equal
// _message_pos.size() when everything was written), or -1 if no stored
// message covers fd's position or a write failed with an error other than
// EAGAIN/EINTR. In the -1 cases, and when fd lags _max_pos by more than
// MAX_BACKLOG, the connection is closed.
int
ChatterSocket::flush(int fd)
{
// check file descriptor
if (fd >= _fd_alive.size() || !_fd_alive[fd])
return _messages.size();
// check if all data written
if (_fd_pos[fd] == _max_pos)
return _messages.size();
// find first useful message (binary search)
// i.e. the message whose range [_message_pos[m], _message_pos[m] + length)
// contains fd_pos; useful_message stays -1 if there is none
uint32_t fd_pos = _fd_pos[fd];
int l = 0, r = _messages.size() - 1, useful_message = -1;
while (l <= r) {
int m = (l + r) >> 1;
if (SEQ_LT(fd_pos, _message_pos[m]))
r = m - 1;
else if (SEQ_GEQ(fd_pos, _message_pos[m] + _messages[m].length()))
l = m + 1;
else {
useful_message = m;
break;
}
}
// if messages found, write data until blocked or closed
if (useful_message >= 0) {
while (useful_message < _message_pos.size()) {
const String &m = _messages[useful_message];
int mpos = _message_pos[useful_message];
// resume inside the message at the offset fd has already received
const char *data = m.data() + (fd_pos - mpos);
int len = m.length() - (fd_pos - mpos);
int w = write(fd, data, len);
// EINTR falls through to the check below, so the same message is retried
if (w < 0 && errno != EINTR) {
if (errno != EAGAIN) // drop connection on error, except WOULDBLOCK
useful_message = -1;
break;
} else if (w > 0)
fd_pos += w;
// advance only once the whole message has been written
if (SEQ_GEQ(fd_pos, mpos + m.length()))
useful_message++;
}
}
// store changed fd_pos
_fd_pos[fd] = fd_pos;
// close out on error, or if socket falls too far behind
if (useful_message < 0 || SEQ_LT(fd_pos, _max_pos - MAX_BACKLOG)) {
close(fd);
remove_select(fd, SELECT_WRITE);
_fd_alive[fd] = 0;
_live_fds--;
} else if (fd_pos == _max_pos)
// caught up: no need to be woken for writability
remove_select(fd, SELECT_WRITE);
else
// data still pending: ask to be called again when fd is writable
add_select(fd, SELECT_WRITE);
return useful_message;
}
// Flush pending chatter to every live connection, then discard stored
// messages that no connection still needs.
//
// flush(fd) reports, per connection, the index of the first message that
// connection still needs; the minimum over all live connections is the
// number of leading messages that can be dropped. A -1 result (connection
// closed by flush(fd)) makes the minimum negative and so defers culling to a
// later call.
void
ChatterSocket::flush()
{
    int min_useful_message = _messages.size();
    if (min_useful_message) {
        for (int fd = 0; fd < _fd_alive.size(); fd++) {
            // Only live connections matter. _fd_alive holds 0/1 flags, so the
            // test is for truthiness (a ">= 0" test would accept dead
            // descriptors too).
            if (!_fd_alive[fd])
                continue;
            int m = flush(fd);
            if (m < min_useful_message)
                min_useful_message = m;
        }
    }

    // cull old messages, but only once at least 10 of them can go
    if (min_useful_message >= 10) {
        _messages.erase(_messages.begin(), _messages.begin() + min_useful_message);
        _message_pos.erase(_message_pos.begin(), _message_pos.begin() + min_useful_message);
    }
}
// Select callback.
//
// For the listening socket: accept one connection, make it non-blocking and
// close-on-exec, start it at the current end of the message stream, send the
// greeting if enabled, and flush it. For any other descriptor: flush pending
// chatter to that connection.
void
ChatterSocket::selected(int fd)
{
    // An existing connection became writable.
    if (fd != _socket_fd) {
        flush(fd);
        return;
    }

    union { struct sockaddr_in in; struct sockaddr_un un; } peer;
#if HAVE_ACCEPT_SOCKLEN_T
    socklen_t peer_len;
#else
    int peer_len;
#endif
    peer_len = sizeof(peer);
    const int new_fd = accept(_socket_fd, (struct sockaddr *)&peer, &peer_len);

    if (new_fd < 0) {
        // EAGAIN just means nobody is waiting; anything else is reported
        if (errno != EAGAIN)
            click_chatter("%s: accept: %s", declaration().c_str(), strerror(errno));
        return;
    }

    fcntl(new_fd, F_SETFL, O_NONBLOCK);
    fcntl(new_fd, F_SETFD, FD_CLOEXEC);

    // The per-connection tables are indexed by descriptor number; grow them
    // until new_fd is a valid index.
    while (new_fd >= _fd_alive.size()) {
        _fd_alive.push_back(0);
        _fd_pos.push_back(0);
    }
    _fd_alive[new_fd] = 1;
    _fd_pos[new_fd] = _max_pos;
    _live_fds++;

    // no need to SELECT_WRITE; flush() will do it if required

    if (_greeting) {
        // XXX - assume that this write will succeed
        String s = String("Click::ChatterSocket/") + protocol_version + "\r\n";
        int w = write(new_fd, s.data(), s.length());
        if (w != s.length())
            click_chatter("%s fd %d: unable to write greeting!", declaration().c_str(), new_fd);
    }

    flush(new_fd);
}
CLICK_ENDDECLS
ELEMENT_REQUIRES(userlevel)
EXPORT_ELEMENT(ChatterSocket)
// syntax highlighted by Code2HTML, v. 0.9.1