/* ** Copyright (C) 2006-2007 by Carnegie Mellon University. ** ** @OPENSOURCE_HEADER_START@ ** ** Use of the SILK system and related source code is subject to the terms ** of the following licenses: ** ** GNU Public License (GPL) Rights pursuant to Version 2, June 1991 ** Government Purpose License Rights (GPLR) pursuant to DFARS 252.225-7013 ** ** NO WARRANTY ** ** ANY INFORMATION, MATERIALS, SERVICES, INTELLECTUAL PROPERTY OR OTHER ** PROPERTY OR RIGHTS GRANTED OR PROVIDED BY CARNEGIE MELLON UNIVERSITY ** PURSUANT TO THIS LICENSE (HEREINAFTER THE "DELIVERABLES") ARE ON AN ** "AS-IS" BASIS. CARNEGIE MELLON UNIVERSITY MAKES NO WARRANTIES OF ANY ** KIND, EITHER EXPRESS OR IMPLIED AS TO ANY MATTER INCLUDING, BUT NOT ** LIMITED TO, WARRANTY OF FITNESS FOR A PARTICULAR PURPOSE, ** MERCHANTABILITY, INFORMATIONAL CONTENT, NONINFRINGEMENT, OR ERROR-FREE ** OPERATION. CARNEGIE MELLON UNIVERSITY SHALL NOT BE LIABLE FOR INDIRECT, ** SPECIAL OR CONSEQUENTIAL DAMAGES, SUCH AS LOSS OF PROFITS OR INABILITY ** TO USE SAID INTELLECTUAL PROPERTY, UNDER THIS LICENSE, REGARDLESS OF ** WHETHER SUCH PARTY WAS AWARE OF THE POSSIBILITY OF SUCH DAMAGES. ** LICENSEE AGREES THAT IT WILL NOT MAKE ANY WARRANTY ON BEHALF OF ** CARNEGIE MELLON UNIVERSITY, EXPRESS OR IMPLIED, TO ANY PERSON ** CONCERNING THE APPLICATION OF OR THE RESULTS TO BE OBTAINED WITH THE ** DELIVERABLES UNDER THIS LICENSE. ** ** Licensee hereby agrees to defend, indemnify, and hold harmless Carnegie ** Mellon University, its trustees, officers, employees, and agents from ** all claims or demands made against them (and any related losses, ** expenses, or attorney's fees) arising out of, or relating to Licensee's ** and/or its sub licensees' negligent use or willful misuse of or ** negligent conduct or willful misconduct regarding the Software, ** facilities, or other rights or assistance granted by Carnegie Mellon ** University under this License, including, but not limited to, any ** claims of product liability, personal injury, death, damage to ** property, or violation of any laws or regulations. ** ** Carnegie Mellon University Software Engineering Institute authored ** documents are sponsored by the U.S. Department of Defense under ** Contract F19628-00-C-0003. Carnegie Mellon University retains ** copyrights in all material produced under this contract. The U.S. ** Government retains a non-exclusive, royalty-free license to publish or ** reproduce these documents, or allow others to do so, for U.S. ** Government purposes only pursuant to the copyright license under the ** contract clause at 252.227.7013. ** ** @OPENSOURCE_HEADER_END@ */ /* ** SiLK message functions ** */ #include "silk.h" RCSIDENT("$SiLK: skmsg.c 8360 2007-08-09 19:53:17Z mwd $"); #include "utils.h" #include "skdeque.h" #include "multiqueue.h" #include "intdict.h" #include "sklog.h" #include "skmsg.h" #ifdef HAVE_GNUTLS_GNUTLS_H #include #include #endif /* HAVE_GNUTLS_GNUTLS_H */ /* #define SKTHREAD_DEBUG_MUTEX 1 */ #include "skthread.h" /* Define Constants */ /* Read and write sides of control pipes */ #define READ 0 #define WRITE 1 #define SKMERR_MEMORY -1 #define SKMERR_PIPE -2 #define SKMERR_MUTEX -3 #define SKMERR_PTHREAD -4 #define SKMERR_ERROR -5 #define SKMERR_ERRNO -6 #define SKMERR_CLOSED -7 #define SKMERR_SHORT -8 #define LISTENQ 5 #define SKMSG_CTL_CHANNEL_ANNOUNCE 0xFFFE #define SKMSG_CTL_CHANNEL_REPLY 0xFFFD #define SKMSG_CTL_CHANNEL_KILL 0xFFFC #define SKMSG_WRITER_UNBLOCKER 0xFFFB #define SKMSG_MINIMUM_SYSTEM_CTL_CHANNEL 0xFFFC #define SKCTL_RECONF 0 /* Diffie-Hellman bits */ #define DH_BITS 1024 /* Define Macros */ /* Turns on DEBUG_PRINT macros if defined. */ /* #define DEBUGGING_OUTPUT 1 */ /* Turns on function entry/exit debug print macros if defined. */ /* #define DEBUG_FUNCTION_ENTRY_EXIT 1 */ #ifdef DEBUGGING_OUTPUT /* Macros to print debug messages, including line number and thread information. */ #define DEBUG_PRINT1(x) SKTHREAD_DEBUG_PRINT1(x) #define DEBUG_PRINT2(x, y) SKTHREAD_DEBUG_PRINT2(x, y) #define DEBUG_PRINT3(x, y, z) SKTHREAD_DEBUG_PRINT3(x, y, z) #define DEBUG_PRINT4(x, y, z, zz) SKTHREAD_DEBUG_PRINT4(x, y, z, zz) #else /* DEBUGGING_OUTPUT */ #define DEBUG_PRINT1(x) #define DEBUG_PRINT2(x, y) #define DEBUG_PRINT3(x, y, z) #define DEBUG_PRINT4(x, y, z, zz) #endif /* DEBUGGING_OUTPUT */ #ifdef DEBUG_FUNCTION_ENTRY_EXIT /* Should be at the beginning of functions */ #define DEBUG_ENTER_FUNC DEBUG_PRINT2("Entering %s", __func__) /* Should be used instead of return */ #define RETURN(x) \ do { \ DEBUG_PRINT2("Exiting %s", __func__); \ return x; \ } while (0) #else /* DEBUG_FUNCTION_ENTRY_EXIT */ #define DEBUG_ENTER_FUNC #define RETURN(x) return x #endif /* DEBUG_FUNCTION_ENTRY_EXIT */ #ifdef HAVE_GNUTLS_GNUTLS_H # define UNUSED_IF_NOTLS(x) x #else # define UNUSED_IF_NOTLS(x) UNUSED(x) #endif /* HAVE_GNUTLS_GNUTLS_H */ /* Mutex accessor */ #define QUEUE_MUTEX(q) (&(q)->root->mutex) /* Queue lock */ #define QUEUE_LOCK(q) MUTEX_LOCK(QUEUE_MUTEX(q)) /* Queue unlock */ #define QUEUE_UNLOCK(q) MUTEX_UNLOCK(QUEUE_MUTEX(q)) /* Queue wait */ #define QUEUE_WAIT(cond, q) MUTEX_WAIT(cond, QUEUE_MUTEX(q)) /* Queue locked assert */ #define ASSERT_QUEUE_LOCK(q) \ assert(pthread_mutex_trylock(QUEUE_MUTEX(q)) == EBUSY) /* Any place an XASSERT occurs should be replaced with better error handling before release. */ #define XASSERT(x) do { assert(x); if (!(x)) abort(); } while (0) #define MEM_ASSERT(x) XASSERT(x) /* Macros to deal with thread creation and destruction */ #define QUEUE_TINFO(q) ((q)->root->tinfo) #define THREAD_INFO_INIT(q) \ do { \ pthread_cond_init(&QUEUE_TINFO(q).cond, NULL); \ QUEUE_TINFO(q).count = 0; \ } while (0) #define THREAD_INFO_DESTROY(q) pthread_cond_destroy(&QUEUE_TINFO(q).cond) #define THREAD_START(name, rv, q, loc, fn, arg) \ do { \ DEBUG_PRINT1("THREAD_START"); \ ASSERT_QUEUE_LOCK(q); \ QUEUE_TINFO(q).count++; \ (rv) = skthread_create(name, (loc), (fn), (arg)); \ if ((rv) != 0) { \ QUEUE_TINFO(q).count--; \ } \ } while (0) #define THREAD_END(q) \ do { \ DEBUG_PRINT1("THREAD_END"); \ ASSERT_QUEUE_LOCK(q); \ assert(QUEUE_TINFO(q).count != 0); \ QUEUE_TINFO(q).count--; \ MUTEX_BROADCAST(&QUEUE_TINFO(q).cond); \ } while (0) #define THREAD_WAIT_END(q, state) \ do { \ DEBUG_PRINT1("WAITING FOR THREAD_END"); \ ASSERT_QUEUE_LOCK(q); \ while ((state) != SKM_THREAD_ENDED) { \ QUEUE_WAIT(&QUEUE_TINFO(q).cond, q); \ } \ DEBUG_PRINT1("FINISHED WAITING FOR THREAD_END"); \ } while (0) #define THREAD_WAIT_ALL_END(q) \ do { \ DEBUG_PRINT1("WAITING FOR ALL THREAD_END"); \ ASSERT_QUEUE_LOCK(q); \ while (QUEUE_TINFO(q).count != 0) { \ QUEUE_WAIT(&QUEUE_TINFO(q).cond, q); \ } \ DEBUG_PRINT1("FINISHED WAITING FOR ALL THREAD_END"); \ } while (0) /*** Local types ***/ /* The type of message headers */ typedef struct _sk_msg_hdr_t { skm_channel_t channel; skm_type_t type; skm_len_t size; } sk_msg_hdr_t; /* The type of messages */ struct _sk_msg_t { sk_msg_hdr_t hdr; void *msg; }; /* Forward declaration for sk_msg_conn_queue */ struct _sk_msg_conn_queue_t; /* Protocol specific transport functions */ typedef struct _sk_msg_transport_fn_t { int (*send) (struct _sk_msg_conn_queue_t *conn, sk_msg_t *msg); int (*recv) (struct _sk_msg_conn_queue_t *conn, sk_msg_t **msg); } sk_msg_transport_fn_t; /* Allow tracking of thread entance and exits. */ typedef struct _sk_thread_info_t { pthread_cond_t cond; uint32_t count; } sk_thread_info_t; typedef enum { SKM_THREAD_BEFORE, SKM_THREAD_RUNNING, SKM_THREAD_SHUTTING_DOWN, SKM_THREAD_ENDED } sk_thread_state_t; /* The type of a message queue root */ typedef struct _sk_msg_root_t { pthread_mutex_t mutex; /* Global mutex for message queue */ skm_channel_t next_channel; /* The next channel number to try to allocate */ sk_thread_info_t tinfo; /* Information about active threads */ int_dict_t *channel; /* Map of channel-id to channels */ int_dict_t *connection; /* Map of read socket to connections */ int_dict_t *groups; /* Map of channel-ids to queues */ int reader_control[2]; /* Control pipes for the reader thread */ pthread_t reader; /* The reader thread */ sk_thread_state_t reader_state; /* The reader state */ pthread_cond_t reader_cond; /* Reader condition variable */ /* The information for binding and listening for connections */ struct sockaddr_in bind_addr; int bind_type; int listen_sock; sk_msg_queue_t *shutdownqueue; #ifdef HAVE_GNUTLS_GNUTLS_H gnutls_certificate_credentials_t cred; /* Auth/Encryption credentials */ #endif unsigned shuttingdown: 1; #ifdef HAVE_GNUTLS_GNUTLS_H unsigned cred_set: 1; unsigned bind_tls: 1; #endif } sk_msg_root_t; /* The type of a message queue */ struct _sk_msg_queue_t { sk_msg_root_t *root; int_dict_t *channel; /* Map of channels */ mq_multi_t *group; /* Queue group for all channels */ pthread_cond_t shutdowncond; unsigned shuttingdown: 1; }; struct _sk_msg_channel_queue_t; typedef struct _sk_msg_channel_queue_t sk_msg_channel_queue_t; /* States for connections and channels */ typedef enum { SKM_CREATED, SKM_CONNECTING, SKM_CONNECTED, SKM_CLOSED } sk_msg_state_t; /* States for threads */ typedef enum { SKM_SEND_INTERNAL, SKM_SEND_REMOTE, SKM_SEND_CONTROL } sk_send_type_t; /* TLS connections */ typedef enum { SKM_TLS_NONE, SKM_TLS_CLIENT, SKM_TLS_SERVER } skm_tls_type_t; /* Represents a connected socket or pipe */ typedef struct _sk_msg_conn_queue_t { int rsocket; /* Read socket */ int wsocket; /* Write socket */ sk_sockaddr_t *addr; /* Address of connection */ sk_msg_transport_fn_t fn; /* send and receive functions */ int_dict_t *channelmap; /* Channel map */ uint16_t refcount; /* channel refcount */ sk_msg_state_t state; /* Current state of connection */ skDeque_t queue; /* outgoing write queue */ pthread_t writer; /* Writer thread handle */ sk_thread_state_t writer_state; /* State */ pthread_cond_t writer_cond; /* Condition variable for thread */ int writer_control[2]; /* Writer thread control pipe */ #ifdef HAVE_GNUTLS_GNUTLS_H gnutls_session_t session; unsigned use_tls; #endif } sk_msg_conn_queue_t; /* Represents a channel */ struct _sk_msg_channel_queue_t { mq_queue_t *queue; /* Channel's queue */ skm_channel_t channel; /* local channel ID */ skm_channel_t rchannel; /* remote channel ID */ sk_msg_state_t state; /* channel state */ sk_msg_conn_queue_t *conn; /* associated connection */ sk_msg_queue_t *group; /* group associated with this channel */ pthread_cond_t pending; /* pending condition variable */ unsigned is_pending: 1; /* Whether we are waiting for connection */ unsigned flushing: 1; }; /* Used for passing data to a writer thread */ typedef struct _sk_queue_and_conn_t { sk_msg_queue_t *q; sk_msg_conn_queue_t *conn; } sk_queue_and_conn_t; /* Used to represent a local/remote channel pair */ typedef struct _sk_channel_pair_t { skm_channel_t lchannel; skm_channel_t rchannel; } sk_channel_pair_t; /*** Local function prototypes ***/ static int tcp_send(sk_msg_conn_queue_t *conn, sk_msg_t *msg); static int tcp_recv(sk_msg_conn_queue_t *conn, sk_msg_t **msg); #ifdef HAVE_GNUTLS_GNUTLS_H static int tls_send(sk_msg_conn_queue_t *conn, sk_msg_t *msg); static int tls_recv(sk_msg_conn_queue_t *conn, sk_msg_t **msg); #endif /* HAVE_GNUTLS_GNUTLS_H */ static void *reader_thread(void *); static void *writer_thread(void *); static void destroy_connection( sk_msg_queue_t *q, sk_msg_conn_queue_t *conn, int block); static int send_message( sk_msg_queue_t *q, skm_channel_t lchannel, skm_type_t type, void *message, skm_len_t length, sk_send_type_t send_type); /*** Local variables ***/ static sk_msg_transport_fn_t tcp_transport_fns = { tcp_send, tcp_recv }; #ifdef HAVE_GNUTLS_GNUTLS_H static sk_msg_transport_fn_t tls_transport_fns = { tls_send, tls_recv }; pthread_mutex_t sk_msg_gnutls_mutex = PTHREAD_MUTEX_INITIALIZER; static int sk_msg_gnutls_initialized = 0; static gnutls_dh_params_t dh_params; #endif /* HAVE_GNUTLS_GNUTLS_H */ /* Utility functions */ static sk_msg_channel_queue_t *find_channel( sk_msg_queue_t *q, skm_channel_t channel) { sk_msg_channel_queue_t **chan; DEBUG_ENTER_FUNC; ASSERT_QUEUE_LOCK(q); chan = (sk_msg_channel_queue_t **)int_dict_get(q->root->channel, channel, NULL); RETURN(chan ? *chan : NULL); } static sk_msg_conn_queue_t *find_connection( sk_msg_queue_t *q, int socket) { sk_msg_conn_queue_t **conn; DEBUG_ENTER_FUNC; ASSERT_QUEUE_LOCK(q); conn = (sk_msg_conn_queue_t **)int_dict_get(q->root->connection, socket, NULL); RETURN(conn ? *conn : NULL); } #if 0 static void set_nonblock(int fd) { int flags, rv; DEBUG_ENTER_FUNC; flags = fcntl(fd, F_GETFL, 0); XASSERT(flags != -1); flags |= O_NONBLOCK; rv = fcntl(fd, F_SETFL, flags); XASSERT(rv != -1); RETURN(); } #endif /* 0 */ static void sk_destroy_report_message(void *vmsg) { sk_msg_t *msg = (sk_msg_t *)vmsg; DEBUG_ENTER_FUNC; DEBUG_PRINT3("Queue (destroy): chan=%#x type=%#x", msg->hdr.channel, msg->hdr.type); skMsgDestroy(msg); RETURN(); } /*** TCP functions ***/ static int tcp_send( sk_msg_conn_queue_t *conn, sk_msg_t *msg) { ssize_t rv; struct iovec iov[2]; DEBUG_ENTER_FUNC; assert(msg); assert(conn); /* Set up the iovec */ iov[0].iov_base = &msg->hdr; iov[0].iov_len = sizeof(msg->hdr); iov[1].iov_base = msg->msg; iov[1].iov_len = msg->hdr.size; /* Convert data to network byte order */ DEBUG_PRINT3("Sending chan = %#x type = %#x", msg->hdr.channel, msg->hdr.type); msg->hdr.channel = htons(msg->hdr.channel); msg->hdr.type = htons(msg->hdr.type); msg->hdr.size = htons(msg->hdr.size); retry: /* Write the message */ rv = writev(conn->wsocket, iov, 2); if (rv == -1) { int err; if (errno == EINTR) { goto retry; } if (errno == EPIPE || errno == ECONNRESET) { RETURN(SKMERR_CLOSED); } err = errno; RETURN(SKMERR_ERRNO); } else if (rv == 0) { RETURN(SKMERR_CLOSED); } else if (rv != (ssize_t)(iov[0].iov_len + iov[1].iov_len)) { RETURN(SKMERR_SHORT); } RETURN(0); } static int tcp_recv( sk_msg_conn_queue_t *conn, sk_msg_t **message) { ssize_t rv; sk_msg_hdr_t *hdr; sk_msg_t *msg; int retval = 0; DEBUG_ENTER_FUNC; assert(message); assert(conn); /* Create a message structure */ msg = malloc(sizeof(*msg)); MEM_ASSERT(msg != NULL); msg->msg = NULL; hdr = &msg->hdr; retry1: /* Read a header */ rv = read(conn->rsocket, hdr, sizeof(*hdr)); if (rv == -1) { if (errno == EINTR) { goto retry1; } retval = SKMERR_ERRNO; goto end; } else if (rv == 0) { retval = SKMERR_CLOSED; goto end; } else if (rv != sizeof(*hdr)) { retval = SKMERR_SHORT; goto end; } /* Convert network byte order to host byte order */ hdr->channel = ntohs(hdr->channel); hdr->type = ntohs(hdr->type); hdr->size = ntohs(hdr->size); DEBUG_PRINT3("Receiving chan = %#x type = %#x", hdr->channel, hdr->type); if (hdr->size) { skm_len_t size = hdr->size; uint8_t *block; /* Allocate space for the body of the message */ msg->msg = malloc(size); MEM_ASSERT(msg->msg); block = msg->msg; while (size) { /* Read in the body of the message */ rv = read(conn->rsocket, block, size); if (rv == -1) { if (errno == EINTR) { continue; } DEBUG_PRINT2("read failure: [%s]", strerror(errno)); retval = SKMERR_ERRNO; break; } else if (rv == 0) { DEBUG_PRINT1("read ended: [EOF]"); retval = SKMERR_CLOSED; break; } size -= rv; block += rv; } } else { msg->msg = NULL; } end: if (retval == 0) { *message = msg; } else { if (msg->msg) { free(msg->msg); } free(msg); } RETURN(retval); } #ifdef HAVE_GNUTLS_GNUTLS_H /*** TLS functions ***/ static int tls_send( sk_msg_conn_queue_t *conn, sk_msg_t *msg) { ssize_t rv; skm_len_t size; DEBUG_ENTER_FUNC; assert(msg); assert(conn); assert(conn->use_tls); size = msg->hdr.size; /* Convert data to network byte order */ DEBUG_PRINT3("Sending chan = %#x type = %#x", msg->hdr.channel, msg->hdr.type); msg->hdr.channel = htons(msg->hdr.channel); msg->hdr.type = htons(msg->hdr.type); msg->hdr.size = htons(msg->hdr.size); retry1: /* Write the message */ DEBUG_PRINT2("calling gnutls_record_send (%d)", sizeof(msg->hdr)); rv = gnutls_record_send(conn->session, &msg->hdr, sizeof(msg->hdr)); DEBUG_PRINT2("gnutls_record_send -> %d", rv); if (rv < 0) { if (rv == GNUTLS_E_INTERRUPTED || rv == GNUTLS_E_AGAIN) { goto retry1; } if (rv == GNUTLS_E_EXPIRED || rv == GNUTLS_E_INTERRUPTED || (rv == GNUTLS_E_PUSH_ERROR && (errno == EPIPE || errno == ECONNRESET))) { RETURN(SKMERR_CLOSED); } RETURN(SKMERR_ERRNO); } else if (rv == 0) { RETURN(SKMERR_CLOSED); } else if (rv != sizeof(msg->hdr)) { RETURN(SKMERR_SHORT); } if (size != 0) { retry2: DEBUG_PRINT2("calling gnutls_record_send (%d)", size); rv = gnutls_record_send(conn->session, msg->msg, size); DEBUG_PRINT2("gnutls_record_send -> %d", rv); if (rv < 0) { if (rv == GNUTLS_E_INTERRUPTED || rv == GNUTLS_E_AGAIN) { goto retry2; } if (rv == GNUTLS_E_EXPIRED || rv == GNUTLS_E_INTERRUPTED || (rv == GNUTLS_E_PUSH_ERROR && (errno == EPIPE || errno == ECONNRESET))) { RETURN(SKMERR_CLOSED); } RETURN(SKMERR_ERRNO); } else if (rv == 0) { RETURN(SKMERR_CLOSED); } else if (rv != size) { RETURN(SKMERR_SHORT); } } RETURN(0); } static int tls_recv( sk_msg_conn_queue_t *conn, sk_msg_t **message) { ssize_t rv; sk_msg_hdr_t *hdr; sk_msg_t *msg; int retval = 0; DEBUG_ENTER_FUNC; assert(message); /* Create a message structure */ msg = malloc(sizeof(*msg)); MEM_ASSERT(msg != NULL); msg->msg = NULL; hdr = &msg->hdr; retry: /* Read a header */ DEBUG_PRINT2("calling gnutls_record_recv (%d)", sizeof(*hdr)); rv = gnutls_record_recv(conn->session, hdr, sizeof(*hdr)); DEBUG_PRINT2("gnutls_record_recv -> %d", rv); if (rv < 0) { if (rv == GNUTLS_E_INTERRUPTED || rv == GNUTLS_E_AGAIN) { goto retry; } RETURN(SKMERR_ERRNO); goto end; } else if (rv == 0) { retval = SKMERR_CLOSED; goto end; } else if (rv != sizeof(*hdr)) { retval = SKMERR_SHORT; goto end; } /* Convert network byte order to host byte order */ hdr->channel = ntohs(hdr->channel); hdr->type = ntohs(hdr->type); hdr->size = ntohs(hdr->size); DEBUG_PRINT3("Receiving chan = %#x type = %#x", hdr->channel, hdr->type); if (hdr->size) { skm_len_t size = hdr->size; uint8_t *block; /* Allocate space for the body of the message */ msg->msg = malloc(size); MEM_ASSERT(msg->msg); block = msg->msg; while (size) { /* Read in the body of the message */ DEBUG_PRINT2("calling gnutls_record_recv (%d)", size); rv = gnutls_record_recv(conn->session, block, size); DEBUG_PRINT2("gnutls_record_recv -> %d", rv); if (rv == -1) { if (rv == GNUTLS_E_INTERRUPTED || rv == GNUTLS_E_AGAIN) { continue; } DEBUG_PRINT2("read failure: [%s]", gnutls_strerror(rv)); retval = SKMERR_ERRNO; break; } else if (rv == 0) { DEBUG_PRINT1("read ended: [EOF]"); retval = SKMERR_CLOSED; break; } size -= rv; block += rv; } } else { msg->msg = NULL; } end: if (retval == 0) { *message = msg; } else { if (msg->msg) { free(msg->msg); } free(msg); } RETURN(retval); } #endif /* HAVE_GNUTLS_GNUTLS_H */ /***********************************************************************/ #ifdef HAVE_GNUTLS_GNUTLS_H GCRY_THREAD_OPTION_PTHREAD_IMPL; static int skMsgGnuTLSInit(void) { DEBUG_ENTER_FUNC; if (!sk_msg_gnutls_initialized) { int rv; rv = gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread); if (rv == 0) { rv = gnutls_global_init(); } if (rv >= 0) { rv = gnutls_dh_params_init(&dh_params); } if (rv >= 0) { rv = gnutls_dh_params_generate2(dh_params, DH_BITS); } if (rv >= 0) { sk_msg_gnutls_initialized = 1; } RETURN(rv); } RETURN(0); } int skMsgQueueAddCA( sk_msg_queue_t *queue, const char *cred_filename) { int rv; DEBUG_ENTER_FUNC; assert(queue); assert(cred_filename); pthread_mutex_lock(&sk_msg_gnutls_mutex); if (!sk_msg_gnutls_initialized) { rv = skMsgGnuTLSInit(); if (rv != 0) { RETURN(-1); } } if (!queue->root->cred_set) { rv = gnutls_certificate_allocate_credentials(&queue->root->cred); if (rv >= 0) { gnutls_certificate_set_dh_params(queue->root->cred, dh_params); } } else { rv = 0; } if (rv >= 0) { rv = gnutls_certificate_set_x509_trust_file(queue->root->cred, cred_filename, GNUTLS_X509_FMT_PEM); if (rv < 0) { INFOMSG("Error loading x509 CA trust file %s: %s", cred_filename, gnutls_strerror(rv)); gnutls_certificate_free_credentials(queue->root->cred); } } if (rv >= 0) { if (!queue->root->cred_set) { queue->root->cred_set = 1; sk_msg_gnutls_initialized++; } pthread_mutex_unlock(&sk_msg_gnutls_mutex); RETURN(0); } pthread_mutex_unlock(&sk_msg_gnutls_mutex); RETURN(1); } int skMsgQueueAddCert( sk_msg_queue_t *queue, const char *cert_filename, const char *key_filename) { int rv; assert(queue); assert(cert_filename); assert(key_filename); DEBUG_ENTER_FUNC; pthread_mutex_lock(&sk_msg_gnutls_mutex); if (!sk_msg_gnutls_initialized) { rv = skMsgGnuTLSInit(); if (rv != 0) { RETURN(-1); } } if (!queue->root->cred_set) { rv = gnutls_certificate_allocate_credentials(&queue->root->cred); if (rv >= 0) { gnutls_certificate_set_dh_params(queue->root->cred, dh_params); } } else { rv = 0; } if (rv >= 0) { rv = gnutls_certificate_set_x509_key_file(queue->root->cred, cert_filename, key_filename, GNUTLS_X509_FMT_PEM); if (rv < 0) { INFOMSG("Error loading x509 cert or key file %s, %s: %s", cert_filename, key_filename, gnutls_strerror(rv)); gnutls_certificate_free_credentials(queue->root->cred); } } if (rv >= 0) { if (!queue->root->cred_set) { queue->root->cred_set = 1; sk_msg_gnutls_initialized++; } pthread_mutex_unlock(&sk_msg_gnutls_mutex); RETURN(0); } pthread_mutex_unlock(&sk_msg_gnutls_mutex); RETURN(-1); } int skMsgQueueAddPKCS12( sk_msg_queue_t *queue, const char *cert_filename, const char *password) { int rv; assert(queue); assert(cert_filename); DEBUG_ENTER_FUNC; pthread_mutex_lock(&sk_msg_gnutls_mutex); if (!sk_msg_gnutls_initialized) { rv = skMsgGnuTLSInit(); if (rv != 0) { RETURN(-1); } } if (!queue->root->cred_set) { rv = gnutls_certificate_allocate_credentials(&queue->root->cred); if (rv >= 0) { gnutls_certificate_set_dh_params(queue->root->cred, dh_params); } } else { rv = 0; } if (rv >= 0) { rv = gnutls_certificate_set_x509_simple_pkcs12_file( queue->root->cred, cert_filename, GNUTLS_X509_FMT_DER, password); if (rv < 0) { INFOMSG("Error loading PKCS12 file %s: %s", cert_filename, gnutls_strerror(rv)); gnutls_certificate_free_credentials(queue->root->cred); } } if (rv >= 0) { if (!queue->root->cred_set) { queue->root->cred_set = 1; sk_msg_gnutls_initialized++; } pthread_mutex_unlock(&sk_msg_gnutls_mutex); RETURN(0); } pthread_mutex_unlock(&sk_msg_gnutls_mutex); RETURN(-1); } #endif /* HAVE_GNUTLS_GNUTLS_H */ /* Create a channel within a message queue. */ static sk_msg_channel_queue_t *create_channel( sk_msg_queue_t *q) { sk_msg_channel_queue_t *chan; int rv; DEBUG_ENTER_FUNC; assert(q); ASSERT_QUEUE_LOCK(q); /* Allocate space for a new channel */ chan = (sk_msg_channel_queue_t *)calloc(1, sizeof(*chan)); MEM_ASSERT(chan != NULL); chan->queue = mqCreateQueue(q->group); MEM_ASSERT(chan->queue != NULL); /* Assign a local channel number and add the channel to the message queue */ do { chan->channel = q->root->next_channel++; rv = int_dict_set(q->root->channel, chan->channel, &chan); } while (rv == 1); MEM_ASSERT(rv == 0); /* Channel is created, rchannel is unset */ chan->state = SKM_CREATED; chan->rchannel = SKMSG_CHANNEL_CONTROL; rv = pthread_cond_init(&chan->pending, NULL); XASSERT(rv == 0); chan->is_pending = 0; rv = int_dict_set(q->root->groups, chan->channel, &q); MEM_ASSERT(rv == 0); rv = int_dict_set(q->channel, chan->channel, &chan); MEM_ASSERT(rv == 0); chan->group = q; DEBUG_PRINT2("create_channel() == %d", chan->channel); RETURN(chan); } /* Attach a channel to a connection object. */ static int set_channel_connecting( sk_msg_queue_t UNUSED(*q), sk_msg_channel_queue_t *chan, sk_msg_conn_queue_t *conn) { int rv; DEBUG_ENTER_FUNC; assert(q); assert(chan); assert(conn); ASSERT_QUEUE_LOCK(q); assert(chan->state == SKM_CREATED); assert(conn->state != SKM_CLOSED); DEBUG_PRINT2("set_channel_connecting(%d)", chan->channel); /* Set the channel's communication stream, set it to half-connected, and up the refcount on the connection object. */ chan->conn = conn; chan->state = SKM_CONNECTING; /* Add an entry in the connections's channel map for the channel. */ rv = int_dict_set(conn->channelmap, chan->channel, &chan); MEM_ASSERT(rv != -1); assert(rv == 0); conn->state = SKM_CONNECTED; conn->refcount++; RETURN(0); } static void set_channel_closed( sk_msg_queue_t *q, sk_msg_channel_queue_t *chan, int no_destroy, int block) { int rv; sk_msg_conn_queue_t *conn; DEBUG_ENTER_FUNC; assert(q); assert(chan); ASSERT_QUEUE_LOCK(q); assert(chan->state == SKM_CONNECTING || chan->state == SKM_CONNECTED); assert(chan->conn); assert(chan->conn->refcount > 0); DEBUG_PRINT2("set_channel_closed(%d)", chan->channel); conn = chan->conn; if (chan->state == SKM_CONNECTED && chan->channel != SKMSG_CHANNEL_CONTROL) { skm_channel_t lchannel = htons(chan->channel); DEBUG_PRINT1("Sending SKMSG_CTL_CHANNEL_DIED (Internal)"); rv = send_message(q, SKMSG_CHANNEL_CONTROL, SKMSG_CTL_CHANNEL_DIED, &lchannel, sizeof(lchannel), SKM_SEND_INTERNAL); XASSERT(rv == 0); } rv = int_dict_del(conn->channelmap, chan->channel); assert(rv == 0); chan->state = SKM_CLOSED; conn->refcount--; /* Notify people waiting on this channel to complete connecting that it is dead. */ MUTEX_BROADCAST(&chan->pending); if (conn->refcount == 0 && !no_destroy) { destroy_connection(q, conn, block); } RETURN(); } static int set_channel_connected( sk_msg_queue_t UNUSED(*q), sk_msg_channel_queue_t *chan, skm_channel_t rchannel) { DEBUG_ENTER_FUNC; assert(q); assert(chan); ASSERT_QUEUE_LOCK(q); assert(chan->state == SKM_CONNECTING); DEBUG_PRINT2("set_channel_connected(%d)", chan->channel); chan->rchannel = rchannel; chan->state = SKM_CONNECTED; RETURN(0); } static void destroy_channel( sk_msg_queue_t *q, sk_msg_channel_queue_t *chan, int block) { int rv; DEBUG_ENTER_FUNC; assert(q); assert(chan); ASSERT_QUEUE_LOCK(q); DEBUG_PRINT2("destroy_channel(%d)", chan->channel); if (chan->state == SKM_CONNECTED && chan->channel != SKMSG_CHANNEL_CONTROL) { skm_channel_t rchannel = htons(chan->rchannel); DEBUG_PRINT1("Sending SKMSG_CTL_CHANNEL_KILL (Ext-control)"); rv = send_message(q, chan->channel, SKMSG_CTL_CHANNEL_KILL, &rchannel, sizeof(rchannel), SKM_SEND_CONTROL); XASSERT(rv == 0); } if (chan->state != SKM_CLOSED) { set_channel_closed(q, chan, 0, block); } assert(chan->state == SKM_CLOSED); rv = int_dict_del(q->root->channel, chan->channel); assert(rv == 0); rv = int_dict_del(q->root->groups, chan->channel); assert(rv == 0); rv = int_dict_del(chan->group->channel, chan->channel); assert(rv == 0); rv = pthread_cond_destroy(&chan->pending); assert(rv == 0); /* Disable adding to the queue (it will be destroyed when the group is destroyed) */ mqQueueDisable(chan->queue, MQ_ADD); free(chan); RETURN(); } #ifdef HAVE_GNUTLS_GNUTLS_H static int setup_tls( sk_msg_queue_t *q, sk_msg_conn_queue_t *conn, int rsocket, int wsocket, skm_tls_type_t tls) { int rv; unsigned int status; DEBUG_ENTER_FUNC; assert(q); assert(conn); assert(q->root->cred_set); switch (tls) { case SKM_TLS_CLIENT: rv = gnutls_init(&conn->session, GNUTLS_CLIENT); break; case SKM_TLS_SERVER: rv = gnutls_init(&conn->session, GNUTLS_SERVER); break; default: assert(0); abort(); } if (rv < 0) { INFOMSG("Failed TLS init: %s", gnutls_strerror(rv)); RETURN(-1); } rv = gnutls_set_default_priority(conn->session); XASSERT(rv >= 0); rv = gnutls_credentials_set(conn->session, GNUTLS_CRD_CERTIFICATE, q->root->cred); XASSERT(rv >= 0); gnutls_transport_set_ptr2 (conn->session, (gnutls_transport_ptr_t)rsocket, (gnutls_transport_ptr_t)wsocket); if (tls == SKM_TLS_SERVER) { gnutls_certificate_server_set_request(conn->session, GNUTLS_CERT_REQUIRE); } again1: DEBUG_PRINT1("Attempting TLS handshake"); rv = gnutls_handshake(conn->session); if (rv < 0) { if (rv == GNUTLS_E_AGAIN || rv == GNUTLS_E_INTERRUPTED) { goto again1; } if (rv == GNUTLS_E_PUSH_ERROR) { INFOMSG("Remote side disconnected during TLS handshake."); } else { INFOMSG("TLS handshake failed: %s", gnutls_strerror(rv)); } gnutls_deinit(conn->session); RETURN(-1); } DEBUG_PRINT1("TLS handshake succeeded"); rv = gnutls_certificate_verify_peers2(conn->session, &status); if (rv < 0) { INFOMSG("Certificate verification failed: %s", gnutls_strerror(rv)); } if (status != 0) { again2: rv = gnutls_bye(conn->session, GNUTLS_SHUT_RDWR); if (rv == GNUTLS_E_AGAIN || rv == GNUTLS_E_INTERRUPTED) { goto again2; } gnutls_deinit(conn->session); RETURN(-1); } conn->use_tls = 1; RETURN(0); } #endif /* HAVE_GNUTLS_GNUTLS_H */ static int create_connection( sk_msg_queue_t *q, int rsocket, int wsocket, sk_sockaddr_t *addr, sk_msg_conn_queue_t **rconn, skm_tls_type_t UNUSED_IF_NOTLS(tls)) { static uint8_t skctl_reconf = SKCTL_RECONF; sk_msg_conn_queue_t *conn; sk_queue_and_conn_t *qac; int rv; DEBUG_ENTER_FUNC; assert(q); assert(rconn); ASSERT_QUEUE_LOCK(q); DEBUG_PRINT3("create_connection() = %d, %d", rsocket, wsocket); /* Allocate space for the connection */ conn = (sk_msg_conn_queue_t *)calloc(1, sizeof(*conn)); MEM_ASSERT(conn != NULL); #ifdef HAVE_GNUTLS_GNUTLS_H if (tls != SKM_TLS_NONE) { rv = setup_tls(q, conn, rsocket, wsocket, tls); if (rv != 0) { free(conn); RETURN(-1); } } #endif /* HAVE_GNUTLS_GNUTLS_H */ /* Set the read and write sockets */ conn->rsocket = rsocket; conn->wsocket = wsocket; /* And the address */ conn->addr = addr; /* Set the transport functions */ #ifdef HAVE_GNUTLS_GNUTLS_H if (tls == SKM_TLS_NONE) { conn->fn = tcp_transport_fns; } else { conn->fn = tls_transport_fns; } #else conn->fn = tcp_transport_fns; #endif /* HAVE_GNUTLS_GNUTLS_H */ /* Set up the channel queue and refcount */ conn->channelmap = int_dict_create(sizeof(sk_msg_channel_queue_t *)); MEM_ASSERT(conn->channelmap != NULL); conn->refcount = 0; /* Set the connections initial state */ conn->state = SKM_CREATED; /* Set up the write queue */ conn->queue = skDequeCreate(); XASSERT(conn->queue != NULL); /* Set up the writer control pipe */ rv = pipe(conn->writer_control); XASSERT(rv == 0); /* Initialize the thread start state */ pthread_cond_init(&conn->writer_cond, NULL); conn->writer_state = SKM_THREAD_BEFORE; /* Set up and start the writer thread */ qac = (sk_queue_and_conn_t *)malloc(sizeof(*qac)); MEM_ASSERT(qac != NULL); qac->q = q; qac->conn = conn; THREAD_START("skmsg_writer", rv, q, &conn->writer, writer_thread, qac); XASSERT(rv == 0); /* Wait for the thread to begin. */ while (conn->writer_state == SKM_THREAD_BEFORE) { QUEUE_WAIT(&conn->writer_cond, q); } assert(conn->writer_state == SKM_THREAD_RUNNING); /* Add the connection to the message queue */ rv = int_dict_set(q->root->connection, conn->rsocket, &conn); XASSERT(rv == 0); /* Tell the reader thread that there is a new connection */ DEBUG_PRINT1("SIGNAL reader reconf"); rv = write(q->root->reader_control[WRITE], &skctl_reconf, 1); XASSERT(rv == 1); *rconn = conn; RETURN(0); } static void destroy_connection( sk_msg_queue_t *q, sk_msg_conn_queue_t *conn, int block) { static uint8_t skctl_reconf = SKCTL_RECONF; sk_msg_channel_queue_t *chan; void *cont; int rv; sk_msg_t *msg; skDQErr_t err; DEBUG_ENTER_FUNC; assert(q); assert(conn); ASSERT_QUEUE_LOCK(q); DEBUG_PRINT3("destroy_connection() = %d, %d", conn->rsocket, conn->wsocket); /* Check to see if this connection is already being shut down */ if (conn->state == SKM_CLOSED) { return; } /* Okay, start closing */ conn->state = SKM_CLOSED; conn->writer_state = SKM_THREAD_SHUTTING_DOWN; if (block) { static sk_msg_t unblocker = {{SKMSG_CHANNEL_CONTROL, SKMSG_WRITER_UNBLOCKER, 0}, NULL}; /* Add a special messaage to the writers queue to guarantee it will unblock */ err = skDequePushBack(conn->queue, &unblocker); XASSERT(err == SKDQ_SUCCESS); /* Wait for the writer thread to end */ THREAD_WAIT_END(q, conn->writer_state); pthread_join(conn->writer, NULL); } /* Remove the connection from the connection queue */ rv = int_dict_del(q->root->connection, conn->rsocket); assert(rv == 0); if (!block) { /* Empty the queue */ while ((err = skDequePopBackNB(conn->queue, (void *)&msg)) == SKDQ_SUCCESS) { skMsgDestroy(msg); } assert(err == SKDQ_EMPTY); /* Shut down the queue */ err = skDequeUnblock(conn->queue); assert(err == SKDQ_SUCCESS); } /* Mark all channels using this connection as closed. */ cont = int_dict_get_first(conn->channelmap, &chan); while (cont != NULL) { intkey_t channel = chan->channel; if ((chan->state == SKM_CONNECTING || chan->state == SKM_CONNECTED)) { set_channel_closed(q, chan, 1, 0); } cont = int_dict_get_next(conn->channelmap, channel, &chan); } assert(conn->refcount == 0); /* Tell the reader thread that the connections have changed */ DEBUG_PRINT1("SIGNAL reader reconf"); rv = write(q->root->reader_control[WRITE], &skctl_reconf, 1); XASSERT(rv == 1); /* Destroy the channelmap */ int_dict_destroy(conn->channelmap); if (!block) { retry2: /* Shut down the writer thread */ DEBUG_PRINT1("SIGNAL writer exit"); rv = write(conn->writer_control[WRITE], &skctl_reconf, 1); if (rv == -1 && errno == EINTR) { goto retry2; } assert(rv == 1); /* Wait for the writer thread to end */ THREAD_WAIT_END(q, conn->writer_state); pthread_join(conn->writer, NULL); } /* Close the socket(s) */ close(conn->rsocket); if (conn->rsocket != conn->wsocket) { close(conn->wsocket); } /* Destroy the queue */ err = skDequeDestroy(conn->queue); assert(err == SKDQ_SUCCESS); #if HAVE_GNUTLS_GNUTLS_H /* Destroy the session */ if (conn->use_tls) { again: rv = gnutls_bye(conn->session, GNUTLS_SHUT_RDWR); if (rv == GNUTLS_E_AGAIN || rv == GNUTLS_E_INTERRUPTED) { goto again; } gnutls_deinit(conn->session); } #endif /* HAVE_GNUTLS_GNUTLS_H */ /* Close the control pipes. */ close(conn->writer_control[READ]); close(conn->writer_control[WRITE]); /* Destory the condition variable */ rv = pthread_cond_destroy(&conn->writer_cond); assert(rv == 0); /* Destory the address */ if (conn->addr != NULL) { free(conn->addr); } /* Finally, free the connection object */ free(conn); RETURN(); } static int accept_connection(sk_msg_queue_t *q) { int fd; sk_msg_conn_queue_t *conn; int rv; struct sockaddr_in addr; sk_sockaddr_t *copy; socklen_t addrlen = sizeof(addr); DEBUG_ENTER_FUNC; assert(q); ASSERT_QUEUE_LOCK(q); retry_accept: fd = accept(q->root->listen_sock, (struct sockaddr *)&addr, &addrlen); if (fd == -1) { DEBUGMSG("accept() [%s]", strerror(errno)); if (errno == EINTR) { goto retry_accept; } if (errno != EWOULDBLOCK) { XASSERT(0); abort(); } RETURN(-1); } /* Create the queue and both references */ copy = (sk_sockaddr_t *)malloc(sizeof(*copy)); if (copy != NULL) { memcpy(copy, &addr, addrlen); } #ifdef HAVE_GNUTLS_GNUTLS_H rv = create_connection(q, fd, fd, copy, &conn, q->root->bind_tls ? SKM_TLS_SERVER : SKM_TLS_NONE); #else rv = create_connection(q, fd, fd, copy, &conn, SKM_TLS_NONE); #endif /* HAVE_GNUTLS_GNUTLS_H */ if (rv != 0) { close(fd); free(copy); RETURN(-1); } conn->state = SKM_CONNECTING; RETURN(0); } static int handle_system_control_message( sk_msg_queue_t *q, sk_msg_t *msg, int socket) { int rv; DEBUG_ENTER_FUNC; assert(q); assert(msg); ASSERT_QUEUE_LOCK(q); switch(msg->hdr.type) { case SKMSG_CTL_CHANNEL_ANNOUNCE: /* Handle the announcement of a connection */ { skm_channel_t rchannel; skm_channel_t lchannel; sk_msg_conn_queue_t *conn; sk_msg_channel_queue_t *chan; sk_channel_pair_t pair; sk_new_channel_info_t info; DEBUG_PRINT1("Handling SKMSG_CTL_CHANNEL_ANNOUNCE"); assert(msg->hdr.size == sizeof(rchannel)); assert(msg->msg); /* Decode the remote channel */ rchannel = SKMSG_CTL_MSG_GET_CHANNEL(msg); /* Create a local channel */ chan = create_channel(q); lchannel = chan->channel; /* Get the connection object */ conn = find_connection(q, socket); assert(conn != NULL); /* Attach the channel to the connection */ rv = set_channel_connecting(q, chan, conn); assert(rv == 0); /* Set the remote channel */ rv = set_channel_connected(q, chan, rchannel); assert(rv == 0); /* Respond to the announcement with the channel pair */ pair.rchannel = htons(rchannel); pair.lchannel = htons(lchannel); DEBUG_PRINT1("Sending SKMSG_CTL_CHANNEL_REPLY (Ext-control)"); rv = send_message(q, lchannel, SKMSG_CTL_CHANNEL_REPLY, &pair, sizeof(pair), SKM_SEND_CONTROL); XASSERT(rv == 0); /* Announce the new channel internally */ info.channel = pair.lchannel; if (conn->addr != NULL) { memcpy(&info.addr, conn->addr, sizeof(info.addr)); info.known = 1; } else { info.known = 0; } DEBUG_PRINT1("Sending SKMSG_CTL_NEW_CONNECTION (Internal)"); rv = send_message(q, SKMSG_CHANNEL_CONTROL, SKMSG_CTL_NEW_CONNECTION, &info, sizeof(info), SKM_SEND_INTERNAL); XASSERT(rv == 0); } break; case SKMSG_CTL_CHANNEL_REPLY: /* Handle the reply to a channel announcement */ { sk_channel_pair_t *pair; sk_msg_channel_queue_t *chan; skm_channel_t rchannel; skm_channel_t lchannel; DEBUG_PRINT1("Handling SKMSG_CTL_CHANNEL_REPLY"); assert(msg->hdr.size == sizeof(*pair)); assert(msg->msg); /* Decode the channels: Reversed directionality is on purpose. */ pair = (sk_channel_pair_t *)msg->msg; rchannel = ntohs(pair->lchannel); lchannel = ntohs(pair->rchannel); /* Get the channel object */ chan = find_channel(q, lchannel); XASSERT(chan != NULL); /* Set the remote channel */ rv = set_channel_connected(q, chan, rchannel); assert(rv == 0); chan->conn->state = SKM_CONNECTED; /* Complete the connection */ assert(chan->state != SKM_CONNECTING); assert(chan->is_pending); MUTEX_BROADCAST(&chan->pending); } break; case SKMSG_CTL_CHANNEL_KILL: /* Handle the death of a remote channel */ { skm_channel_t channel; sk_msg_channel_queue_t *chan; DEBUG_PRINT1("Handling SKMSG_CTL_CHANNEL_KILL"); assert(msg->hdr.size == sizeof(channel)); assert(msg->msg); /* Decode the channel. */ channel = SKMSG_CTL_MSG_GET_CHANNEL(msg); /* Get the channel object. */ chan = find_channel(q, channel); XASSERT(chan != NULL); /* Close the channel. */ set_channel_closed(q, chan, 0, 0); } break; default: assert(0); abort(); } skMsgDestroy(msg); RETURN(0); } static void *reader_thread(void *vqueue) { sk_msg_queue_t *q = (sk_msg_queue_t *)vqueue; DEBUG_ENTER_FUNC; DEBUG_PRINT1("STARTED reader_thread"); assert(q); MUTEX_LOCK(QUEUE_MUTEX(q)); q->root->reader_state = SKM_THREAD_RUNNING; MUTEX_BROADCAST(&q->root->reader_cond); while (q->root->reader_state == SKM_THREAD_RUNNING) { int rv; int_dict_iter_t *list; sk_msg_conn_queue_t *conn; fd_set rset; int max_sock = 0; intkey_t sock; void *cont; ASSERT_QUEUE_LOCK(q); /* Build the read set */ FD_ZERO(&rset); list = int_dict_open(q->root->connection); MEM_ASSERT(list); while (int_dict_next(list, &sock, &conn) != NULL) { assert(sock >= 0); if (conn->state != SKM_CLOSED) { FD_SET(sock, &rset); max_sock = sock; } } int_dict_close(list); if (q->root->listen_sock) { FD_SET(q->root->listen_sock, &rset); if (q->root->listen_sock > max_sock) { max_sock = q->root->listen_sock; } } FD_SET(q->root->reader_control[READ], &rset); if (q->root->reader_control[READ] > max_sock) { max_sock = q->root->reader_control[READ]; } /* Call select */ QUEUE_UNLOCK(q); rv = select(max_sock + 1, &rset, NULL, NULL, NULL); QUEUE_LOCK(q); if (rv == -1) { if (errno == EINTR || errno == EBADF) { continue; } } XASSERT(rv > 0); /* Deal with reading first */ cont = int_dict_get_first(q->root->connection, &conn); while (cont != NULL) { sock = conn->rsocket; assert(sock >= 0); if (conn->state == SKM_CLOSED) { cont = int_dict_get_next(q->root->connection, sock, &conn); continue; } if (FD_ISSET(sock, &rset)) { sk_msg_t *message; sk_msg_channel_queue_t *chan; /* Read a message */ DEBUG_PRINT1("Calling recv"); rv = conn->fn.recv(conn, &message); if (rv != 0) { /* Treat the connection as closed */ destroy_connection(q, conn, 0); cont = int_dict_get_next(q->root->connection, sock, &conn); continue; } /* Handle control messages */ if (message->hdr.channel == SKMSG_CHANNEL_CONTROL && message->hdr.type >= SKMSG_MINIMUM_SYSTEM_CTL_CHANNEL) { rv = handle_system_control_message(q, message, sock); cont = int_dict_get_next(q->root->connection, sock, &conn); continue; } /* Handle ordinary messages */ chan = find_channel(q, message->hdr.channel); if (chan == NULL) { skMsgDestroy(message); } else { /* Put the message on the queue */ DEBUG_PRINT3("Enqueue: chan=%#x type=%#x", message->hdr.channel, message->hdr.type); DEBUG_PRINT2("From reader: %p", message); rv = mqQueueAdd(chan->queue, message); XASSERT(rv == 0); } } cont = int_dict_get_next(q->root->connection, sock, &conn); } /* Okay, deal with accepting here. */ if (q->root->listen_sock && FD_ISSET(q->root->listen_sock, &rset)) { DEBUG_PRINT1("Calling accept_connection"); accept_connection(q); } /* Deal with shutdown and reconf */ if (FD_ISSET(q->root->reader_control[READ], &rset)) { uint8_t c; ssize_t size; size = read(q->root->reader_control[READ], &c, 1); XASSERT(size == 1); switch (c) { case SKCTL_RECONF: DEBUG_PRINT1("Handling reconf"); break; default: assert(0); abort(); } } } q->root->reader_state = SKM_THREAD_ENDED; THREAD_END(q); QUEUE_UNLOCK(q); DEBUG_PRINT1("STOPPED reader_thread"); RETURN(NULL); } static void *writer_thread(void *vconn) { int maxfd; sk_queue_and_conn_t *both = (sk_queue_and_conn_t *)vconn; sk_msg_conn_queue_t *conn = both->conn; sk_msg_queue_t *q = both->q; int again = 0; DEBUG_ENTER_FUNC; DEBUG_PRINT1("STARTED writer_thread"); assert(conn); assert(q); free(both); QUEUE_LOCK(q); conn->writer_state = SKM_THREAD_RUNNING; MUTEX_BROADCAST(&conn->writer_cond); maxfd = conn->wsocket; if (conn->writer_control[READ] > maxfd) { maxfd = conn->writer_control[READ]; } maxfd++; while (conn->writer_state == SKM_THREAD_RUNNING) { int rv; fd_set rset; fd_set wset; sk_msg_t *msg; skDQErr_t err; int block = (conn->state != SKM_CLOSED); FD_ZERO(&rset); FD_SET(conn->writer_control[READ], &rset); FD_ZERO(&wset); FD_SET(conn->wsocket, &wset); QUEUE_UNLOCK(q); if (!again) { if (block) { err = skDequePopBack(conn->queue, (void *)&msg); } else { err = skDequePopBackNB(conn->queue, (void *)&msg); } if (err != SKDQ_SUCCESS) { assert(err == SKDQ_UNBLOCKED || err == SKDQ_DESTROYED || err == SKDQ_EMPTY); QUEUE_LOCK(q); break; } if (msg->hdr.channel == SKMSG_CHANNEL_CONTROL && msg->hdr.type == SKMSG_WRITER_UNBLOCKER) { /* Do not destroy message, as this is a special static message. */ QUEUE_LOCK(q); continue; } again = 1; } rv = select(maxfd, &rset, &wset, NULL, NULL); if (rv == -1) { if (errno == EINTR || errno == EBADF) { QUEUE_LOCK(q); continue; } } XASSERT(rv > 0); QUEUE_LOCK(q); if (FD_ISSET(conn->wsocket, &wset)) { rv = conn->fn.send(conn, msg); XASSERT(rv == 0 || rv == SKMERR_CLOSED); again = 0; skMsgDestroy(msg); msg = NULL; } if (FD_ISSET(conn->writer_control[READ], &rset)) { uint8_t c; ssize_t size; size = read(conn->writer_control[READ], &c, 1); XASSERT(size == 1); switch (c) { case SKCTL_RECONF: DEBUG_PRINT1("Handling reconf"); if ((msg != NULL) && (conn->writer_state != SKM_THREAD_RUNNING)) { skMsgDestroy(msg); } break; default: assert(0); abort(); } } } conn->writer_state = SKM_THREAD_ENDED; THREAD_END(q); QUEUE_UNLOCK(q); DEBUG_PRINT1("STOPPED writer_thread"); RETURN(NULL); } int skMsgQueueCreate(sk_msg_queue_t **queue) { sk_msg_queue_t *q; int retval = 0; int fd[2]; int rv; sk_msg_conn_queue_t *conn; sk_msg_channel_queue_t *chan; DEBUG_ENTER_FUNC; q = calloc(1, sizeof(*q)); if (q == NULL) { RETURN(SKMERR_MEMORY); } q->root = calloc(1, sizeof(*q->root)); if (q->root == NULL) { free(q); RETURN(SKMERR_MEMORY); } THREAD_INFO_INIT(q); rv = pipe(q->root->reader_control); if (rv != 0) { retval = SKMERR_PIPE; goto error; } q->root->channel = int_dict_create(sizeof(sk_msg_channel_queue_t *)); if (q->root->channel == NULL) { retval = SKMERR_MEMORY; goto error; } q->root->connection = int_dict_create(sizeof(sk_msg_conn_queue_t *)); if (q->root->connection == NULL) { retval = SKMERR_MEMORY; goto error; } q->root->groups = int_dict_create(sizeof(sk_msg_queue_t *)); if (q->root->groups == NULL) { retval = SKMERR_MEMORY; goto error; } q->channel = int_dict_create(sizeof(sk_msg_channel_queue_t *)); if (q->channel == NULL) { retval = SKMERR_MEMORY; goto error; } rv = pthread_mutex_init(QUEUE_MUTEX(q), NULL); if (rv != 0) { retval = SKMERR_MUTEX; goto error; } rv = pthread_cond_init(&q->shutdowncond, NULL); if (rv != 0) { retval = SKMERR_MUTEX; goto error; } q->group = mqCreateFair(sk_destroy_report_message); if (q->group == NULL) { goto error; } rv = pipe(fd); if (rv == -1) { retval = SKMERR_PIPE; goto error; } /* Initialize the thread start state */ pthread_cond_init(&q->root->reader_cond, NULL); q->root->reader_state = SKM_THREAD_BEFORE; /* Lock the mutex to satisfy preconditions for mucking with connections and channels. */ QUEUE_LOCK(q); /* Start the reader thread, */ THREAD_START("skmsg_reader", rv, q, &q->root->reader, reader_thread, q); if (rv != 0) { retval = SKMERR_PTHREAD; goto error; } /* Wait for the thread to begin. */ while (q->root->reader_state == SKM_THREAD_BEFORE) { QUEUE_WAIT(&q->root->reader_cond, q); } assert(q->root->reader_state == SKM_THREAD_RUNNING); /* Create an internal connection for the control channel. */ rv = create_connection(q, fd[READ], fd[WRITE], NULL, &conn, SKM_TLS_NONE); XASSERT(rv == 0); /* Create a channel for the control channel. */ q->root->next_channel = SKMSG_CHANNEL_CONTROL; chan = create_channel(q); /* Attach the internal connection to the control channel. */ rv = set_channel_connecting(q, chan, conn); assert(rv == 0); /* And let's completely connect it. */ rv = set_channel_connected(q, chan, SKMSG_CHANNEL_CONTROL); conn->state = SKM_CONNECTED; /* Unlock the mutex */ QUEUE_UNLOCK(q); *queue = q; RETURN(0); error: XASSERT(0); skMsgQueueDestroy(q); RETURN(retval); } static void sk_msg_queue_shutdown( sk_msg_queue_t *q) { sk_msg_channel_queue_t *chan; void *cont; DEBUG_ENTER_FUNC; assert(q); ASSERT_QUEUE_LOCK(q); if (q->shuttingdown) { RETURN(); } q->shuttingdown = 1; /* Shut down a queue by shutting down all its channels. */ cont = int_dict_get_first(q->channel, &chan); while (cont != NULL) { intkey_t key = chan->channel; if (chan->state == SKM_CONNECTED || chan->state == SKM_CONNECTING) { set_channel_closed(q, chan, 0, 0); } cont = int_dict_get_next(q->channel, key, &chan); } /* And then shutting down the multiqueue */ mqShutdown(q->group); q->shuttingdown = 0; MUTEX_BROADCAST(&q->shutdowncond); RETURN(); } void skMsgQueueShutdown( sk_msg_queue_t *q) { DEBUG_ENTER_FUNC; assert(q); QUEUE_LOCK(q); sk_msg_queue_shutdown(q); QUEUE_UNLOCK(q); RETURN(); } void skMsgQueueShutdownAll( sk_msg_queue_t *q) { static uint8_t skctl_reconf = SKCTL_RECONF; sk_msg_channel_queue_t *chan; int rv; void *cont; DEBUG_ENTER_FUNC; assert(q); QUEUE_LOCK(q); if (q->root->shuttingdown) { QUEUE_UNLOCK(q); RETURN(); } q->root->shuttingdown = 1; q->root->shutdownqueue = q; q->root->reader_state = SKM_THREAD_SHUTTING_DOWN; /* Shut down all channels */ cont = int_dict_get_first(q->root->channel, &chan); while (cont != NULL) { intkey_t key = chan->channel; sk_msg_queue_shutdown(chan->group); cont = int_dict_get_next(q->root->channel, key, &chan); } if (q->root->listen_sock != 0) { close(q->root->listen_sock); q->root->listen_sock = 0; } retry: /* Shut down the reading thread. */ DEBUG_PRINT1("SIGNAL reader exit"); rv = write(q->root->reader_control[WRITE], &skctl_reconf, 1); if (rv == -1 && errno == EINTR) { goto retry; } assert(rv == 1); THREAD_WAIT_ALL_END(q); pthread_join(q->root->reader, NULL); close(q->root->reader_control[READ]); close(q->root->reader_control[WRITE]); q->root->shuttingdown = 0; MUTEX_BROADCAST(&q->shutdowncond); QUEUE_UNLOCK(q); RETURN(); } static void skMsgQueueDestroyAll( sk_msg_queue_t *q) { int rv; DEBUG_ENTER_FUNC; assert(q); ASSERT_QUEUE_LOCK(q); /* Verify that all channels have been destroyed */ assert(int_dict_get_first(q->root->channel, NULL) == NULL); int_dict_destroy(q->root->channel); assert(int_dict_get_first(q->root->connection, NULL) == NULL); int_dict_destroy(q->root->connection); int_dict_destroy(q->root->groups); QUEUE_UNLOCK(q); THREAD_INFO_DESTROY(q); rv = pthread_cond_destroy(&q->root->reader_cond); assert(rv == 0); rv = pthread_mutex_destroy(QUEUE_MUTEX(q)); assert(rv == 0); free(q->root); free(q); RETURN(); } void skMsgQueueDestroy( sk_msg_queue_t *q) { sk_msg_root_t *root; sk_msg_channel_queue_t *chan; void *cont; DEBUG_ENTER_FUNC; assert(q); QUEUE_LOCK(q); root = q->root; while (q->shuttingdown || (root->shuttingdown && root->shutdownqueue == q)) { QUEUE_WAIT(&q->shutdowncond, q); } /* Destroy the channels */ cont = int_dict_get_first(q->channel, &chan); while (cont != NULL) { intkey_t channel = chan->channel; destroy_channel(q, chan, 0); cont = int_dict_get_next(q->channel, channel, &chan); } mqShutdown(q->group); mqDestroy(q->group); int_dict_destroy(q->channel); if (int_dict_get_first(q->root->groups, NULL) == NULL) { skMsgQueueDestroyAll(q); RETURN(); } free(q); MUTEX_UNLOCK(&root->mutex); RETURN(); } static int skMsgQueueBind( sk_msg_queue_t *q, struct sockaddr_in *addr, int UNUSED_IF_NOTLS(tls)) { static uint8_t skctl_reconf = SKCTL_RECONF; static int on = 1; int sock; int rv; DEBUG_ENTER_FUNC; assert(q); XASSERT(q->root->listen_sock == 0); /* Bind a socket to the address*/ sock = socket(AF_INET, SOCK_STREAM, 0); XASSERT(sock != -1); rv = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)); XASSERT(rv != -1); rv = bind(sock, (struct sockaddr *)addr, sizeof(*addr)); if (rv != 0) { return -1; } rv = listen(sock, LISTENQ); XASSERT(rv != -1); QUEUE_LOCK(q); /* Set the listen sock for the queue. */ q->root->listen_sock = sock; q->root->bind_type = SOCK_STREAM; #ifdef HAVE_GNUTLS_GNUTLS_H q->root->bind_tls = tls; #endif memcpy(&q->root->bind_addr, addr, sizeof(*addr)); /* Announce it to the reader thread */ DEBUG_PRINT1("SIGNAL reader reconf"); rv = write(q->root->reader_control[WRITE], &skctl_reconf, 1); XASSERT(rv == 1); QUEUE_UNLOCK(q); RETURN(0); } /* Start a listener */ int skMsgQueueBindTCP( sk_msg_queue_t *queue, struct sockaddr_in *addr) { return skMsgQueueBind(queue, addr, 0); } #ifdef HAVE_GNUTLS_GNUTLS_H /* Start a listener */ int skMsgQueueBindTLS( sk_msg_queue_t *queue, struct sockaddr_in *addr) { return skMsgQueueBind(queue, addr, 1); } #endif /* HAVE_GNUTLS_GNUTLS_H */ static int skMsgQueueConnect( sk_msg_queue_t *q, struct sockaddr_in *addr, skm_channel_t *channel, int tls) { int rv; int sock; sk_msg_conn_queue_t *conn; sk_msg_channel_queue_t *chan; int retval; skm_channel_t lchannel; sk_sockaddr_t *copy; DEBUG_ENTER_FUNC; assert(q); sock = socket(AF_INET, SOCK_STREAM, 0); if (sock == -1) { RETURN(-1); } /* Connect to the remote side */ rv = connect(sock, (struct sockaddr *)addr, sizeof(*addr)); if (rv == -1) { close(sock); RETURN(-1); } /* Create a channel and connection, and bind them */ QUEUE_LOCK(q); copy = (sk_sockaddr_t *)malloc(sizeof(*copy)); if (copy != NULL) { memcpy(copy, addr, sizeof(*addr)); } rv = create_connection(q, sock, sock, copy, &conn, tls ? SKM_TLS_CLIENT : SKM_TLS_NONE); if (rv == -1) { close(sock); free(copy); QUEUE_UNLOCK(q); RETURN(-1); } chan = create_channel(q); rv = set_channel_connecting(q, chan, conn); XASSERT(rv == 0); /* Announce the channel id to the remote queue */ lchannel = htons(chan->channel); DEBUG_PRINT1("Sending SKMSG_CTL_CHANNEL_ANNOUNCE (Ext-control))"); rv = send_message(q, chan->channel, SKMSG_CTL_CHANNEL_ANNOUNCE, &lchannel, sizeof(lchannel), SKM_SEND_CONTROL); XASSERT(rv == 0); /* Wait for a reply */ chan->is_pending = 1; while (chan->is_pending && chan->state == SKM_CONNECTING) { QUEUE_WAIT(&chan->pending, q); } chan->is_pending = 0; if (chan->state == SKM_CLOSED) { destroy_channel(q, chan, 0); retval = -1; } else { retval = 0; *channel = chan->channel; } QUEUE_UNLOCK(q); RETURN(retval); } int skMsgQueueConnectTCP( sk_msg_queue_t *q, struct sockaddr_in *addr, skm_channel_t *channel) { return skMsgQueueConnect(q, addr, channel, 0); } #ifdef HAVE_GNUTLS_GNUTLS_H int skMsgQueueConnectTLS( sk_msg_queue_t *q, struct sockaddr_in *addr, skm_channel_t *channel) { return skMsgQueueConnect(q, addr, channel, 1); } #endif /* HAVE_GNUTLS_GNUTLS_H */ int skMsgChannelNew( sk_msg_queue_t *q, skm_channel_t channel, skm_channel_t *new_channel) { sk_msg_channel_queue_t *chan; sk_msg_channel_queue_t *newchan; skm_channel_t lchannel; int retval; int rv; DEBUG_ENTER_FUNC; assert(q); assert(new_channel); QUEUE_LOCK(q); chan = find_channel(q, channel); XASSERT(chan != NULL); XASSERT(chan->state == SKM_CONNECTED); assert(chan->conn != NULL); /* Create a channel and connection, and bind it to the connection */ newchan = create_channel(q); rv = set_channel_connecting(q, newchan, chan->conn); assert(rv == 0); lchannel = htons(newchan->channel); /* Announce the channel id to the remote queue */ DEBUG_PRINT1("Sending SKMSG_CTL_CHANNEL_ANNOUNCE (Ext-control))"); rv = send_message(q, newchan->channel, SKMSG_CTL_CHANNEL_ANNOUNCE, &lchannel, sizeof(lchannel), SKM_SEND_CONTROL); XASSERT(rv == 0); /* Wait for a response */ newchan->is_pending = 1; while (newchan->is_pending && newchan->state == SKM_CONNECTING) { QUEUE_WAIT(&newchan->pending, q); } newchan->is_pending = 0; if (newchan->state == SKM_CLOSED) { retval = -1; destroy_channel(q, newchan, 0); } else { retval = 0; *new_channel = newchan->channel; } QUEUE_UNLOCK(q); RETURN(retval); } int skMsgChannelSplit( sk_msg_queue_t *q, skm_channel_t channel, sk_msg_queue_t **new_queue) { sk_msg_queue_t *new_q; int rv; DEBUG_ENTER_FUNC; assert(q); new_q = (sk_msg_queue_t *)calloc(1, sizeof(*new_q)); if (new_q == NULL) { return -1; } rv = pthread_cond_init(&new_q->shutdowncond, NULL); if (rv != 0) { free(new_q); return -1; } new_q->channel = int_dict_create(sizeof(sk_msg_channel_queue_t *)); if (new_q->channel == NULL) { free(new_q); return -1; } new_q->group = mqCreateFair(sk_destroy_report_message); if (new_q->group == NULL) { int_dict_destroy(new_q->channel); free(new_q); return -1; } new_q->root = q->root; rv = skMsgChannelMove(channel, new_q); if (rv != 0) { skMsgQueueDestroy(new_q); } else { *new_queue = new_q; } return rv; } int skMsgChannelMove( skm_channel_t channel, sk_msg_queue_t *q) { sk_msg_channel_queue_t *chan; int rv; int retval = 0; DEBUG_ENTER_FUNC; assert(q); QUEUE_LOCK(q); chan = find_channel(q, channel); if (chan == NULL) { retval = -1; goto end; } rv = mqQueueMove(q->group, chan->queue); assert(rv == 0); rv = int_dict_del(chan->group->channel, channel); assert(rv == 0); rv = int_dict_set(q->channel, channel, &chan); assert(rv == 0); rv = int_dict_set(q->root->groups, channel, &q); assert(rv == 0); chan->group = q; end: QUEUE_UNLOCK(q); RETURN(retval); } int skMsgChannelKill( sk_msg_queue_t *q, skm_channel_t channel) { sk_msg_channel_queue_t *chan; DEBUG_ENTER_FUNC; assert(q); QUEUE_LOCK(q); chan = find_channel(q, channel); XASSERT(chan != NULL); destroy_channel(q, chan, 1); QUEUE_UNLOCK(q); RETURN(0); } static int send_message( sk_msg_queue_t *q, skm_channel_t lchannel, skm_type_t type, void *message, skm_len_t length, sk_send_type_t send_type) { sk_msg_channel_queue_t *chan; sk_msg_t *msg; skDQErr_t err; int rv; DEBUG_ENTER_FUNC; assert(q); assert(message || length == 0); ASSERT_QUEUE_LOCK(q); if (int_dict_get(q->root->channel, lchannel, &chan) == NULL) { RETURN(-1); } if (chan->state == SKM_CLOSED && send_type != SKM_SEND_INTERNAL) { RETURN(0); } msg = malloc(sizeof(*msg)); if (msg == NULL) { RETURN(-1); } msg->msg = malloc(length); if (msg->msg == NULL) { free(msg); RETURN(-1); } memcpy(msg->msg, message, length); msg->hdr.type = type; msg->hdr.size = length; switch (send_type) { case SKM_SEND_INTERNAL: msg->hdr.channel = chan->channel; DEBUG_PRINT3("Enqueue: chan=%#x type=%#x", msg->hdr.channel, msg->hdr.type); rv = mqQueueAdd(chan->queue, msg); if (rv != 0) { free(msg); RETURN(-1); } break; case SKM_SEND_REMOTE: msg->hdr.channel = chan->rchannel; err = skDequePushFront(chan->conn->queue, msg); if (err != SKDQ_SUCCESS) { free(msg); RETURN(-1); } break; case SKM_SEND_CONTROL: msg->hdr.channel = SKMSG_CHANNEL_CONTROL; err = skDequePushFront(chan->conn->queue, msg); if (err != SKDQ_SUCCESS) { free(msg); RETURN(-1); } break; default: assert(0); abort(); } RETURN(0); } int skMsgQueueSendMessage( sk_msg_queue_t *q, skm_channel_t channel, skm_type_t type, void *message, skm_len_t length) { int rv; DEBUG_ENTER_FUNC; QUEUE_LOCK(q); rv = send_message(q, channel, type, message, length, SKM_SEND_REMOTE); QUEUE_UNLOCK(q); RETURN(rv); } int skMsgQueueInjectMessage( sk_msg_queue_t *q, skm_channel_t channel, skm_type_t type, void *message, skm_len_t length) { int rv; DEBUG_ENTER_FUNC; QUEUE_LOCK(q); rv = send_message(q, channel, type, message, length, SKM_SEND_INTERNAL); QUEUE_UNLOCK(q); RETURN(rv); } int skMsgQueueGetMessage( sk_msg_queue_t *q, sk_msg_t **message) { sk_msg_t *msg; sk_msg_channel_queue_t *chan; int rv; DEBUG_ENTER_FUNC; assert(q); assert(message); do { rv = mqGet(q->group, (void **)&msg); if (rv != 0) { RETURN(-1); } DEBUG_PRINT2("From GetMessage: %p", msg); DEBUG_PRINT4("Deque: chan=%#x type=%#x size=%d", msg->hdr.channel, msg->hdr.type, msg->hdr.size); QUEUE_LOCK(q); chan = find_channel(q, msg->hdr.channel); QUEUE_UNLOCK(q); } while (chan == NULL); *message = msg; RETURN(0); } int skMsgQueueGetMessageFromChannel( sk_msg_queue_t *q, skm_channel_t channel, sk_msg_t **message) { sk_msg_t *msg; sk_msg_channel_queue_t *chan; int rv; DEBUG_ENTER_FUNC; assert(q); assert(message); QUEUE_LOCK(q); chan = find_channel(q, channel); QUEUE_UNLOCK(q); if (chan == NULL) { RETURN(-1); } rv = mqQueueGet(chan->queue, (void **)&msg); if (rv != 0) { RETURN(-1); } DEBUG_PRINT4("Deque: chan=%#x type=%#x size=%d", msg->hdr.channel, msg->hdr.type, msg->hdr.size); assert(msg->hdr.channel == channel); QUEUE_LOCK(q); chan = find_channel(q, msg->hdr.channel); QUEUE_UNLOCK(q); if (chan == NULL) { RETURN(-1); } *message = msg; RETURN(0); } int skMsgGetRemoteChannelID( sk_msg_queue_t *q, skm_channel_t lchannel, skm_channel_t *rchannel) { sk_msg_channel_queue_t *chan; int retval; DEBUG_ENTER_FUNC; assert(q); QUEUE_LOCK(q); chan = find_channel(q, lchannel); if (chan == NULL) { retval = -1; } else { *rchannel = chan->rchannel; retval = 0; } QUEUE_UNLOCK(q); return retval; } void skMsgDestroy( sk_msg_t *msg) { DEBUG_ENTER_FUNC; assert(msg); if (msg->msg) { free(msg->msg); } free(msg); RETURN(); } skm_channel_t skMsgChannel( sk_msg_t *msg) { DEBUG_ENTER_FUNC; assert(msg); RETURN(msg->hdr.channel); } skm_type_t skMsgType( sk_msg_t *msg) { DEBUG_ENTER_FUNC; assert(msg); RETURN(msg->hdr.type); } skm_len_t skMsgLength( sk_msg_t *msg) { DEBUG_ENTER_FUNC; assert(msg); RETURN(msg->hdr.size); } void *skMsgMessage( sk_msg_t *msg) { DEBUG_ENTER_FUNC; assert(msg); RETURN(msg->msg); } /* ** Local Variables: ** mode:c ** indent-tabs-mode:nil ** c-basic-offset:4 ** End: */