/*
 * The Spread Toolkit.
 *     
 * The contents of this file are subject to the Spread Open-Source
 * License, Version 1.0 (the ``License''); you may not use
 * this file except in compliance with the License.  You may obtain a
 * copy of the License at:
 *
 * http://www.spread.org/license/
 *
 * or in the file ``license.txt'' found in this distribution.
 *
 * Software distributed under the License is distributed on an AS IS basis, 
 * WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License 
 * for the specific language governing rights and limitations under the 
 * License.
 *
 * The Creators of Spread are:
 *  Yair Amir, Michal Miskin-Amir, Jonathan Stanton.
 *
 *  Copyright (C) 1993-2004 Spread Concepts LLC <spread@spreadconcepts.com>
 *
 *  All Rights Reserved.
 *
 * Major Contributor(s):
 * ---------------
 *    Cristina Nita-Rotaru crisn@cs.purdue.edu - group communication security.
 *    Theo Schlossnagle    jesus@omniti.com - Perl, skiplists, autoconf.
 *    Dan Schoenblum       dansch@cnds.jhu.edu - Java interface.
 *    John Schultz         jschultz@cnds.jhu.edu - contribution to process group membership.
 *
 */


#include "arch.h"
#include <string.h>
#include <stdio.h>
#include "mutex.h"

#ifndef	ARCH_PC_WIN95

#include <errno.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h> 
#include <netinet/tcp.h> 
#include <sys/un.h>
#include <netdb.h>
#include <signal.h>
#include <sys/ioctl.h>
#include <unistd.h>

#else	/* ARCH_PC_WIN95 */

#include <winsock.h>
#define ioctl   ioctlsocket

	WSADATA		WSAData;

#endif	/* ARCH_PC_WIN95 */

#include "sp_events.h"
#include "spread_params.h"
#include "sess_types.h"
#include "scatter.h"
#include "alarm.h"
#include "sp_func.h"
#include "acm.h"

typedef	struct	dummy_sp_session {
	mailbox	mbox;
	char	private_group_name[MAX_GROUP_NAME];
        message_header  recv_saved_head;
        int     recv_message_saved;
} sp_session;

struct auth_method_info {
        char    name[MAX_AUTH_NAME];
        int     (*authenticate) (int, void *);
        void    *auth_data;
};

static  int     sp_null_authenticate(int, void *);
static  struct auth_method_info Auth_Methods[MAX_AUTH_METHODS] = { {"NULL", sp_null_authenticate, NULL} };
static  int     Num_Reg_Auth_Methods = 1;

#define	MAX_MUTEX	256
#define	MAX_MUTEX_MASK	0x000000ff

#ifdef _REENTRANT
#ifndef	ARCH_PC_WIN95
static	mutex_type	Init_mutex = MUTEX_STATIC_INIT;
#else	/* ARCH_PC_WIN95 */
static	mutex_type	Init_mutex = {MUTEX_STATIC_INIT};
#endif	/* ARCH_PC_WIN95 */

static	mutex_type	Struct_mutex;
static	mutex_type	Mbox_mutex[MAX_MUTEX][2];

#endif /* def _REENTRANT */

static	int		Num_sessions = 0;
static	sp_session	Sessions[MAX_SESSIONS];

static  sp_time         Zero_timeout = { 0, 0 };

static	void    Flip_mess( message_header *head_ptr );
static  void	SP_kill( mailbox mbox );
static	int	SP_get_session( mailbox mbox );
static	int	SP_internal_multicast( mailbox mbox, service service_type, 
				       int num_groups,
				       const char groups[][MAX_GROUP_NAME],
				       int16 mess_type,
				       const scatter *scat_mess );

/* This is a null authenticate method that does nothing */

static  int     sp_null_authenticate(int fd, void * auth_data)
{
        /* return success */
        return(1);
}
static  int     valid_auth_method(char *auth_method, char *auth_list, int auth_list_len)
{
        char *cur_p, *next_p;
        char list_str[MAX_AUTH_NAME * MAX_AUTH_METHODS];
        memcpy(list_str, auth_list, auth_list_len);

        list_str[auth_list_len] = '\0';
        cur_p = list_str;
        do {
                next_p = strchr(cur_p, ' ');
                if (next_p != NULL)
                        *next_p = '\0';
                if (!strcmp(auth_method, cur_p) )
                        return(1);
                if (next_p != NULL)
                        cur_p = next_p + 1;
                else
                        cur_p = NULL;
        } while (NULL != cur_p );
        /* didn't find the method in the list */
        return(0);
}
static  void    sp_initialize_locks(void)
{
        int ret, i;

	ret = Mutex_trylock( &Init_mutex );
	if( ret == 0 )
	{
		/* 
		 * we managed to lock the Init_mutex. This means we are the first thread
		 * to get here.
		 */

		Mutex_init( &Struct_mutex );
		for( i=0; i < MAX_MUTEX; i++ )
		{
			Mutex_init( &Mbox_mutex[i][0] );
			Mutex_init( &Mbox_mutex[i][1] );
		}
#ifdef ARCH_PC_WIN95

		ret = WSAStartup( MAKEWORD(2,0), &WSAData );
		if( ret != 0 )
			Alarm( EXIT, "sp_initialize_locks: winsock initialization error %d\n", ret );
#endif	/* ARCH_PC_WIN95 */
	}
        return;
}
/* This calls recv() with the additional features of ignoring syscall interruptions
 * caused by signals delivered to this application, and of returning in at most *time_out time.
 * When it returns the *time_out variable will be modified to have contain the value:
 * old *time_out - time spent in this function.
 *
 * If *time_out == {0,0} then the call is made blocking and will NOT timeout.
 */
static  int     recv_nointr_timeout(int s, void *buf, size_t len, int flags, sp_time *time_out)
{
        int ret, num_ready;
        fd_set rset,fixed_rset;
        sp_time start_time, temp_time, target_time, wait_time;

        if ( len == 0 )
                return(0);
        if ( E_compare_time(Zero_timeout, *time_out) < 0 )
        {
                start_time = E_get_time();
                target_time = E_add_time(start_time, *time_out);
                wait_time = *time_out;
                FD_ZERO(&fixed_rset);
                FD_SET(s, &fixed_rset);
                rset = fixed_rset;
                while( ((num_ready = select(s+1, &rset, NULL, NULL, (struct timeval *)&wait_time)) == -1) && ((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK)) )
                {
                        temp_time = E_get_time();
                        if (E_compare_time(temp_time, target_time) < 0 ) {
                                wait_time = E_sub_time(target_time, temp_time);
                        } else {
                                printf("recv_nointr_timeout: Timed out when interrupted\n");
                                sock_set_errno( ERR_TIMEDOUT );
                                return(-1);
                        }
                        rset = fixed_rset;
                }
                if ( !num_ready ) {
                        printf("recv_nointr_timeout: Timed out\n");
                        sock_set_errno( ERR_TIMEDOUT );
                        return(-1);
                } 
        }
        while( ((ret = recv( s, buf, len, flags)) == -1) && ((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK)) )
                ;
        if ( E_compare_time(Zero_timeout, *time_out) < 0 )
        {
                temp_time = E_sub_time(E_get_time(), start_time);
                *time_out = E_sub_time(*time_out, temp_time);
        }
        return(ret);
}

/* This calls connect() with the additional features of ignoring syscall interruptions
 * caused by signals delivered to this application, and of returning in at most *time_out time.
 * When it returns the *time_out variable will be modified to have contain the value:
 * old *time_out - time spent in this function.
 *
 * If *time_out == {0,0} then the call is made blocking and will NOT timeout.
 */
static  int     connect_nointr_timeout(int s, struct sockaddr *sname, socklen_t slen, sp_time *time_out)
{
    int         ret, num_ready;
    fd_set      rset,fixed_rset,wset;
    sp_time     start_time, temp_time, target_time, wait_time;
    int         non_blocking = 0;
    int         err;
    int         on;
    int         ret_ioctl;
    sockopt_len_t   elen;

    if ( E_compare_time(Zero_timeout, *time_out) < 0 )
    {
        non_blocking = 1;
        start_time = E_get_time();
        target_time = E_add_time(start_time, *time_out);
        wait_time = *time_out;
        /* set file descriptor to non-blocking */
        on = 1;
        ret_ioctl = ioctl( s, FIONBIO, &on);
    }
    /* Handle EINTR while connecting by waiting with select until the 
     * connect completes or fails.  This is a while loop but it is never 
     * done more then once. The while is so we can use 'break' 
     */
    while( ((ret = connect( s, sname, slen ) ) == -1) 
           && ((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK) || (sock_errno == EINPROGRESS)) )
    {
        FD_ZERO(&fixed_rset);
        FD_SET(s, &fixed_rset);
        rset = fixed_rset;
        wset = rset;
        Alarmp( SPLOG_DEBUG, SESSION, "connect_nointr_timeout: connect in progress for socket %d, now wait in select\n", s);
        /* wait for select to timeout (num_ready == 0), give a permanent error (num_ready < 0 && sock_errno != transient). If transient error, retry after checking to make sure timeout has not expired */
        while( ((num_ready = select(s+1, &rset, &wset, NULL, (struct timeval *)&wait_time)) == -1) && ((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK)) )
        {
            temp_time = E_get_time();
            if (E_compare_time(temp_time, target_time) < 0 ) {
                wait_time = E_sub_time(target_time, temp_time);
            } else {
                Alarmp( SPLOG_WARNING, SESSION, "connect_nointr_timeout: connect interrupted and select wait timesout during transient error: %s\n", sock_strerror(sock_errno));
                close(s);
                sock_set_errno( ERR_TIMEDOUT );
                ret = -1;
                goto done_connect_try;
            }
            rset = fixed_rset;
            wset = rset;
        }
        if ( num_ready == 0 ) {
            /* timeout */
            close(s);
            sock_set_errno( ERR_TIMEDOUT );
            ret = -1;
            break;
        } else if ( num_ready < 0 ) 
        {
            Alarmp( SPLOG_WARNING, SESSION, "connect_nointr_timeout: connect interrupted and error in select wait: %s\n", sock_strerror(sock_errno));
            ret = -1;
            break;
        } 
        if (FD_ISSET(s, &rset) || FD_ISSET( s, &wset))
        {
            err = 0;
            elen = sizeof(err);
            if (getsockopt(s, SOL_SOCKET, SO_ERROR, (void *)&err, &elen) < 0)
            {
                ret = -1;
                break;
            }
            if (err)
            {
                sock_set_errno( err );
                ret = -1;
            } else {
                ret = 0;
            }
            break;
        } else {
            Alarmp( SPLOG_FATAL, SESSION, "connect_nointr_timeout: connect interrupted--but select does not indicate either error or connecting socket ready. Impossible condition (i.e. bug).  ret= %d: %s\n", err, sock_strerror(sock_errno));
            ret = -1;
            break;
        }
    } /* while error case for connect */
    Alarmp( SPLOG_DEBUG, SESSION, "connect_nointr_timeout: After connect, ret = %d error is:%s\n", ret, sock_strerror(sock_errno));

done_connect_try:
    if ( non_blocking )
    {
        /* set file descriptor to blocking */
        on = 0;
        ret_ioctl = ioctl( s, FIONBIO, &on);
        temp_time = E_sub_time(E_get_time(), start_time);
        *time_out = E_sub_time(*time_out, temp_time);
    }
    return(ret);
}

/* Increase socket buffer size to 200Kb if possible.
 * Used in SP_connect family when connection is established.
 */
static void set_large_socket_buffers(int s)
{
    int i, on, ret;
    sockopt_len_t onlen;

    for( i=10; i <= 200; i+=5 )
    {
        on = 1024*i;

        ret = setsockopt( s, SOL_SOCKET, SO_SNDBUF, (void *)&on, 4);
        if (ret < 0 ) break;
        
        ret = setsockopt( s, SOL_SOCKET, SO_RCVBUF, (void *)&on, 4);
        if (ret < 0 ) break;
	
        onlen = sizeof(on);
        ret= getsockopt( s, SOL_SOCKET, SO_SNDBUF, (void *)&on, &onlen );
        if( on < i*1024 ) break;
        Alarmp( SPLOG_INFO, SESSION, "set_large_socket_buffers: set sndbuf %d, ret is %d\n", on, ret );
        
        onlen = sizeof(on);
        ret= getsockopt( s, SOL_SOCKET, SO_RCVBUF, (void *)&on, &onlen );
        if( on < i*1024 ) break;
        Alarmp( SPLOG_INFO, SESSION, "set_large_socket_buffers: set rcvbuf %d, ret is %d\n", on, ret );
    }
    Alarmp( SPLOG_INFO, SESSION, "set_large_socket_buffers: set sndbuf/rcvbuf to %d\n", 1024*(i-5) );
}

/* API break 3.15.0. version is no longer a float. return 0 on error, 1 if set version number */
int	SP_version(int *major_version, int *minor_version, int *patch_version)
{
        if ( (major_version == NULL ) || 
             (minor_version == NULL ) ||
             (patch_version == NULL ) )
                return( 0 );

        *major_version = SP_MAJOR_VERSION;
        *minor_version = SP_MINOR_VERSION;
        *patch_version = SP_PATCH_VERSION;
        return( 1 );
}
/* Addition to API 3.16.0
 * Returns 0 on error, 1 if successful 
 * Registers a single authentication handler.
 */
int     SP_set_auth_method( const char *auth_name, int (*auth_function) (int, void *), void * auth_data)
{
        sp_initialize_locks();

        if (strlen(auth_name) >= MAX_AUTH_NAME)
        {
            Alarm( SESSION, "SP_set_auth_method: Name of auth method too long\n");
            return(0);
        }
        if ( NULL == auth_function )
        {
            Alarm( SESSION, "SP_set_auth_method: auth method is NULL\n");
            return(0);
        }
        Mutex_lock( &Struct_mutex );

        strncpy(Auth_Methods[0].name, auth_name, MAX_AUTH_NAME);
        Auth_Methods[0].authenticate = auth_function;
        Auth_Methods[0].auth_data = auth_data;
        Num_Reg_Auth_Methods = 1;

        Mutex_unlock( &Struct_mutex );
        return(1);
}
/* Addition to API 3.16.0
 * Returns 0 on error, 1 if successful
 * Registers the set of authentication handlers.
 */
int     SP_set_auth_methods( int num_methods, const char *auth_name[], int (*auth_function[]) (int, void *), void * auth_data[])
{
        int i;
        sp_initialize_locks();

        if (num_methods < 0 || num_methods > MAX_AUTH_METHODS)
        {
            Alarm( SESSION, "SP_set_auth_methods: Too many methods trying to be registered\n");
            return(0);
        }

        /* check validity of handlers */
        for (i=0; i< num_methods; i++)
        {
            if (strlen(auth_name[i]) >= MAX_AUTH_NAME)
            {
                Alarm( SESSION, "SP_set_auth_method: Name of auth method too long\n");
                return(0);
            }
            if ( NULL == auth_function[i] )
            {
                Alarm( SESSION, "SP_set_auth_method: auth method is NULL\n");
                return(0);
            }                
        }

        /* insert set of handlers as atomic action */
        Mutex_lock( &Struct_mutex );
        for (i=0; i< num_methods; i++)
        {

            strncpy(Auth_Methods[i].name, auth_name[i], MAX_AUTH_NAME);
            Auth_Methods[i].authenticate = auth_function[i];
            Auth_Methods[i].auth_data = auth_data[i];
        }
        Num_Reg_Auth_Methods = num_methods;
        Mutex_unlock( &Struct_mutex );
        return(1);
}

int	SP_connect( const char *spread_name, const char *private_name,
		    int priority, int group_membership, mailbox *mbox,
		    char *private_group )
{
        int ret;

        ret = SP_connect_timeout( spread_name, private_name, priority, group_membership, mbox, private_group, Zero_timeout);
        return(ret);
}

int	SP_connect_timeout( const char *spread_name, const char *private_name,
		    int priority, int group_membership, mailbox *mbox,
		    char *private_group, sp_time time_out )
{
	/* struct	hostent		*host_ptr, *gethostbyname(); */
	struct	hostent		*host_ptr;
	int16			port;
	int32			host_address;
	int32			lport, i1, i2, i3, i4;
	char			*c_ptr;
	char			host_name[80];
	char			s_name[80];
	char			dummy_s[80];
	char			conn[MAX_PRIVATE_NAME+5];
        signed char                    auth_list_len;
        char                    auth_list[MAX_AUTH_NAME * MAX_AUTH_METHODS];
        char                    auth_choice[MAX_AUTH_NAME * MAX_AUTH_METHODS];
        bool                    failed;
        int                     num_auth_methods;
        struct auth_method_info auth_methods[MAX_AUTH_METHODS];
	int			p;
	int			s;
	int			ret, i;
        unsigned int            len;
	int			sp_v1, sp_v2, sp_v3;
	char			l;
	int32			on;

	struct	sockaddr_in	inet_addr;

#ifndef	ARCH_PC_WIN95

	struct	sockaddr_un	unix_addr;

#endif	/* ARCH_PC_WIN95 */

#ifndef	ARCH_PC_WIN95

	signal( SIGPIPE, SIG_IGN );

#endif	/* ARCH_PC_WIN95 */

#ifdef ENABLEDEBUG
        Alarm_set_types(SESSION | DEBUG);
        Alarm_set_priority(SPLOG_DEBUG);
#endif

        sp_initialize_locks();

	/* 
	 * There are 3 possibilities for a name:
	 * 	3333
	 *	3333@commedia.cs.jhu.edu or 3333@fault
	 *	3333@128.220.221.1
	 */

	if( spread_name == 0 || (!strcmp( spread_name, "" ) ) ) 
#ifndef ARCH_PC_WIN95
		strcpy( s_name, "4803" );
#else
		strcpy( s_name, "4803@localhost" );
#endif	/* ARCH_PC_WIN95 */

	else strcpy( s_name, spread_name );
	c_ptr = strchr( s_name, ' ');
	if( c_ptr != 0 ) return ( ILLEGAL_SPREAD );
	c_ptr = strchr( s_name, '@');

	if( c_ptr == 0 )
	{

#ifndef	ARCH_PC_WIN95
		p = AF_UNIX;
		ret = sscanf( s_name, "%d%s", &lport, dummy_s );
		if( ret != 1 ) return( ILLEGAL_SPREAD );
#else	/* ARCH_PC_WIN95 */
		return( ILLEGAL_SPREAD );
#endif	/* ARCH_PC_WIN95 */

	}else{
		p = AF_INET;
		*c_ptr = ' ';
		ret = sscanf( s_name,"%d%s", &lport, host_name );
		if( ret != 2) return( ILLEGAL_SPREAD );
		host_ptr = gethostbyname( host_name );
		if( host_ptr != 0 )
		{
			/* option 3333@commedia.cs.jhu.edu */
			memcpy( &host_address, host_ptr->h_addr, sizeof(int32) );
		}else{
			/* option 3333@128.220.221.1 */
			for(i=0; i< 3; i++)
			{
				c_ptr = strchr(host_name, '.' );
				if ( c_ptr == 0) return( ILLEGAL_SPREAD );
				*c_ptr = ' ';
			}
			ret = sscanf( host_name, "%d%d%d%d%s", &i1, &i2, &i3, &i4, dummy_s );
			if( ret != 4 ) return( ILLEGAL_SPREAD );
			host_address = ( (i1 << 24 ) | (i2 << 16 ) | (i3 << 8) | i4 );
		}
	}
	if( lport < 0 || lport >= 32*1024 ) return( ILLEGAL_SPREAD );
	port = lport;
	if( p == AF_INET )
	{
		s = socket( AF_INET, SOCK_STREAM, 0 );
		if( s < 0 )
		{
			Alarm( DEBUG, "SP_connect: unable to create mailbox %d\n", s );
			return( COULD_NOT_CONNECT );
		}

                set_large_socket_buffers(s);

                on = 1;
                ret = setsockopt( s, IPPROTO_TCP, TCP_NODELAY, (void *)&on, 4);
                if (ret < 0) 
                        Alarm(PRINT, "Setting TCP_NODELAY failed with  error: %s\n", sock_strerror(sock_errno));
                else 
                        Alarm( SESSION, "SP_connect: set TCP_NODELAY for socket %d\n", s);

		inet_addr.sin_family = AF_INET;
		inet_addr.sin_port   = htons( port );
	        memcpy( &inet_addr.sin_addr, &host_address, sizeof(int32) );
		ret = connect_nointr_timeout( s, (struct sockaddr *)&inet_addr, sizeof(inet_addr), &time_out);
	}else{

#ifndef	ARCH_PC_WIN95
		s = socket( AF_UNIX, SOCK_STREAM, 0 );
		if( s < 0 )
		{
			Alarm( DEBUG, "SP_connect: unable to create mailbox %d\n", s );
			return( COULD_NOT_CONNECT );
		}

                set_large_socket_buffers(s);

		unix_addr.sun_family = AF_UNIX;
		sprintf( unix_addr.sun_path, "%s/spread.sock", _PATH_SPREAD_PIDDIR );
		ret = connect_nointr_timeout( s, (struct sockaddr *)&unix_addr, sizeof(unix_addr), &time_out);
#endif	/* !ARCH_PC_WIN95 */
	}

	if( ret < 0 )
	{
		Alarm( SESSION, "SP_connect: unable to connect mailbox %d: %s\n", s, sock_strerror(sock_errno));
		close( s );
		return( COULD_NOT_CONNECT );
	}
	/* 
	 * connect message looks like:
	 *
	 * byte - version of lib
	 * byte - subversion of lib
         * byte - patch version of lib
	 * byte - lower half byte 1/0 with or without groups, upper half byte: priority (0/1).
	 * byte - len of name
	 * len bytes - name
	 *
	 */
	conn[0] = SP_MAJOR_VERSION;
	conn[1] = SP_MINOR_VERSION;
	conn[2] = SP_PATCH_VERSION;

	if( group_membership ) conn[3] = 1; 
	else conn[3] = 0;
	if(priority < 0) priority = 0;
	if(priority > 1) priority = 1;
	conn[3] = conn[3]+16*priority;

        if (private_name == NULL)
        {
                len = 0;
        } else {
                len = strlen(private_name);
                if( len > MAX_PRIVATE_NAME ) len = MAX_PRIVATE_NAME;
                memcpy( &conn[5], private_name, len );
        }
	conn[4] = len;
	while(((ret = send( s, conn, len+5, 0 )) == -1) && ((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK)) )
                ;
	if( ret != len+5 )
	{
		Alarm( SESSION, "SP_connect: unable to send name %d %d: %s\n", ret, len+5, sock_strerror(sock_errno));
		close( s );
		return( CONNECTION_CLOSED );
	}
        /* Insert Access control and authentication checks here */
        auth_list_len = 0;
        ret = recv_nointr_timeout( s, &auth_list_len, 1, 0, &time_out); 
        if ( ret <= 0 )
        {
		Alarm( SESSION, "SP_connect: unable to read auth_list_len %d: %s\n", ret, sock_strerror(sock_errno));
		close( s );
		return( CONNECTION_CLOSED );
        }
        if ( auth_list_len > (MAX_AUTH_NAME * MAX_AUTH_METHODS) )
        {
		Alarm( SESSION, "SP_connect: illegal value in auth_list_len %d: %s\n", auth_list_len, sock_strerror(sock_errno));
		close( s );
		return( CONNECTION_CLOSED );
        } 
        if ( auth_list_len < 0 ) 
        {
            Alarm( SESSION, "SP_connect: connection invalid with code %d while reading auth_list_len\n", auth_list_len);
            close( s );
            return( auth_list_len );
        }
        if ( auth_list_len != 0 )
        {
                ret = recv_nointr_timeout( s, auth_list, auth_list_len, 0, &time_out);
                if ( ret <= 0 )
                {
                        Alarm( SESSION, "SP_connect: unable to read auth_list %d: %s\n", ret, sock_strerror(sock_errno));
                        close( s );
                        return( CONNECTION_CLOSED );
                }

                Alarm( SESSION, "SP_connect: DEBUG: Auth list is: %s\n", auth_list);
        } else {
                Alarm( SESSION, "SP_connect: DEBUG: Auth list is empty\n");
        }

        /* Here is where we check the list of available methods of authentication and pick one. 
         * For right now we just ignore the list and use the method the app set in SP_set_auth_method.
         * If no method was set we use the NULL method.
         * The global Auth_Methods struct needs to be protected by the Struct_mutex.
         */
        memset(auth_choice, 0, MAX_AUTH_NAME * MAX_AUTH_METHODS);
        Mutex_lock( &Struct_mutex );
        for (i=0; i< Num_Reg_Auth_Methods; i++)
        {
                auth_methods[i] = Auth_Methods[i];
                memcpy(&auth_choice[i * MAX_AUTH_NAME], Auth_Methods[i].name, MAX_AUTH_NAME);
        }
        num_auth_methods = Num_Reg_Auth_Methods;
        Mutex_unlock( &Struct_mutex );

        for (i=0; i < num_auth_methods; i++)
        {
                if ( !valid_auth_method(auth_methods[i].name, auth_list, auth_list_len) )
                {
                        Alarm( SESSION, "SP_connect: chosen authentication method is not permitted by daemon\n");
                        close( s );
                        return( REJECT_AUTH );
                }
        }
	while(((ret = send( s, auth_choice, MAX_AUTH_NAME * MAX_AUTH_METHODS, 0 )) == -1) && ((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK)) )
                ;
	if( ret != (MAX_AUTH_NAME * MAX_AUTH_METHODS) )
	{
		Alarm( SESSION, "SP_connect: unable to send auth_name %d %d: %s\n", ret, MAX_AUTH_NAME * MAX_AUTH_METHODS, sock_strerror(sock_errno));
		close( s );
		return( CONNECTION_CLOSED );
	}

        /* Here is where the authentication work will happen. 
         * This will be specific to the method chosen, and should be done in an
         * 'authenticate' function that is called and only returns when the process
         * is finished with either an authenticated connection or a failure. 
         * If failure (return 0) the SP_connect returns with an error REJECT_AUTH.
         * If authenticated (return 1 ), SP_connect continues with the rest of the connect protocol.
         */
        failed = FALSE;
        for (i = 0; i< num_auth_methods; i++)
        {
                ret = auth_methods[i].authenticate(s, auth_methods[i].auth_data);
                if (!ret)
                {
                        Alarm( SESSION, "SP_connect: authentication of connection failed in method %s\n", auth_methods[i].name);
                        failed = TRUE;
                }
        }
        if ( failed )
        {
                close ( s );
                return( REJECT_AUTH );
        }

	l=0;
        ret = recv_nointr_timeout( s, &l, 1, 0, &time_out);
	if( ret <= 0 )
	{
		Alarm( SESSION, "SP_connect: unable to read answer %d: %s\n", ret, sock_strerror(sock_errno));
		close( s );
		return( CONNECTION_CLOSED );
	}
	if( l != ACCEPT_SESSION )
	{
		Alarm( SESSION, "SP_connect: session rejected %d\n", l);
		close( s );
		return( l );
	}
        ret = recv_nointr_timeout( s, &l, 1, 0, &time_out);
	if( ret <= 0 )
	{
		Alarm( SESSION, "SP_connect: unable to read version %d: %s\n", ret, sock_strerror(sock_errno));
		close( s );
		return( CONNECTION_CLOSED );
	}
	sp_v1 = l;

        ret = recv_nointr_timeout( s, &l, 1, 0, &time_out);
	if( ret <= 0 )
	{
		Alarm( SESSION, "SP_connect: unable to read subversion %d: %s\n", ret, sock_strerror(sock_errno));
		close( s );
		return( CONNECTION_CLOSED );
	}
	sp_v2 = l;

        ret = recv_nointr_timeout( s, &l, 1, 0, &time_out);
	if( ret <= 0 )
	{
		Alarm( SESSION, "SP_connect: unable to read patch version %d: %s\n", ret, sock_strerror(sock_errno));
		close( s );
		return( CONNECTION_CLOSED );
	}
	sp_v3 = l;

	/* checking spread version. Should be at least 3.01 */
	if( sp_v1*10000 + sp_v2*100 + sp_v3 < 30100 )
	{
		Alarm( PRINT  , "SP_connect: old spread version %d.%d.%d not suppoted\n", 
			sp_v1, sp_v2, sp_v3 );
		close( s );
		return( REJECT_VERSION );
	}
	if( (sp_v1*10000 + sp_v2*100 + sp_v3 < 30800) && priority > 0 )
	{
		Alarm( PRINT, "SP_connect: old spread version %d.%d.%d does not support priority other than 0\n", 
			sp_v1, sp_v2, sp_v3 );
		close( s );
		return( REJECT_VERSION );
	}

        ret = recv_nointr_timeout( s, &l, 1, 0, &time_out);
	if( ret <= 0 )
	{
		Alarm( SESSION, "SP_connect: unable to read size of group %d: %s\n", ret, sock_strerror(sock_errno));
                close( s );
		return( CONNECTION_CLOSED );
	}
	len = l;
        ret = recv_nointr_timeout( s, private_group, len, 0, &time_out);
	if( ret <= 0 )
	{
		Alarm( SESSION, "SP_connect: unable to read private group %d:  %s\n", ret, sock_strerror(sock_errno));
		close( s );
		return( CONNECTION_CLOSED );
	}
	private_group[len]=0;
	Alarm( DEBUG, "SP_connect: connected with private group(%d bytes): %s\n", 
		ret, private_group );
	*mbox = s;

	Mutex_lock( &Struct_mutex );

	Sessions[Num_sessions].mbox = s;
	strcpy( Sessions[Num_sessions].private_group_name, private_group );
        Sessions[Num_sessions].recv_message_saved = 0;
	Num_sessions++;

	Mutex_unlock( &Struct_mutex );

	return( ACCEPT_SESSION );
}

int	SP_disconnect( mailbox mbox )
{
	int		ses;
	int		ret;
	char		send_group[MAX_GROUP_NAME];
	scatter		send_scat;

	Mutex_lock( &Struct_mutex );

	ses = SP_get_session( mbox );

	if( ses < 0 )
	{
		Mutex_unlock( &Struct_mutex );
		return( ILLEGAL_SESSION );
	}
	strcpy(send_group, Sessions[ses].private_group_name );

	Mutex_unlock( &Struct_mutex );

	send_scat.num_elements = 0;

	ret = SP_internal_multicast( mbox, KILL_MESS, 1, (const char (*)[MAX_GROUP_NAME])send_group, 0, &send_scat );

	SP_kill( mbox );

	ret = 0;

	return( ret );
}

int	SP_join( mailbox mbox, const char *group )
{
	int		ret;
	char		send_group[MAX_GROUP_NAME];
	scatter		send_scat;
	unsigned int	len;
	int		i;

	len = strlen( group );
	if ( len == 0 ) return( ILLEGAL_GROUP );
        if ( len >= MAX_GROUP_NAME ) return( ILLEGAL_GROUP );
	for( i=0; i < len; i++ )
		if( group[i] < 36 || group[i] > 126 ) return( ILLEGAL_GROUP );

	send_group[MAX_GROUP_NAME-1]=0;
	strncpy(send_group, group, MAX_GROUP_NAME-1);
	send_scat.num_elements = 0;

	ret = SP_internal_multicast( mbox, JOIN_MESS, 1, (const char (*)[MAX_GROUP_NAME])send_group, 0, &send_scat );
	return( ret );
}

int	SP_leave( mailbox mbox, const char *group )
{
	int		ret;
	char		send_group[MAX_GROUP_NAME];
	scatter		send_scat;
	unsigned int	len;
	int		i;

	len = strlen( group );
	if ( len == 0 ) return( ILLEGAL_GROUP );
	if ( len >= MAX_GROUP_NAME ) return( ILLEGAL_GROUP );
	for( i=0; i < len; i++ )
		if( group[i] < 36 || group[i] > 126 ) return( ILLEGAL_GROUP );

	send_group[MAX_GROUP_NAME-1]=0;
	strncpy(send_group, group, MAX_GROUP_NAME-1);
	send_scat.num_elements = 0;

	ret = SP_internal_multicast( mbox, LEAVE_MESS, 1, (const char (*)[MAX_GROUP_NAME])send_group, 0, &send_scat );
	return( ret );
}

int	SP_multicast( mailbox mbox, service service_type, 
		      const char *group,
		      int16 mess_type, int mess_len, const char *mess )
{
	int		ret;
	char		send_group[MAX_GROUP_NAME];
	scatter		send_scat;

	send_group[MAX_GROUP_NAME-1]=0;
	strncpy(send_group, group, MAX_GROUP_NAME-1);

	send_scat.num_elements = 1;
	send_scat.elements[0].len = mess_len;
	/* might be good to create a const_scatter type */
	send_scat.elements[0].buf = (char *)mess;

	ret = SP_multigroup_scat_multicast( mbox, service_type, 1, (const char (*)[MAX_GROUP_NAME])send_group, mess_type, &send_scat );
	return( ret );
}

int	SP_scat_multicast( mailbox mbox, service service_type, 
			   const char *group,
			   int16 mess_type, const scatter *scat_mess )
{
	int		ret;
	char		send_group[MAX_GROUP_NAME];

	send_group[MAX_GROUP_NAME-1]=0;
	strncpy(send_group, group, MAX_GROUP_NAME-1);

	ret = SP_multigroup_scat_multicast( mbox, service_type, 1, (const char (*)[MAX_GROUP_NAME])send_group, mess_type, scat_mess );
	return( ret );

}

int	SP_multigroup_multicast( mailbox mbox, service service_type, 
			   	 int num_groups,
				 const char groups[][MAX_GROUP_NAME],
		      		 int16 mess_type, int mess_len,
				 const char *mess )
{
	int		ret;
	scatter		send_scat;

	send_scat.num_elements = 1;
	send_scat.elements[0].len = mess_len;
	send_scat.elements[0].buf = (char *)mess;

	ret = SP_multigroup_scat_multicast( mbox, service_type, num_groups, groups, mess_type, &send_scat );
	return( ret );
}

int	SP_multigroup_scat_multicast( mailbox mbox, service service_type, 
			   	      int num_groups,
				      const char groups[][MAX_GROUP_NAME],
			   	      int16 mess_type,
				      const scatter *scat_mess )
{
	int		ret;

	if( !Is_regular_mess( service_type ) ) return( ILLEGAL_SERVICE );

	ret = SP_internal_multicast( mbox, service_type, num_groups, groups, mess_type, scat_mess );
	return( ret );
}

static	int	SP_internal_multicast( mailbox mbox, service service_type, 
				       int num_groups,
				       const char groups[][MAX_GROUP_NAME],
				       int16 mess_type,
				       const scatter *scat_mess )
{

	char		head_buf[10000]; 
	message_header	*head_ptr;
	char		*group_ptr;
	int		mess_len, len;
	int		ses;
	int		i;
	int		ret;

        /* zero head_buf to avoid information leakage */
        memset( head_buf, 0, sizeof(message_header) + MAX_GROUP_NAME*num_groups );

	Mutex_lock( &Struct_mutex );

	ses = SP_get_session( mbox );
	if( ses < 0 ){
		Mutex_unlock( &Struct_mutex );
		return( ILLEGAL_SESSION );
	}

	head_ptr = (message_header *)head_buf;
	group_ptr = &head_buf[ sizeof(message_header) ];

	/* enter the private_group_name of this mbox */
	strcpy( head_ptr->private_group_name, Sessions[ses].private_group_name );

	Mutex_unlock( &Struct_mutex );

	for( i=0, mess_len=0; i < scat_mess->num_elements; i++ )
	{
		if( scat_mess->elements[i].len < 0 ) return ( ILLEGAL_MESSAGE );
		mess_len += scat_mess->elements[i].len;
	}

        if ( (mess_len + num_groups * MAX_GROUP_NAME) > MAX_MESSAGE_BODY_LEN )
        {
                /* Message contents + groups is too large */
                return( MESSAGE_TOO_LONG );
        }

	head_ptr->type = service_type;
	head_ptr->type = Set_endian( head_ptr->type );

	head_ptr->hint = (mess_type << 8) & 0x00ffff00;
	head_ptr->hint = Set_endian( head_ptr->hint );

	head_ptr->num_groups = num_groups;
	head_ptr->data_len = mess_len;
	memcpy( group_ptr, groups, MAX_GROUP_NAME * num_groups );

	Mutex_lock( &Mbox_mutex[mbox&MAX_MUTEX_MASK][0] );
	while(((ret=send( mbox, head_buf, sizeof(message_header)+MAX_GROUP_NAME*num_groups, 0 )) == -1) 
              && ((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK)) )
                ;
	if( ret <=0 )
	{
		Alarm( SESSION, "SP_internal_multicast: error %d sending header and groups on mailbox %d: %s \n", ret, mbox, sock_strerror(sock_errno));
		Mutex_unlock( &Mbox_mutex[mbox&MAX_MUTEX_MASK][0] );
		SP_kill( mbox );
		return( CONNECTION_CLOSED );
	}
	for( len=0, i=0; i < scat_mess->num_elements; len+=ret, i++ )
	{
		while(((ret=send( mbox, scat_mess->elements[i].buf, scat_mess->elements[i].len, 0 )) == -1)
                      && ((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK)) )
                        ;
		if( ret < 0 )
		{
			Alarm( SESSION, "SP_internal_multicast: error %d sending message data on mailbox %d: %s \n", ret, mbox, sock_strerror(sock_errno));
			Mutex_unlock( &Mbox_mutex[mbox&MAX_MUTEX_MASK][0] );
			SP_kill( mbox );
			return( CONNECTION_CLOSED );
		}
	}
	Mutex_unlock( &Mbox_mutex[mbox&MAX_MUTEX_MASK][0] );
	return( len );
}

int	SP_receive( mailbox mbox, service *service_type, char sender[MAX_GROUP_NAME],
		    int max_groups, int *num_groups, char groups[][MAX_GROUP_NAME],
		    int16 *mess_type, int *endian_mismatch,
		    int max_mess_len, char *mess )
{
	int		ret;
	scatter		recv_scat;

	recv_scat.num_elements = 1;
	recv_scat.elements[0].len = max_mess_len;
	recv_scat.elements[0].buf = mess;

	ret = SP_scat_receive( mbox, service_type, sender, max_groups, num_groups, groups, 
				mess_type, endian_mismatch, &recv_scat );
	return( ret );
}

int	SP_scat_receive( mailbox mbox, service *service_type, char sender[MAX_GROUP_NAME],
			 int max_groups, int *num_groups, char groups[][MAX_GROUP_NAME],
			 int16 *mess_type, int *endian_mismatch,
			 scatter *scat_mess )
{

static	char		dummy_buf[10240];
        int             This_session_message_saved;
        int             drop_semantics;
	message_header	mess_head;
	message_header	*head_ptr;
	char		*buf_ptr;
        int32           temp_mess_type;
	int		len, remain, ret;
	int		max_mess_len;
	int		short_buffer;
	int		short_groups;
	int		to_read;
	int		scat_index, byte_index;
	int		ses;
	char		This_session_private_group[MAX_GROUP_NAME];
	int		i;
        int32           old_type;

        /* I must acquire the lock for this mbox before the Struct_mutex lock because
         * I must be sure ONLY one thread is in recv for this mbox, EVEN for 
         * this initial 'get the session and session state' operation.
         * Otherwise one thread enters this and gets the state and sees no saved message
         * then grabs the mbox mutex and discoveres buffer too short and so regrabs the
         * Struct_mutex and adds the saved header, but during this time another thread
         * has entered recv for the same mbox and already grabbed the struct_mutex and also
         * read that no saved mesage exists and is now waiting for the mbox mutex.
         * When it the first thread returns and releases the mbox mutex, the second thread will
         * grab it and enter--but it will think there is NO saved messaage when in reality
         * there IS one. This will cause MANY PROBLEMS :-)
         *
         * NOTE: locking and unlocking the Struct_mutex multiple times during this is OK
         * BECAUSE struct_Mutex only locks non-blocking operations that are guaranteed to complete
         * quickly and never take additional locks.
         */
	Mutex_lock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );

	Mutex_lock( &Struct_mutex );
	/* verify mbox */
	ses = SP_get_session( mbox );
	if( ses < 0 ){
		Mutex_unlock( &Struct_mutex );
                Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
		return( ILLEGAL_SESSION );
	}
	strcpy( This_session_private_group, Sessions[ses].private_group_name );

        if (Sessions[ses].recv_message_saved) {
                memcpy(&mess_head, &(Sessions[ses].recv_saved_head), sizeof(message_header) );
                This_session_message_saved = 1;
        } else {
                This_session_message_saved = 0;
        }

	Mutex_unlock( &Struct_mutex );
        
	head_ptr = (message_header *)&mess_head;
	buf_ptr = (char *)&mess_head;

        drop_semantics = Is_drop_recv(*service_type);

        if (!This_session_message_saved) {
                /* read up to size of message_header */
                for( len=0, remain = sizeof(message_header); remain > 0;  len += ret, remain -= ret )
                {
                        while(((ret = recv( mbox, &buf_ptr[len], remain, 0 )) == -1 )
                              && ((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK)) )
                                ;
                        if( ret <=0 )
                        {
                                Alarm( SESSION, "SP_scat_receive: failed receiving header on session %d (ret: %d len: %d): %s\n", mbox, ret, len, sock_strerror(sock_errno) );
                                Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );

                                SP_kill( mbox );
                                return( CONNECTION_CLOSED );
                        }
                }

                /* Fliping message header to my form if needed */
                if( !Same_endian( head_ptr->type ) ) 
                {
                        Flip_mess( head_ptr );
                }
        }
        /* Validate user's scatter */
	for( max_mess_len = 0, i=0; i < scat_mess->num_elements; i++ ) {
                if ( scat_mess->elements[i].len < 0 )   {
                        if ( !drop_semantics && !This_session_message_saved) {
                                Mutex_lock( &Struct_mutex );
                                ses = SP_get_session( mbox );
                                if( ses < 0 ){
                                        Mutex_unlock( &Struct_mutex );
                                        Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
                                        return( ILLEGAL_SESSION );
                                }
                                memcpy(&(Sessions[ses].recv_saved_head), &mess_head, sizeof(message_header) );
                                Sessions[ses].recv_message_saved = 1;
                                Mutex_unlock( &Struct_mutex );
                        }
                        return( ILLEGAL_MESSAGE );
                }
		max_mess_len += scat_mess->elements[i].len;
        }
        /* Validate num_groups and data_len */
        if (head_ptr->num_groups < 0) {
            /* reject this message since it has an impossible (negative) num_groups
             * This is likely to be caused by a malicious attack or memory corruption
             */
            return( ILLEGAL_MESSAGE );
        }
        if (head_ptr->data_len < 0) {
            /* reject this message since it has an impossible (negative) data_len
             * This is likely to be caused by a malicious attack or memory corruption
             */
            return( ILLEGAL_MESSAGE );
        }

        /* Check if sufficient buffer space for groups and data */
        if (!drop_semantics) {
                if ( (head_ptr->num_groups > max_groups) || (head_ptr->data_len > max_mess_len) ) {
                        if (!This_session_message_saved) {
                                Mutex_lock( &Struct_mutex );
                                ses = SP_get_session( mbox );
                                if( ses < 0 ){
                                        Mutex_unlock( &Struct_mutex );
                                        Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
                                        return( ILLEGAL_SESSION );
                                }
                                memcpy(&(Sessions[ses].recv_saved_head), &mess_head, sizeof(message_header) );
                                Sessions[ses].recv_message_saved = 1;
                                Mutex_unlock( &Struct_mutex );
                        }
                        /* When *_TOO_SHORT error will be returned, provide caller with all available information:
                         * service_type
                         * sender
                         * mess_type
                         * 
                         * The num_groups field and endian_mismatch field are used to specify the required
                         * size of the groups array and message body array in order to fit the current message
                         * so, they do NOT have their usual meaning. 
                         * If number of groups in the message is > max_groups then the number of required groups 
                         *   is returned as a negative value in the num_groups field.
                         * If the size of the message is > max_mess_len, then the required size in bytes is 
                         *   returned as a negative value in the endian_mismatch field.
                         */
                        if ( Is_regular_mess( head_ptr->type ) || Is_reject_mess( head_ptr->type ) )
                        {
                                temp_mess_type = head_ptr->hint;
                                if ( !Same_endian( head_ptr->hint ) ) {
                                        temp_mess_type = Flip_int32( temp_mess_type );
                                }
                                temp_mess_type = Clear_endian( temp_mess_type );
                                *mess_type = ( temp_mess_type >> 8 ) & 0x0000ffff;
                        }
                        else 
                                *mess_type = 0;
                        *service_type = Clear_endian( head_ptr->type );
                        if (head_ptr->num_groups > max_groups)
                                *num_groups = -(head_ptr->num_groups);
                        else    
                                *num_groups = 0;
                        if (head_ptr->data_len > max_mess_len)
                                *endian_mismatch = -(head_ptr->data_len);
                        else
                                *endian_mismatch = 0;

                        /* Return sender field to caller */
                        strncpy( sender, head_ptr->private_group_name, MAX_GROUP_NAME );

                        Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
                        if (*num_groups)
                                return( GROUPS_TOO_SHORT );
                        else
                                return( BUFFER_TOO_SHORT );
                }
        }
	/* Compute mess_type and endian_mismatch from hint */
	if( Is_regular_mess( head_ptr->type ) || Is_reject_mess( head_ptr->type)  )
	{
		if( !Same_endian( head_ptr->hint ) )
		{
			head_ptr->hint = Flip_int32( head_ptr->hint );
			*endian_mismatch = 1;
		}else{
			*endian_mismatch = 0;
		}
                head_ptr->hint = Clear_endian( head_ptr->hint );
                head_ptr->hint = ( head_ptr->hint >> 8 ) & 0x0000ffff;
                *mess_type = head_ptr->hint;
	}else{
		*mess_type = -1; /* marks the index (0..n-1) of the member in the group */
		*endian_mismatch = 0;
	}

	strncpy( sender, head_ptr->private_group_name, MAX_GROUP_NAME );
        
        /* if a reject message read the extra old_type field first, and merge with head_ptr->type */
        if ( Is_reject_mess( head_ptr->type ) )
        {
                remain = 4;
                buf_ptr = (char *)&old_type;
                for( len=0; remain > 0; len += ret, remain -= ret )
                {
                        while(((ret = recv( mbox, &buf_ptr[len], remain, 0 )) == -1 ) && ((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK)) )
                                ;
                        if( ret <=0 )
                        {
                                Alarm( SESSION, "SP_scat_receive: failed receiving old_type for reject on session %d, ret is %d: %s\n", mbox, ret, sock_strerror(sock_errno));
                                Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
                                SP_kill( mbox );
                                return( CONNECTION_CLOSED );
                        }
                }
                /* endian flip it */
                if ( !Same_endian( head_ptr->type ) )
                        old_type = Flip_int32(old_type);
        }

	/* read the destination groups */
	buf_ptr = (char *)groups;

	remain = head_ptr->num_groups * MAX_GROUP_NAME;
	short_groups=0;
	if( head_ptr->num_groups > max_groups )
	{
		/* groups too short */
		remain = max_groups * MAX_GROUP_NAME;
		short_groups = 1;
	}

	for( len=0; remain > 0; len += ret, remain -= ret )
	{
		while(((ret = recv( mbox, &buf_ptr[len], remain, 0 )) == -1 ) && ((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK)) )
                        ;
		if( ret <=0 )
		{
			Alarm( SESSION, "SP_scat_receive: failed receiving groups on session %d, ret is %d: %s\n", mbox, ret, sock_strerror(sock_errno));
			Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );

			SP_kill( mbox );
			return( CONNECTION_CLOSED );
		}
	}

	if( short_groups )
	{
		for( remain = (head_ptr->num_groups - max_groups) * MAX_GROUP_NAME; 
		     remain > 0; remain -= ret ) 
		{
			to_read = remain;
			if( to_read > sizeof( dummy_buf ) ) to_read = sizeof( dummy_buf );
			while(((ret = recv( mbox, dummy_buf, to_read, 0 )) == -1 ) && ((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK)) )
                                ;
			if( ret <=0 )
			{
				Alarm( SESSION, "SP_scat_receive: failed receiving groups overflow on session %d, ret is %d: %s\n", 
                                       mbox, ret, sock_strerror(sock_errno) );
				Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
				SP_kill( mbox );
				return( CONNECTION_CLOSED );
			}
		}
		*num_groups = -head_ptr->num_groups; /* !!!! */
	}else	*num_groups = head_ptr->num_groups;

	/* read the rest of the message */
	remain = head_ptr->data_len;
	short_buffer=0;
	if( head_ptr->data_len > max_mess_len )
	{
		/* buffer too short */
		remain = max_mess_len;
		short_buffer = 1;
	}

	ret = 0;
	/* 
	 * pay attention that if head_ptr->data_len is smaller than max_mess_len we need to
	 * change scat, do recvmsg, and restore scat, and then check ret.
         * ret = recvmsg( mbox, &msg, 0 ); 
         * if( ret <=0 )
         * {
         *      Alarm( SESSION, "SP_scat_receive: failed receiving message on session %d\n", mbox );
	 *
	 *	Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
	 *
         *      SP_kill( mbox );
         *      return;
         * }
	 */

	/* calculate scat_index and byte_index based on ret and scat_mess */
	for( byte_index=ret, scat_index=0; scat_index < scat_mess->num_elements; scat_index++ )
	{
		if( scat_mess->elements[scat_index].len > byte_index ) break;
		byte_index -= scat_mess->elements[scat_index].len;
	}

	remain -= ret;
	for( len=ret; remain > 0; len += ret, remain -= ret )
	{
		to_read = scat_mess->elements[scat_index].len - byte_index;
		if( to_read > remain ) to_read = remain;
		while(((ret = recv( mbox, &scat_mess->elements[scat_index].buf[byte_index], to_read, 0 )) == -1 )
                      && ((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK)) )
                        ;
		if( ret <=0 )
		{
			Alarm( SESSION, "SP_scat_receive: failed receiving message on session %d, ret is %d: %s\n", 
                               mbox, ret, sock_strerror(sock_errno) );
			Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
			SP_kill( mbox );
			return( CONNECTION_CLOSED );
		}else if( ret == to_read ){
			byte_index = 0;
			scat_index++;
		}else{
			byte_index += ret;
		}
	}

	if( Is_reg_memb_mess( head_ptr->type ) && !short_groups )
	{
		/* calculate my index in group */
		for( i=0; i < head_ptr->num_groups; i++ )
		{
			if( !strcmp( groups[i], This_session_private_group ) )
			{
				*mess_type = i;
				break;
			}
		}
	}

 	if( Is_reg_memb_mess( head_ptr->type ) && !Same_endian( head_ptr->type ) )
	{
		int	 	flip_size;
		group_id	*gid_ptr;
		int32		*num_vs_ptr;
		int		bytes_to_copy, bytes_index;
		char		groups_buf[10240];

		/* 
		 * flip membership message:
		 * group_id and number of member ins vs_set 
		 * so - acctually 4 int32.
		 */
		flip_size = sizeof( group_id ) + sizeof( int32 );
		if( flip_size > max_mess_len ) flip_size = max_mess_len;
		for( bytes_index=0, i=0 ; bytes_index < flip_size ; i++, bytes_index += bytes_to_copy )
		{
			bytes_to_copy = flip_size - bytes_index;
			if( bytes_to_copy > scat_mess->elements[i].len )
				bytes_to_copy = scat_mess->elements[i].len;
			memcpy( &groups_buf[bytes_index], scat_mess->elements[i].buf, bytes_to_copy );
		}
		gid_ptr    = (group_id *)&groups_buf[0];
		num_vs_ptr = (int32 *)&groups_buf[sizeof(group_id)];
		gid_ptr->memb_id.proc_id = Flip_int32( gid_ptr->memb_id.proc_id );
		gid_ptr->memb_id.time    = Flip_int32( gid_ptr->memb_id.time );
		gid_ptr->index           = Flip_int32( gid_ptr->index );
		*num_vs_ptr 	         = Flip_int32( *num_vs_ptr );
		for( bytes_index=0, i=0 ; bytes_index < flip_size ; i++, bytes_index += bytes_to_copy )
		{
			bytes_to_copy = flip_size - bytes_index;
			if( bytes_to_copy > scat_mess->elements[i].len )
				bytes_to_copy = scat_mess->elements[i].len;
			memcpy( scat_mess->elements[i].buf, &groups_buf[bytes_index], bytes_to_copy );
		}

	}
        if ( Is_reject_mess( head_ptr->type ) )
        {
                /* set type to be old type + reject */
                head_ptr->type = old_type | REJECT_MESS;
        }
	*service_type = Clear_endian( head_ptr->type );

	if( short_buffer )
	{
		for( remain = head_ptr->data_len - max_mess_len; remain > 0; remain -= ret ) 
		{
			to_read = remain;
			if( to_read > sizeof( dummy_buf ) ) to_read = sizeof( dummy_buf );
			while(((ret = recv( mbox, dummy_buf, to_read, 0 )) == -1 ) && ((sock_errno == EINTR) || (sock_errno == EAGAIN) || (sock_errno == EWOULDBLOCK)) )
                                ;
			if( ret <=0 )
			{
				Alarm( SESSION, "SP_scat_receive: failed receiving overflow on session %d, ret is %d: %s\n", 
                                       mbox, ret, sock_strerror(sock_errno) );
				Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
				SP_kill( mbox );
				return( CONNECTION_CLOSED );
			}
		}
		Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
		return( BUFFER_TOO_SHORT );
	}
        /* Successful receive so clear saved_message info if any */
        if (This_session_message_saved) {
                Mutex_lock( &Struct_mutex );
                ses = SP_get_session( mbox );
                if( ses < 0 ){
                        Mutex_unlock( &Struct_mutex );
                        Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
                        return( ILLEGAL_SESSION );
                }
                memset(&(Sessions[ses].recv_saved_head), 0, sizeof(message_header) );
                Sessions[ses].recv_message_saved = 0;
                Mutex_unlock( &Struct_mutex );
        }

	Mutex_unlock( &Mbox_mutex[mbox & MAX_MUTEX_MASK][1] );
	return( head_ptr->data_len );
}

int	SP_poll( mailbox mbox )
{
	int		num_bytes;
	int		ses;
	int		ret;

	Mutex_lock( &Struct_mutex );

	/* verify mbox */
	ses = SP_get_session( mbox );

	Mutex_unlock( &Struct_mutex );

	if( ses < 0 ) return( ILLEGAL_SESSION );

	ret = ioctl( mbox, FIONREAD, &num_bytes);
	if( ret < 0 ) return( ILLEGAL_SESSION );
	return( num_bytes );

}

int     SP_equal_group_ids( group_id g1, group_id g2 )
{
	if( g1.memb_id.proc_id == g2.memb_id.proc_id && g1.memb_id.time == g2.memb_id.time && g1.index == g2.index ) return( 1 );
	else return( 0 );
}

int SP_get_gid_offset_memb_mess() 
{
    return 0;
}

int SP_get_num_vs_offset_memb_mess() 
{
    return sizeof(group_id);
}

int SP_get_vs_set_offset_memb_mess() 
{
    return sizeof(group_id) + sizeof(int32);
}

int	SP_query_groups( mailbox mbox, int max_groups, char *groups[MAX_GROUP_NAME] )
{
	return( -1 );
}

int	SP_query_members( mailbox mbox, char *group, int max_members, char *members[MAX_GROUP_NAME] )
{
	return( -1 );
}

static  void	SP_kill( mailbox mbox )
{
	int	ses;
	int	i;

	Mutex_lock( &Struct_mutex );

	/* get mbox out of the data structures */
	ses = SP_get_session( mbox );
	if( ses < 0 ){ 
		Alarm( SESSION, "SP_kill: killing non existent session for mailbox %d (might be ok in a threaded case)\n",mbox );
		Mutex_unlock( &Struct_mutex );
		return;
	}

	close(mbox);
	for( i=ses+1; i < Num_sessions; i++ )
		memcpy( &Sessions[i-1], &Sessions[i], sizeof(sp_session) );
	Num_sessions--;

	Mutex_unlock( &Struct_mutex );
}

static	int	SP_get_session( mailbox mbox )
{
	int ses;

	for( ses=0; ses < Num_sessions; ses++ )
	{
		if( Sessions[ses].mbox == mbox ) return( ses );
	}
	return( -1 );
}
void	SP_error( int error )
{
	switch( error )
	{
		case ILLEGAL_SPREAD:
			Alarm( PRINT, "SP_error: (%d) Illegal spread was provided\n", error );
			break;
		case COULD_NOT_CONNECT:
			Alarm( PRINT, "SP_error: (%d) Could not connect. Is Spread running?\n", error );
			break;
		case REJECT_QUOTA:
			Alarm( PRINT, "SP_error: (%d) Connection rejected, to many users\n", error );
			break;
		case REJECT_NO_NAME:
			Alarm( PRINT, "SP_error: (%d) Connection rejected, no name was supplied\n", error );
			break;
		case REJECT_ILLEGAL_NAME:
			Alarm( PRINT, "SP_error: (%d) Connection rejected, illegal name\n", error );
			break;
		case REJECT_NOT_UNIQUE:
			Alarm( PRINT, "SP_error: (%d) Connection rejected, name not unique\n", error );
			break;
		case REJECT_VERSION:
			Alarm( PRINT, "SP_error: (%d) Connection rejected, library does not fit daemon\n", error );
			break;
		case CONNECTION_CLOSED:
			Alarm( PRINT, "SP_error: (%d) Connection closed by spread\n", error );
			break;
		case REJECT_AUTH:
			Alarm( PRINT, "SP_error: (%d) Connection rejected, authentication failed\n", error );
			break;
		case ILLEGAL_SESSION:
			Alarm( PRINT, "SP_error: (%d) Illegal session was supplied\n", error );
			break;
		case ILLEGAL_SERVICE:
			Alarm( PRINT, "SP_error: (%d) Illegal service request\n", error );
			break;
		case ILLEGAL_MESSAGE:
			Alarm( PRINT, "SP_error: (%d) Illegal message\n", error );
			break;
		case ILLEGAL_GROUP:
			Alarm( PRINT, "SP_error: (%d) Illegal group\n", error );
			break;
		case BUFFER_TOO_SHORT:
			Alarm( PRINT, "SP_error: (%d) The supplied buffer was too short\n", error );
			break;
		case GROUPS_TOO_SHORT:
			Alarm( PRINT, "SP_error: (%d) The supplied groups list was too short\n", error );
			break;
		case MESSAGE_TOO_LONG:
			Alarm( PRINT, "SP_error: (%d) The message body + group names was too large to fit in a message\n", error );
			break;
		default:
			Alarm( PRINT, "SP_error: (%d) unrecognized error\n", error );
	}
}

static	void    Flip_mess( message_header *head_ptr )
{
	head_ptr->type     	= Flip_int32( head_ptr->type );
	head_ptr->num_groups	= Flip_int32( head_ptr->num_groups );
	head_ptr->data_len 	= Flip_int32( head_ptr->data_len );
}


syntax highlighted by Code2HTML, v. 0.9.1