/*===========================================================================*
* *
* smtsock.c - *
* *
* Copyright (c) 1991-2003 iMatix Corporation *
* *
* ------------------ GPL Licensed Source Code ------------------ *
* iMatix makes this software available under the GNU General *
* Public License (GPL) license for open source projects. For *
* details of the GPL license please see www.gnu.org or read the *
* file license.gpl provided in this package. *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License as *
* published by the Free Software Foundation; either version 2 of *
* the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public *
* License along with this program in the file 'license.gpl'; if *
* not, write to the Free Software Foundation, Inc., 59 Temple *
* Place - Suite 330, Boston, MA 02111-1307, USA. *
* *
* You can also license this software under iMatix's General Terms *
* of Business (GTB) for commercial projects. If you have not *
* explicitly licensed this software under the iMatix GTB you may *
* only use it under the terms of the GNU General Public License. *
* *
* For more information, send an email to info@imatix.com. *
* -------------------------------------------------------------- *
*===========================================================================*/
/*
Synopsis: Waits, reads, writes socket data.
*/
#include "smtdefn.h" /* SMT definitions */
/*- Definitions -------------------------------------------------------------*/
#define AGENT_NAME SMT_SOCKET /* Our public name */
#define SINGLE_THREADED TRUE /* Single-threaded agent */
#define WRITE_TIMEOUT 10 /* Default write timeout */
/* One pending socket request. request_create() allocates a descriptor,    */
/* links it into the 'requests' list and fills in defaults; the caller     */
/* then sets the input or output handle and, for read/write requests, the  */
/* buffer and size fields. request_destroy() frees the buffer and node.    */
typedef struct _SOCKREQ { /* Request descriptor */
struct _SOCKREQ /* Links to neighbouring requests */
*next, *prev; /* Doubly-linked list */
QID reply_to; /* Who sent the request event */
sock_t input; /* Socket for input, 0 if unused */
sock_t output; /* Socket for output, 0 if unused */
byte *buffer; /* Buffer for i/o, or NULL */
qbyte max_size; /* Maximum size of buffer */
qbyte cur_size; /* Bytes transferred so far */
qbyte min_size; /* Minimum data to process */
dbyte timeout; /* Expiry time in seconds */
time_t expires; /* Expiry time, or 0 for none */
qbyte tag; /* User-defined request tag */
Bool repeat; /* Repeated request? */
Bool huge_block; /* Huge blocks? */
} SOCKREQ;
/*- Function prototypes -----------------------------------------------------*/
/* Helpers private to this file. request_create() returns NULL after      */
/* replying with an error when no request could be allocated.             */
static SOCKREQ *request_create (THREAD *thread,
dbyte timeout, sock_t handle, qbyte tag);
/* Input handle if set, else output handle, else 0 */
static sock_t request_handle (SOCKREQ *request);
/* Frees the request's buffer and the request node itself */
static void request_destroy (SOCKREQ *request);
/* Turns a read/write return code into the next dialog event */
static void handle_partial_io (SOCKREQ *request, int bytes_done);
/* Sends a SOCK_ERROR event carrying message, handle and tag */
static void reply_error (QID *qid, sock_t handle, char *message,
qbyte tag);
/* Sends the named event carrying the request's handle and tag */
static void reply_normal (SOCKREQ *request, char *event_name);
/* Destroys queued requests that match either non-zero handle */
static void purge_old_request (sock_t input, sock_t output);
/*- Global variables used in this source file only --------------------------*/
/* File-scope state shared by all dialog modules of this agent.           */
static Bool
had_activity = FALSE, /* Set by new requests and ready */
/* sockets, cleared after each select; */
/* gates the per-cycle trace output */
trace_flag = FALSE; /* Trace socket activity? */
static NODE
requests; /* Request list header */
static fd_set
read_set, /* Sockets to check for input */
write_set, /* Sockets to check for output */
error_set; /* Sockets to check for errors */
static QID
operq; /* Operator console event queue */
static SOCKREQ
*request, /* Pointer to request (in list); also */
/* used as the list iterator, so it */
/* may equal the list header address */
*active_request; /* Request we're processing */
static byte
msg_body [LINE_MAX]; /* Message sent to requestors */
static int
msg_size; /* Size of formatted msg_body */
static DESCR /* Descriptor for exdr_write */
msg = { LINE_MAX, msg_body };
#include "smtsock.d" /* Include dialog data */
/******************** INITIALISE AGENT - ENTRY POINT *********************/
/* ---------------------------------------------------------------------[<]-
Function: smtsock_init
Synopsis: Initialises the SMT socket agent. Returns 0 if initialised
okay, -1 if there was an error. The socket agent manages all sockets
(TCP and UDP) used by an SMT application. Creates an unnamed thread
automatically: send events to that thread. Initialises the sflsock
socket interface automatically. Supports these public methods:
READ Read a specified amount of input data (use SMT_SOCK_READ).
WRITE Write a specified amount of output data (use SMT_SOCK_WRITE).
READR Read input data, repeatedly (use SMT_SOCK_READ).
READH As for READ, but for blocks > 64k (use SMT_SOCK_READH).
WRITEH As for WRITE, but for blocks > 64k (use SMT_SOCK_WRITEH).
READRH As for READR, but for blocks > 64k (use SMT_SOCK_READH).
INPUT Wait for any input ready on socket (use SMT_SOCK_INPUT).
INPUTR Wait for any input, repeatedly (use SMT_SOCK_INPUT).
OUTPUT Wait for any output ready on socket (use SMT_SOCK_OUTPUT).
CONNECT Make socket connection to host & port (use SMT_SOCK_CONNECT).
FLUSH Delete all requests for specified socket (use SMT_SOCK_FLUSH).
Sends errors to the SMTOPER agent; see doc for reply events.
---------------------------------------------------------------------[>]-*/
/* Registers the agent's methods, makes sure the operator console is      */
/* available, initialises the socket layer and creates the agent's single */
/* unnamed thread. Returns 0 on success, -1 when the operator thread      */
/* cannot be found or sock_init() fails.                                  */
int
smtsock_init (void)
{
AGENT *agent; /* Handle for our agent */
THREAD *thread; /* Handle to console thread */
/* NOTE(review): 'agent' is not assigned in the visible code; presumably  */
/* the included dialog interpreter sets it up - confirm in smtsock.i.     */
# include "smtsock.i" /* Include dialog interpreter */
/* We give this agent a low priority, so that it will only run after */
/* all other threads. This is important, since it blocks on select(). */
agent-> priority = SMT_PRIORITY_LOW;
/* Method name Event value Priority */
/* Shutdown event comes from Kernel */
method_declare (agent, "SHUTDOWN", shutdown_event, SMT_PRIORITY_MAX);
/* Public methods supported by this agent */
method_declare (agent, "READ", read_event, 0);
method_declare (agent, "READR", readr_event, 0);
method_declare (agent, "READH", readh_event, 0);
method_declare (agent, "READRH", readrh_event, 0);
method_declare (agent, "WRITE", write_event, 0);
method_declare (agent, "WRITEH", writeh_event, 0);
method_declare (agent, "INPUT", input_event, 0);
method_declare (agent, "INPUTR", inputr_event, 0);
method_declare (agent, "OUTPUT", output_event, 0);
method_declare (agent, "CONNECT", connect_event, 0);
method_declare (agent, "FLUSH", flush_event, 0);
/* Private method used to cycle on select() call */
method_declare (agent, "_TIMEOUT", timeout_event, 0);
/* Ensure that operator console is running, else start it up */
if (agent_lookup (SMT_OPERATOR) == NULL)
smtoper_init ();
/* Remember the console queue; every error and trace message goes there */
if ((thread = thread_lookup (SMT_OPERATOR, "")) != NULL)
operq = thread-> queue-> qid;
else
return (-1);
/* Initialise the socket interface and register sock_term() */
if (sock_init () == 0)
smt_atexit ((function) sock_term);
else
{
sendfmt (&operq, "ERROR",
"smtsock: could not initialise socket interface");
sendfmt (&operq, "ERROR",
"smtsock: %s", connect_errlist [connect_error ()]);
return (-1);
}
ip_nonblock = TRUE; /* Want nonblocking sockets */
/* Create initial, unnamed thread */
/* NOTE(review): the result of thread_create() is not checked here */
thread_create (AGENT_NAME, "");
/* Signal okay to caller that we initialised okay */
return (0);
}
/* ---------------------------------------------------------------------[<]-
Function: smtsock_trace
Synopsis: Enables/disables socket tracing: to enable, call with TRUE as
argument; to disable call with FALSE as argument. Socket trace data is
sent to the console.
---------------------------------------------------------------------[>]-*/
void
smtsock_trace (Bool trace_value)
{
    /*  Every trace site in this file tests trace_flag before reporting     */
    trace_flag = trace_value;
}
/************************* INITIALISE THE THREAD *************************/
/*  Dialog module: prepares the thread by emptying the request list and    */
/*  reporting 'ok' as the next event.                                      */
MODULE initialise_the_thread (THREAD *thread)
{
    the_next_event = ok_event;

    /*  Start with an empty list of pending requests                       */
    node_reset (&requests);
}
/*********************** GET NEXT EVENT FROM QUEUE ***********************/
/*  Dialog module: looks at the first event waiting on the thread's queue  */
/*  and turns it into the next dialog event:                               */
/*    - nothing queued            -> empty_event                           */
/*    - name is not a method      -> event is rejected, invalid_event      */
/*    - otherwise                 -> event is accepted, method's event     */
MODULE get_next_event_from_queue (THREAD *thread)
{
    QUEUE
        *event_queue = thread-> queue;  /*  Thread's event queue           */
    AGENT
        *owner = event_queue-> agent;   /*  Agent that owns the queue      */
    EVENT
        *pending;                       /*  First event on the queue       */
    METHOD
        *handler;                       /*  Method matching the event      */

    pending = event_iterate (event_queue, NULL);
    if (pending == NULL)
    {
        the_next_event = empty_event;
        return;
    }

    handler = method_lookup (owner, pending-> name);
    if (handler == NULL)
    {
        /*  Not a method we accept                                         */
        event_reject (event_queue, pending);
        the_next_event = invalid_event;
        return;
    }

    /*  Release any event the thread was still holding                     */
    if (thread-> event)
    {
        event_destroy (thread-> event);
        thread-> event = NULL;
    }

    /*  Take the event off the queue; it now belongs to the thread         */
    thread-> event = event_accept (event_queue, pending);
    the_next_event = handler-> event_number;
}
/************************** CREATE READ REQUEST **************************/
/*  Dialog module: decodes a READ event and queues a read request for the  */
/*  socket. Replies with SOCK_ERROR when the size or handle is zero.       */
/*  Any older request waiting on the same input handle is purged. Raises   */
/*  exception_event when the read buffer cannot be allocated.              */
/*                                                                         */
/*  On return the file-level 'request' points to the new request, or is    */
/*  NULL when none was created; create_read_repeat_request relies on it.   */
MODULE create_read_request (THREAD *thread)
{
    dbyte
        timeout,                        /*  Timeout, in seconds, or zero   */
        read_size,                      /*  Size of data buffer, bytes     */
        min_size;                       /*  Minimum amount of data to read */
    sock_t
        handle;                         /*  Socket handle                  */
    qbyte
        tag;                            /*  User-defined request tag       */

    /*  'request' doubles as a list iterator elsewhere in this file, so    */
    /*  it may still hold a stale or list-header address. Clear it so the  */
    /*  error path below cannot leave a bogus pointer for the caller.      */
    request = NULL;

    /*  Get arguments from message                                         */
    exdr_read (thread-> event-> body, SMT_SOCK_READ,
               &timeout, &handle, &read_size, &min_size, &tag);
    if (trace_flag)
        sendfmt (&operq, "INFO",
                 "smtsock: READ min=%d max=%d socket=%ld timeout=%d",
                 min_size, read_size, handle, timeout);

    if (read_size == 0 || handle == 0)
        reply_error (&thread-> event-> sender, handle,
                     "Null read request", tag);
    else
    if ((request = request_create (thread, timeout, handle, tag)) != NULL)
    {
        /*  The new request still has input == 0 here, so the purge only   */
        /*  removes older requests for this handle.                        */
        purge_old_request (handle, 0);
        request-> input    = handle;
        request-> max_size = read_size;
        request-> min_size = min_size? min_size: read_size;
        request-> buffer   = mem_alloc (read_size);
        if (request-> buffer == NULL)
            raise_exception (exception_event);
    }
}
/*  -------------------------------------------------------------------------
 *  request_create
 *
 *  Allocates a request node linked into the requests list and fills it with
 *  default values: no handles, no buffer, zero sizes, not repeated, not
 *  huge. The reply address is taken from the sender of the thread's
 *  current event. When the node cannot be allocated an "Out of memory"
 *  SOCK_ERROR is sent to that sender and NULL is returned. In both cases
 *  had_activity is set.
 */
static SOCKREQ *
request_create (THREAD *thread, dbyte timeout, sock_t handle, qbyte tag)
{
    SOCKREQ
        *created;                       /*  Request we create              */

    created = node_create (requests.prev, sizeof (SOCKREQ));
    if (created != NULL)
    {
        /*  Who asked, and how to identify the reply                       */
        created-> reply_to   = thread-> event-> sender;
        created-> tag        = tag;

        /*  No socket or data attached yet; the caller fills these in      */
        created-> input      = 0;
        created-> output     = 0;
        created-> buffer     = NULL;
        created-> max_size   = 0;
        created-> cur_size   = 0;
        created-> min_size   = 0;
        created-> repeat     = FALSE;
        created-> huge_block = FALSE;

        /*  Arithmetic on time_t is not strictly portable ANSI C, but it   */
        /*  will have to do for now. A zero timeout means 'never expires'. */
        created-> timeout    = timeout;
        if (timeout)
            created-> expires = time (NULL) + timeout;
        else
            created-> expires = 0;
    }
    else
        reply_error (&thread-> event-> sender, handle, "Out of memory", tag);

    had_activity = TRUE;
    return (created);
}
/*  -------------------------------------------------------------------------
 *  reply_error
 *
 *  Encodes the error message, socket handle and tag into the shared message
 *  buffer and sends the result as a SOCK_ERROR event to the given queue.
 */
static void
reply_error (QID *qid, sock_t handle, char *message, qbyte tag)
{
    /*  Format the event body into msg_body via the msg descriptor         */
    msg_size = exdr_writed (&msg, SMT_SOCK_ERROR, message, handle, tag);

    /*  Send to the specified queue: no reply queue, no response events,   */
    /*  no timeout.                                                        */
    event_send (qid, NULL, "SOCK_ERROR",
                msg_body, msg_size,
                NULL, NULL, NULL,
                0);
}
/*  -------------------------------------------------------------------------
 *  purge_old_request
 *
 *  Walks the request list and destroys every request whose input handle
 *  equals 'input' or whose output handle equals 'output'. A zero argument
 *  matches nothing, so either or both handles may be supplied.
 */
static void
purge_old_request (sock_t input, sock_t output)
{
    SOCKREQ
        *candidate,                     /*  Request being examined         */
        *following;                     /*  Its successor, saved up front  */

    for (candidate  = requests.next;
         candidate != (SOCKREQ *) &requests;
         candidate  = following)
    {
        /*  Pick up the successor before the node can be destroyed         */
        following = candidate-> next;

        if ((input  && candidate-> input  == input)
        ||  (output && candidate-> output == output))
            request_destroy (candidate);
    }
}
/*  -------------------------------------------------------------------------
 *  request_destroy
 *
 *  Releases the request's i/o buffer, then the request node itself.
 */
static void
request_destroy (SOCKREQ *request)
{
    byte
        *io_buffer = request-> buffer;  /*  Dynamically allocated field    */

    mem_free (io_buffer);
    node_destroy (request);
}
/*********************** CREATE READ REPEAT REQUEST **********************/
/*  Dialog module: creates a read request exactly as create_read_request   */
/*  does, then flags the resulting request (if any) as repeating.          */
MODULE create_read_repeat_request (THREAD *thread)
{
    create_read_request (thread);

    if (request != NULL)
        request-> repeat = TRUE;
}
/************************ CREATE HUGE READ REQUEST ***********************/
/*  Dialog module: decodes a READH event and queues a huge-block read      */
/*  request. Replies with SOCK_ERROR when the size or handle is zero, or   */
/*  when the size exceeds UINT_MAX. Any older request waiting on the same  */
/*  input handle is purged. Raises exception_event when the read buffer    */
/*  cannot be allocated.                                                   */
/*                                                                         */
/*  On return the file-level 'request' points to the new request, or is    */
/*  NULL when none was created; create_huge_read_repeat_request relies on  */
/*  it.                                                                    */
MODULE create_huge_read_request (THREAD *thread)
{
    dbyte
        timeout;                        /*  Timeout, in seconds, or zero   */
    qbyte
        read_size,                      /*  Size of data buffer, bytes     */
        min_size,                       /*  Minimum amount of data to read */
        tag;                            /*  User-defined request tag       */
    sock_t
        handle;                         /*  Socket handle                  */

    /*  'request' doubles as a list iterator elsewhere in this file, so    */
    /*  it may still hold a stale or list-header address. Clear it so the  */
    /*  error paths below cannot leave a bogus pointer for the caller.     */
    request = NULL;

    /*  Get arguments from message                                         */
    exdr_read (thread-> event-> body, SMT_SOCK_READH,
               &timeout, &handle, &read_size, &min_size, &tag);
    if (trace_flag)
        sendfmt (&operq, "INFO",
                 "smtsock: READH min=%ld max=%ld socket=%ld timeout=%d",
                 min_size, read_size, handle, timeout);

    if (read_size == 0 || handle == 0)
        reply_error (&thread-> event-> sender, handle,
                     "Null read request", tag);
    else
    if (read_size > UINT_MAX)
        reply_error (&thread-> event-> sender, handle,
                     "Read request too large for memory model", tag);
    else
    if ((request = request_create (thread, timeout, handle, tag)) != NULL)
    {
        /*  The new request still has input == 0 here, so the purge only   */
        /*  removes older requests for this handle.                        */
        purge_old_request (handle, 0);
        request-> input      = handle;
        request-> max_size   = read_size;
        request-> min_size   = min_size? min_size: read_size;
        request-> buffer     = mem_alloc ((size_t) read_size);
        request-> huge_block = TRUE;
        if (request-> buffer == NULL)
            raise_exception (exception_event);
    }
}
/******************** CREATE HUGE READ REPEAT REQUEST ********************/
/*  Dialog module: creates a huge-block read request exactly as            */
/*  create_huge_read_request does, then flags the resulting request (if    */
/*  any) as repeating.                                                     */
MODULE create_huge_read_repeat_request (THREAD *thread)
{
    create_huge_read_request (thread);

    if (request != NULL)
        request-> repeat = TRUE;
}
/************************** CREATE WRITE REQUEST *************************/
/*  Dialog module: decodes a WRITE event and queues a write request. The   */
/*  data buffer decoded from the event is handed over to the request, or   */
/*  freed here when the request is refused. A zero timeout is replaced by  */
/*  WRITE_TIMEOUT. Replies with SOCK_ERROR for a zero size or handle;      */
/*  raises exception_event when no buffer was obtained.                    */
MODULE create_write_request (THREAD *thread)
{
    dbyte
        timeout,                        /*  Timeout, in seconds, or zero   */
        write_size;                     /*  Amount of data to write        */
    sock_t
        handle;                         /*  Socket handle                  */
    byte
        *buffer = NULL;                 /*  Buffer to write                */
    qbyte
        tag;                            /*  User-defined request tag       */

    /*  Get arguments from message                                         */
    exdr_read (thread-> event-> body, SMT_SOCK_WRITE,
               &timeout, &handle, &write_size, &buffer, &tag);

    /*  For write requests we do not want to allow a zero timeout, it makes
        no sense and on some bogus OSes (Solaris) it can cause socket leaks.
     */
    if (timeout == 0)
        timeout = WRITE_TIMEOUT;

    if (trace_flag)
    {
        /*  Only look at bytes that really exist: the buffer may be NULL   */
        /*  and may hold fewer than four bytes. Missing bytes show as 0.   */
        byte
            preview [4] = { 0, 0, 0, 0 };
        int
            index;

        if (buffer)
            for (index = 0; index < 4 && index < (int) write_size; index++)
                preview [index] = buffer [index];

        sendfmt (&operq, "INFO",
                 "smtsock: WRITE size=%d socket=%ld timeout=%d data=%x %x %x %x",
                 write_size, handle, timeout,
                 preview [0], preview [1], preview [2], preview [3]);
    }
    if (write_size == 0 || handle == 0)
    {
        mem_free (buffer);
        reply_error (&thread-> event-> sender, handle,
                     "Null write request", tag);
    }
    else
    if (write_size > UINT_MAX)
    {
        mem_free (buffer);
        reply_error (&thread-> event-> sender, handle,
                     "Write request too large for memory model", tag);
    }
    else
    if (buffer == NULL)                 /*  Not enough memory              */
        raise_exception (exception_event);
    else
    if ((request = request_create (thread, timeout, handle, tag)) != NULL)
    {
        request-> output   = handle;
        request-> max_size = write_size;
        request-> min_size = write_size;
        request-> buffer   = buffer;    /*  Request now owns the buffer    */
    }
}
/*********************** CREATE HUGE WRITE REQUEST ***********************/
/*  Dialog module: decodes a WRITEH event and queues a huge-block write    */
/*  request. The data buffer decoded from the event is handed over to the  */
/*  request, or freed here when the request is refused. A zero timeout is  */
/*  replaced by WRITE_TIMEOUT. Replies with SOCK_ERROR for a zero size or  */
/*  handle; raises exception_event when no buffer was obtained.            */
MODULE create_huge_write_request (THREAD *thread)
{
    dbyte
        timeout;                        /*  Timeout, in seconds, or zero   */
    qbyte
        write_size,                     /*  Amount of data to write        */
        tag;                            /*  User-defined request tag       */
    sock_t
        handle;                         /*  Socket handle                  */
    byte
        *buffer = NULL;                 /*  Buffer to write                */

    /*  Get arguments from message                                         */
    exdr_read (thread-> event-> body, SMT_SOCK_WRITEH,
               &timeout, &handle, &write_size, &buffer, &tag);

    /*  For write requests we do not want to allow a zero timeout, it makes
        no sense and on some bogus OSes (Solaris) it can cause socket leaks.
     */
    if (timeout == 0)
        timeout = WRITE_TIMEOUT;

    if (trace_flag)
    {
        /*  Only look at bytes that really exist: the buffer may be NULL   */
        /*  and may hold fewer than four bytes. Missing bytes show as 0.   */
        byte
            preview [4] = { 0, 0, 0, 0 };
        int
            index;

        if (buffer)
            for (index = 0; index < 4 && (qbyte) index < write_size; index++)
                preview [index] = buffer [index];

        sendfmt (&operq, "INFO",
                 "smtsock: WRITEH size=%ld socket=%ld timeout=%d data=%x%x%x%x",
                 write_size, handle, timeout,
                 preview [0], preview [1], preview [2], preview [3]);
    }
    if (write_size == 0 || handle == 0)
    {
        mem_free (buffer);
        reply_error (&thread-> event-> sender, handle,
                     "Null write request", tag);
    }
    else
    if (buffer == NULL)                 /*  Not enough memory              */
        raise_exception (exception_event);
    else
    if ((request = request_create (thread, timeout, handle, tag)) != NULL)
    {
        request-> output     = handle;
        request-> max_size   = write_size;
        request-> min_size   = write_size;
        request-> buffer     = buffer;  /*  Request now owns the buffer    */
        request-> huge_block = TRUE;
    }
}
/************************** CREATE INPUT REQUEST *************************/
/*  Dialog module: decodes an INPUT event and queues a request that waits  */
/*  for input on the socket without reading it (no buffer is attached).    */
/*  Replies with SOCK_ERROR when the handle is zero. Any older request     */
/*  waiting on the same input handle is purged.                            */
/*                                                                         */
/*  On return the file-level 'request' points to the new request, or is    */
/*  NULL when none was created; create_input_repeat_request relies on it.  */
MODULE create_input_request (THREAD *thread)
{
    dbyte
        timeout;                        /*  Timeout, in seconds, or zero   */
    sock_t
        handle;                         /*  Socket handle                  */
    qbyte
        tag;                            /*  User-defined request tag       */

    /*  'request' doubles as a list iterator elsewhere in this file, so    */
    /*  it may still hold a stale or list-header address. Clear it so the  */
    /*  error path below cannot leave a bogus pointer for the caller.      */
    request = NULL;

    /*  Get arguments from message                                         */
    exdr_read (thread-> event-> body, SMT_SOCK_INPUT,
               &timeout, &handle, &tag);
    if (trace_flag)
        sendfmt (&operq, "INFO",
                 "smtsock: INPUT socket=%ld timeout=%d", handle, timeout);

    if (handle == 0)
        reply_error (&thread-> event-> sender, handle,
                     "Null input request", tag);
    else
    if ((request = request_create (thread, timeout, handle, tag)) != NULL)
    {
        /*  The new request still has input == 0 here, so the purge only   */
        /*  removes older requests for this handle.                        */
        purge_old_request (handle, 0);
        request-> input = handle;
    }
}
/********************** CREATE INPUT REPEAT REQUEST **********************/
/*  Dialog module: creates an input-wait request exactly as                */
/*  create_input_request does, then flags the resulting request (if any)   */
/*  as repeating.                                                          */
MODULE create_input_repeat_request (THREAD *thread)
{
    create_input_request (thread);

    if (request != NULL)
        request-> repeat = TRUE;
}
/************************* CREATE OUTPUT REQUEST *************************/
/*  Dialog module: decodes an OUTPUT event and queues a request that       */
/*  waits until the socket is ready for output (no buffer is attached).    */
/*  Replies with SOCK_ERROR when the handle is zero. Any older request     */
/*  waiting on the same output handle is purged.                           */
MODULE create_output_request (THREAD *thread)
{
    dbyte
        timeout;                        /*  Timeout, in seconds, or zero   */
    sock_t
        handle;                         /*  Socket handle                  */
    qbyte
        tag;                            /*  User-defined request tag       */

    /*  Unpack the event body                                              */
    exdr_read (thread-> event-> body, SMT_SOCK_OUTPUT,
               &timeout, &handle, &tag);
    if (trace_flag)
        sendfmt (&operq, "INFO",
                 "smtsock: OUTPUT socket=%ld timeout=%d", handle, timeout);

    /*  A zero handle can never be waited on                               */
    if (handle == 0)
    {
        reply_error (&thread-> event-> sender, handle,
                     "Null output request", tag);
        return;
    }

    request = request_create (thread, timeout, handle, tag);
    if (request != NULL)
    {
        purge_old_request (0, handle);
        request-> output = handle;
    }
}
/************************* CREATE CONNECT REQUEST ************************/
/*  Dialog module: decodes a CONNECT event, starts a connection and, when  */
/*  that yields a usable handle, queues an output-wait request so the      */
/*  caller is told once the socket becomes writable. A failed connect is   */
/*  answered straight away with SOCK_ERROR carrying the socket error text. */
/*  The strings decoded from the event are always released before return.  */
MODULE create_connect_request (THREAD *thread)
{
    dbyte
        timeout,                        /*  Timeout, in seconds, or zero   */
        port_nbr;                       /*  Literal port number            */
    qbyte
        host_nbr,                       /*  Literal host number            */
        tag;                            /*  User-defined request tag       */
    char
        *type    = NULL,                /*  Type of connection to make     */
        *host    = NULL,                /*  Host to connect to             */
        *service = NULL;                /*  Service or port number         */
    struct sockaddr_in
        host_addr;                      /*  Structure for connection       */
    sock_t
        handle;                         /*  Handle for connection          */

    /*  Unpack the event body                                              */
    exdr_read (thread-> event-> body, SMT_SOCK_CONNECT,
               &timeout, &type, &host, &service, &port_nbr, &host_nbr, &tag);
    if (trace_flag)
        sendfmt (&operq, "INFO",
                 "smtsock: CONNECT type=%s to=%s/%s nbr=%lx/%d timeout=%d",
                 type, host, service, host_nbr, port_nbr, timeout);

    /*  The connect function uses either of the pairs (host, service) or   */
    /*  (host_nbr, port_nbr); the latter travels in the address structure. */
    build_sockaddr (&host_addr, host_nbr, port_nbr);
    handle = connect_socket (host, service, type, &host_addr, 3, 0);

    /*  Sockets are non-blocking, so a successful call only means the      */
    /*  connection is under way. We wait for the socket to become          */
    /*  writable before reporting the handle to the caller.                */
    if (handle == INVALID_SOCKET)
        reply_error (&thread-> event-> sender, 0, (char *) sockmsg (), tag);
    else
    if (handle > 0)
    {
        request = request_create (thread, timeout, handle, tag);
        if (request != NULL)
        {
            purge_old_request (0, handle);
            request-> output = handle;
        }
    }

    /*  Release the strings allocated while decoding the event             */
    mem_free (type);
    mem_free (host);
    mem_free (service);
}
/************************* FLUSH SOCKET REQUESTS *************************/
/*  Dialog module: decodes a FLUSH event and drops every queued request    */
/*  that uses the given socket, whether as its input or its output.        */
MODULE flush_socket_requests (THREAD *thread)
{
    sock_t
        flushed;                        /*  Socket whose requests go       */

    exdr_read (thread-> event-> body, SMT_SOCK_FLUSH, &flushed);

    /*  Same handle on both sides: match input and output requests alike   */
    purge_old_request (flushed, flushed);
}
/*********************** CHECK FOR EXPIRED REQUESTS **********************/
/*  Dialog module: sweeps the request list once. A request whose expiry    */
/*  time has passed is answered with a timeout reply; otherwise a request  */
/*  whose socket is no longer alive is answered with a closed reply. Both  */
/*  kinds are then destroyed. Raises no_requests_event when the list ends  */
/*  up empty.                                                              */
MODULE check_for_expired_requests (THREAD *thread)
{
    time_t
        now = time (NULL);              /*  Current time                   */

    for (request  = requests.next;
         request != (SOCKREQ *) &requests;
         request  = request-> next)
    {
        Bool
            timed_out = request-> expires && request-> expires < now;

        /*  Healthy request: not expired and its socket is still alive     */
        if (!timed_out && socket_is_alive (request_handle (request)))
            continue;

        if (timed_out)
            send_sock_timeout (&request-> reply_to, request-> timeout,
                               request_handle (request),
                               (dbyte) request-> cur_size,
                               request-> buffer, request-> tag);
        else
            send_sock_closed (&request-> reply_to, request-> timeout,
                              request_handle (request),
                              (dbyte) request-> cur_size,
                              request-> buffer, request-> tag);

        /*  Step back first so the loop carries on after the removed node  */
        request = request-> prev;
        request_destroy (request-> next);
    }

    if (requests.next == &requests)
        raise_exception (no_requests_event);
}
/*  -------------------------------------------------------------------------
 *  request_handle
 *
 *  Returns the socket the request works on: the input handle when it is
 *  set, otherwise the output handle. When neither is set the result is 0,
 *  which cannot normally happen.
 */
static sock_t
request_handle (SOCKREQ *request)
{
    /*  An unset output handle is itself 0, which covers the 'neither' case */
    return (request-> input? request-> input: request-> output);
}
/************************ WAIT FOR SOCKET ACTIVITY ***********************/
/* Dialog module: rebuilds the read, write and error fd_sets from the     */
/* request list and calls sock_select() with a short timeout. Raises      */
/* no_activity_event when the call was interrupted or nothing became      */
/* ready, and exception_event on any other select error except EBADF.     */
/* Clears had_activity before returning.                                  */
MODULE wait_for_socket_activity (THREAD *thread)
{
struct timeval
#if (defined (__WINDOWS__)) /* Windows select() never unblocks */
timeout = { 0, 200000 }; /* Timeout for select() = 1/5s */
#else
timeout = { 1, 0 }; /* Timeout for select() = 1s */
#endif
sock_t
top_socket = 0; /* Highest socket number */
int
rc; /* Return code from select() */
smt_set_step ("prepare fd_sets");
/* Start every cycle with all three sets cleared */
memset (&read_set, 0, sizeof (fd_set));
memset (&write_set, 0, sizeof (fd_set));
memset (&error_set, 0, sizeof (fd_set));
/* NOTE(review): handles are not checked against FD_SETSIZE before        */
/* FD_SET - confirm callers cannot pass larger values.                    */
for (request = requests.next;
request != (SOCKREQ *) &requests;
request = request-> next)
{
/* A request is watched for input when its input handle is set; the       */
/* output handle is only considered otherwise.                            */
if (request-> input)
{
FD_SET ((int) request-> input, &read_set);
FD_SET ((int) request-> input, &error_set);
top_socket = max (request-> input, top_socket);
if (trace_flag && had_activity)
sendfmt (&operq, "INFO",
"smtsock: wait for input on %d", request-> input);
}
else
if (request-> output)
{
FD_SET ((int) request-> output, &write_set);
FD_SET ((int) request-> output, &error_set);
top_socket = max (request-> output, top_socket);
if (trace_flag && had_activity)
sendfmt (&operq, "INFO",
"smtsock: wait for output on %d", request-> output);
}
}
smt_set_step ("sock_select");
rc = sock_select (
(int) top_socket + 1, /* Handles to check */
&read_set, /* Check for input */
&write_set, /* Check for output */
&error_set, /* Check for errors */
&timeout); /* Timeout */
smt_set_step ("after select");
if (trace_flag && had_activity)
sendfmt (&operq, "INFO",
"smtsock: return code from select() = %d", rc);
/* If select failed, send error to console, and terminate */
if (rc == SOCKET_ERROR) /* Error from socket call */
{
if (sockerrno == EINTR) /* Ignore interrupted call */
raise_exception (no_activity_event);
else
if (sockerrno != EBADF) /* Ignore errors on handles */
{
sendfmt (&operq, "ERROR",
"smtsock: error on select(): %s", sockmsg ());
raise_exception (exception_event);
/* The trace line below is still sent after the exception is raised */
if (trace_flag)
sendfmt (&operq, "INFO",
"smtsock: error from select() = %d %s",
sockerrno, sockmsg ());
}
}
else
if (rc == 0)
raise_exception (no_activity_event);
/* Reset so idle cycles stay quiet in the trace until something happens */
had_activity = FALSE;
}
/********************** CHECK FIRST SOCKET ACTIVITY **********************/
/*  Dialog module: restarts the activity scan at the head of the request   */
/*  list and reports the first ready socket, if any.                       */
MODULE check_first_socket_activity (THREAD *thread)
{
    /*  Rewind the shared iterator, then let the 'next' scan do the work   */
    request = requests.next;
    check_next_socket_activity (thread);
}
/*********************** CHECK NEXT SOCKET ACTIVITY **********************/
/* Dialog module: continues scanning the request list from the file-level */
/* iterator 'request' for a socket that select() reported as ready.       */
/* Output handles are scanned first, then the list is scanned again from  */
/* the head for input handles. On a hit, active_request is set to the     */
/* matching request, 'request' is advanced past it, and the_next_event    */
/* becomes write_event/read_event when the request has a buffer or        */
/* output_event/input_event when it has none. If nothing is ready the     */
/* event stays finished_event.                                            */
MODULE check_next_socket_activity (THREAD *thread)
{
/* We want to write any output waiting before handling new input */
the_next_event = finished_event;
while (request != (SOCKREQ *) &requests)
{
if (FD_ISSET ((int) request-> output, &write_set))
{
had_activity = TRUE;
the_next_event = (request-> buffer? write_event: output_event);
if (trace_flag)
sendfmt (&operq, "INFO",
"smtsock: -- output ready on %d", request-> output);
}
/* If any event got set, we can access it as active_request */
if (the_next_event != finished_event)
{
/* Advance the iterator before returning, so the scan can resume even  */
/* if the caller destroys active_request.                              */
active_request = request;
request = request-> next;
return;
}
else
request = request-> next;
}
/* If finished checking output sockets, check input socket */
/* NOTE(review): the input scan always restarts at the list head, also    */
/* on a resumed call - confirm the dialog expects this.                   */
if (request == (SOCKREQ *) &requests)
request = (SOCKREQ *) requests.next;
while (request != (SOCKREQ *) &requests)
{
if (FD_ISSET ((int) request-> input, &read_set))
{
had_activity = TRUE;
the_next_event = (request-> buffer? read_event: input_event);
if (trace_flag)
sendfmt (&operq, "INFO",
"smtsock: -- input ready on %d", request-> input);
}
/* If any event got set, we can access it as active_request */
if (the_next_event != finished_event)
{
active_request = request;
request = request-> next;
return;
}
else
request = request-> next;
}
}
/************************* READ DATA FROM SOCKET *************************/
/*  Dialog module: reads as much as still fits in the active request's     */
/*  buffer from its input socket and lets handle_partial_io decide what    */
/*  the dialog does next.                                                  */
MODULE read_data_from_socket (THREAD *thread)
{
    qbyte
        remaining;                      /*  Free space left in the buffer  */
    int
        bytes_read;                     /*  Return code from read_TCP      */

    remaining = active_request-> max_size - active_request-> cur_size;

    /*  Arguments are cast to long so they match the %ld conversions       */
    /*  whatever the widths of qbyte and sock_t are on this platform.      */
    if (trace_flag)
        sendfmt (&operq, "INFO",
                 "smtsock: reading %ld bytes from %ld",
                 (long) remaining,
                 (long) active_request-> input);

    /*  New data goes in after whatever was read so far                    */
    bytes_read = read_TCP (active_request-> input,
                           active_request-> buffer + active_request-> cur_size,
                           (size_t) remaining);
    handle_partial_io (active_request, bytes_read);
}
/*  -------------------------------------------------------------------------
 *  handle_partial_io
 *
 *  Interprets the return code of a socket read or write for the request
 *  and sets the next dialog event:
 *
 *    bytes_done > 0    cur_size grows by bytes_done; finished_event once
 *                      cur_size reaches min_size, else incomplete_event
 *    bytes_done == 0,
 *    EPIPE, ECONNRESET closed_event
 *    EAGAIN,
 *    EWOULDBLOCK       incomplete_event, so the transfer is retried
 *    anything else     error_event
 *
 *  With tracing on, up to eight of the bytes just transferred are shown in
 *  hex; bytes outside the transferred range are never read.
 */
static void
handle_partial_io (SOCKREQ *request, int bytes_done)
{
    if (bytes_done > 0)
    {
        if (trace_flag)
        {
            static const char
                hex_digits [] = "0123456789abcdef";
            byte
                *ptr = (byte *) request-> buffer + request-> cur_size;
            char
                dump [8 * 3 + 1],       /*  "xx xx ... xx" plus null       */
                *out = dump;
            int
                shown = bytes_done < 8? bytes_done: 8,
                index;

            /*  Dump only what this call transferred, so we never look     */
            /*  past the end of the request buffer.                        */
            for (index = 0; index < shown; index++)
            {
                if (index > 0)
                    *out++ = ' ';
                *out++ = hex_digits [((unsigned) ptr [index] >> 4) & 0x0F];
                *out++ = hex_digits [ (unsigned) ptr [index]       & 0x0F];
            }
            *out = '\0';
            sendfmt (&operq, "INFO", "smtsock: %s...", dump);
        }
        /*  Account for the new data and see whether we have enough        */
        request-> cur_size += bytes_done;
        if (request-> cur_size >= request-> min_size)
            the_next_event = finished_event;
        else
            the_next_event = incomplete_event;
    }
    /*  If the return code was zero, the socket got closed. Whatever we    */
    /*  got, we'll send back. Some systems return EPIPE or ECONNRESET.     */
    else
    if (bytes_done == 0 || sockerrno == EPIPE || sockerrno == ECONNRESET)
        the_next_event = closed_event;
    else
    /*  In principle we can't get an EAGAIN, since we waited until the     */
    /*  socket was ready, but you never know. We'll just try again...      */
    if (sockerrno == EAGAIN || sockerrno == EWOULDBLOCK)
        the_next_event = incomplete_event;
    /*  Anything else, that's an error                                     */
    else
        the_next_event = error_event;
}
/************************** WRITE DATA TO SOCKET *************************/
/*  Dialog module: writes as much of the active request's unsent data as   */
/*  the output socket accepts and lets handle_partial_io decide what the   */
/*  dialog does next.                                                      */
MODULE write_data_to_socket (THREAD *thread)
{
    qbyte
        remaining;                      /*  Data still to be written       */
    int
        bytes_written;                  /*  Return code from write_TCP     */

    remaining = active_request-> max_size - active_request-> cur_size;

    /*  Arguments are cast to long so they match the %ld conversions       */
    /*  whatever the widths of qbyte and sock_t are on this platform.      */
    if (trace_flag)
        sendfmt (&operq, "INFO",
                 "smtsock: writing %ld bytes to %ld",
                 (long) remaining,
                 (long) active_request-> output);

    /*  Carry on from the first byte that has not been sent yet            */
    bytes_written = write_TCP (active_request-> output,
                               active_request-> buffer + active_request-> cur_size,
                               (size_t) remaining);
    handle_partial_io (active_request, bytes_written);
}
/********************* SIGNAL SOCKET READY FOR INPUT *********************/
/*  Dialog module: tells the requester that input is waiting on the        */
/*  socket. A repeating request stays queued; a one-shot request is        */
/*  destroyed after the reply.                                             */
MODULE signal_socket_ready_for_input (THREAD *thread)
{
    reply_normal (active_request, "SOCK_INPUT_OK");

    if (active_request-> repeat)
        return;                         /*  Keep it for the next input     */

    request_destroy (active_request);
}
/*  -------------------------------------------------------------------------
 *  reply_normal
 *
 *  Encodes the request's socket handle and tag into the shared message
 *  buffer and sends the result, under the given event name, to the queue
 *  that made the request. The request itself is left untouched; callers
 *  destroy it when appropriate.
 */
static void
reply_normal (SOCKREQ *request, char *event_name)
{
    sock_t
        handle = request_handle (request);

    /*  Format the event body into msg_body via the msg descriptor         */
    msg_size = exdr_writed (&msg, SMT_SOCK_OK, handle, request-> tag);

    /*  Send to the requester: no reply queue, no response events, no      */
    /*  timeout.                                                           */
    event_send (&request-> reply_to, NULL, event_name,
                msg_body, msg_size,
                NULL, NULL, NULL,
                0);
}
/********************* SIGNAL SOCKET READY FOR OUTPUT ********************/
/*  Dialog module: tells the requester that the socket is ready for        */
/*  output, then drops the request.                                        */
MODULE signal_socket_ready_for_output (THREAD *thread)
{
    SOCKREQ
        *ready = active_request;        /*  Request being answered         */

    reply_normal (ready, "SOCK_OUTPUT_OK");
    request_destroy (ready);
}
/************************** SIGNAL READ COMPLETE *************************/
/*  Dialog module: sends the data read so far back to the requester,       */
/*  using the huge-block reply for huge requests and the normal reply      */
/*  (size narrowed to dbyte) otherwise. A repeating request is rewound     */
/*  for the next read; a one-shot request is destroyed.                    */
MODULE signal_read_complete (THREAD *thread)
{
    SOCKREQ
        *done = active_request;         /*  Request being answered         */

    if (!done-> huge_block)
        send_sock_read_ok (&done-> reply_to,
                           done-> timeout,
                           request_handle (done),
                           (dbyte) done-> cur_size,
                           done-> buffer,
                           done-> tag);
    else
        send_sock_readh_ok (&done-> reply_to,
                            done-> timeout,
                            request_handle (done),
                            done-> cur_size,
                            done-> buffer,
                            done-> tag);

    if (!done-> repeat)
        request_destroy (done);
    else
        done-> cur_size = 0;            /*  Start filling buffer again     */
}
/*************************** SIGNAL READ CLOSED **************************/
/*  Dialog module: reports that the socket closed during a read. The       */
/*  reply carries whatever data was read before the close (possibly        */
/*  none); the request is then destroyed.                                  */
MODULE signal_read_closed (THREAD *thread)
{
    SOCKREQ
        *closed = active_request;       /*  Request being answered         */

    send_sock_closed (&closed-> reply_to,
                      closed-> timeout,
                      request_handle (closed),
                      (dbyte) closed-> cur_size,
                      closed-> buffer,
                      closed-> tag);
    request_destroy (closed);
}
/************************* SIGNAL WRITE COMPLETE *************************/
/*  Dialog module: confirms a finished write to the requester, using the   */
/*  huge-block reply for huge requests and the normal reply otherwise,     */
/*  then destroys the request.                                             */
MODULE signal_write_complete (THREAD *thread)
{
    SOCKREQ
        *done = active_request;         /*  Request being answered         */

    if (!done-> huge_block)
        send_sock_write_ok (&done-> reply_to,
                            request_handle (done),
                            done-> tag);
    else
        send_sock_writeh_ok (&done-> reply_to,
                             request_handle (done),
                             done-> tag);

    request_destroy (done);
}
/************************** SIGNAL WRITE CLOSED **************************/
/*  Dialog module: reports that the socket closed during a write, then     */
/*  destroys the request.                                                  */
MODULE signal_write_closed (THREAD *thread)
{
    SOCKREQ
        *closed = active_request;       /*  Request being answered         */

    reply_normal (closed, "SOCK_CLOSED");
    request_destroy (closed);
}
/************************** SIGNAL SOCKET ERROR **************************/
/*  Dialog module: reports the current socket error text to the requester  */
/*  as a SOCK_ERROR reply, then destroys the request.                      */
MODULE signal_socket_error (THREAD *thread)
{
    SOCKREQ
        *failed = active_request;       /*  Request being answered         */

    reply_error (&failed-> reply_to,
                 request_handle (failed),
                 (char *) sockmsg (),
                 failed-> tag);
    request_destroy (failed);
}
/*********************** SEND TIMEOUT EVENT TO SELF **********************/
/*  Dialog module: posts the private _TIMEOUT event on the thread's own    */
/*  queue, which keeps the agent cycling through its select() loop.        */
MODULE send_timeout_event_to_self (THREAD *thread)
{
    QID
        *own_queue = &thread-> queue-> qid;

    /*  Empty body, no reply queue, no response events, no timeout         */
    event_send (own_queue, NULL, "_TIMEOUT",
                NULL, 0,
                NULL, NULL, NULL,
                0);
}
/************************** DESTROY ALL REQUESTS *************************/
/*  Dialog module: empties the request list, destroying the first request  */
/*  repeatedly until only the list header is left.                         */
MODULE destroy_all_requests (THREAD *thread)
{
    for (;;)
    {
        if (requests.next == &requests)
            break;                      /*  List is empty                  */
        request_destroy (requests.next);
    }
}
/************************* TERMINATE THE THREAD **************************/
/*  Dialog module: ends the thread by reporting terminate_event as the     */
/*  next event.                                                            */
MODULE terminate_the_thread (THREAD *thread)
{
    /*  Nothing to clean up here; just hand the terminate event back       */
    the_next_event = terminate_event;
}