/*===========================================================================* * * * smttran.c - * * * * Copyright (c) 1991-2003 iMatix Corporation * * * * ------------------ GPL Licensed Source Code ------------------ * * iMatix makes this software available under the GNU General * * Public License (GPL) license for open source projects. For * * details of the GPL license please see www.gnu.org or read the * * file license.gpl provided in this package. * * * * This program is free software; you can redistribute it and/or * * modify it under the terms of the GNU General Public License as * * published by the Free Software Foundation; either version 2 of * * the License, or (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public * * License along with this program in the file 'license.gpl'; if * * not, write to the Free Software Foundation, Inc., 59 Temple * * Place - Suite 330, Boston, MA 02111-1307, USA. * * * * You can also license this software under iMatix's General Terms * * of Business (GTB) for commercial projects. If you have not * * explicitly licensed this software under the iMatix GTB you may * * only use it under the terms of the GNU General Public License. * * * * For more information, send an email to info@imatix.com. * * -------------------------------------------------------------- * *===========================================================================*/ /* Synopsis: Transfers blocks or files via an opened TCP/IP socket. */ #include "smtdefn.h" /* SMT definitions */ /*- Definitions -------------------------------------------------------------*/ #define AGENT_NAME SMT_TRANSFER /* Our public name */ #define BUFFER_SIZE 32768L /* Buffer for file transmission */ #define FILETYPE_BINARY 0 #define FILETYPE_ASCII 1 typedef struct { /* Pipe descriptor */ long input_rate; /* Max input transfer rate */ long output_rate; /* Max output transfer rate */ long input_credit; /* Current input credit */ long output_credit; /* Current output credit */ } PIPE; typedef struct { /* Thread context block: */ event_t thread_type; /* Thread type indicator */ QID reply_to; /* Queue to reply to */ sock_t handle; /* Socket i/o handle */ char *filename; /* Filename, if used */ qbyte filesize; /* Total file size */ int stream; /* File i/o handle, if used */ qbyte read_size; /* Read size, 0 = GET */ qbyte write_size; /* Write size, 0 = GET */ qbyte block_size; /* Block size, 0 = GET */ char *block_data; /* Block data, or NULL */ char *block_text; /* Block text data, or NULL */ long start_slice; /* Start of slice, if any */ long end_slice; /* End slice, if any */ dbyte filetype; /* Type of file (binary, ascii) */ Bool send_text; /* Do we send block data or text */ Bool append; /* Open for output or append? */ qbyte maxsize; /* Max. accepted file size */ PIPE *pipe; /* Transfer pipe, if any */ } TCB; typedef struct _TRANREQ { /* Request descriptor */ struct _TRANREQ /* */ *next, *prev; /* Doubly-linked list */ QID reply_to; /* Who sent the request event */ Bool sending; /* True=PUT, false=GET */ sock_t handle; /* Socket for output */ qbyte block_size; /* Block size, 0 = GET */ char *block_data; /* Block data, or NULL */ Bool in_header; /* True when doing header */ Bool in_progress; /* Are we processing request? */ Bool huge_block; /* Are we using huge blocks? */ qbyte reqid; /* Unique request ID */ PIPE *pipe; /* Transfer pipe, if any */ } TRANREQ; /*- Local function prototypes -----------------------------------------------*/ static TRANREQ *request_create (THREAD *thread, sock_t handle, char **pipe_name); static TRANREQ *request_lookup (qbyte reqid); static void request_destroy (TRANREQ *request); static void signal_fatal_error (THREAD *thread); static void put_request_header (THREAD *thread, TRANREQ *request); static void put_request_body (THREAD *thread, TRANREQ *request); static void get_request_header (THREAD *thread, TRANREQ *request); static void get_request_body (THREAD *thread, TRANREQ *request); static void open_transfer_file (THREAD *thread, char mode); /*- Global variables used in this source file only --------------------------*/ static TCB *tcb; /* Address thread context block */ static QID operq, /* Operator console event queue */ timeq, /* Timer agent event queue */ sockq; /* Socket agent event queue */ static NODE requests; /* Request list header */ static int fileio_threads = 0; /* Number of child fileio threads */ static SYMTAB *pipes = NULL; /* Pipe look-up table */ #include "smttran.d" /* Include dialog data */ /******************** INITIALISE AGENT - ENTRY POINT *********************/ /* ---------------------------------------------------------------------[<]- Function: smttran_init Synopsis: Initialises the SMT transfer agent. Returns 0 if initialised okay, -1 if there was an error. The transfer agent reads and writes blocks of data, or complete files, through an open TCP/IP socket. A block of data is sent/received as a two-byte length header followed by the data block. Creates an unnamed thread automatically. Supports these public methods: PUT_BLOCK Write a length-specified block to a socket (< 64k) GET_BLOCK Read a length-specified block from a socket (< 64k) PUT_HUGE Write a length-specified block to a socket (< 2Gb) GET_HUGE Read a length-specified block from a socket (< 2Gb) PUT_FILE Write part or all of a file to a socket GET_FILE Read part or all of a file from a socket PIPE_CREATE Create new transfer pipe CLEAR_PIPES Destroy all transfer pipes COMMIT Wait until all transfer requests are finished
Sends errors to the SMTOPER agent; see the doc for reply events. ---------------------------------------------------------------------[>]-*/ int smttran_init (void) { AGENT *agent; /* Handle for our agent */ THREAD *thread; /* Handle to various threads */ # include "smttran.i" /* Include dialog interpreter */ /* Method name Event value Priority */ /* Shutdown event comes from Kernel */ method_declare (agent, "SHUTDOWN", shutdown_event, SMT_PRIORITY_MAX); /* Reply events from socket agent */ method_declare (agent, "SOCK_INPUT_OK", ok_event, 0); method_declare (agent, "SOCK_OUTPUT_OK", ok_event, 0); method_declare (agent, "SOCK_READ_OK", read_ok_event, 0); method_declare (agent, "SOCK_WRITE_OK", write_ok_event, 0); method_declare (agent, "SOCK_READH_OK", readh_ok_event, 0); method_declare (agent, "SOCK_WRITEH_OK", writeh_ok_event, 0); method_declare (agent, "SOCK_CLOSED", sock_closed_event, 0); method_declare (agent, "SOCK_ERROR", sock_error_event, 0); method_declare (agent, "SOCK_TIMEOUT", sock_error_event, 0); /* Reply events from timer agent */ method_declare (agent, "TIME_ALARM", alarm_event, 0); method_declare (agent, "TIME_ERROR", error_event, 0); /* Public methods supported by this agent */ declare_put_block (put_block_event, 0); declare_get_block (get_block_event, 0); declare_put_huge (put_huge_event, 0); declare_get_huge (get_huge_event, 0); declare_put_file (put_file_event, 0); declare_get_file (get_file_event, 0); declare_pipe_create (pipe_create_event, 0); declare_clear_pipes (clear_pipes_event, 0); declare_tran_commit (commit_event, SMT_PRIORITY_MIN); /* Private methods used to pass initial thread events */ method_declare (agent, "_MASTER", master_event, 0); method_declare (agent, "_PIPE_MANAGER", pipe_manager_event, 0); /* Ensure that operator console is running, else start it up */ smtoper_init (); if ((thread = thread_lookup (SMT_OPERATOR, "")) != NULL) operq = thread-> queue-> qid; else return (-1); /* Ensure that socket agent is running, else start it up */ smtsock_init (); if ((thread = thread_lookup (SMT_SOCKET, "")) != NULL) sockq = thread-> queue-> qid; else return (-1); /* Ensure that timer agent is running, else start it up */ smttime_init (); if ((thread = thread_lookup (SMT_TIMER, "")) != NULL) timeq = thread-> queue-> qid; else return (-1); /* Create initial thread to manage master thread */ if ((thread = thread_create (AGENT_NAME, "")) != NULL) { SEND (&thread-> queue-> qid, "_MASTER", ""); ((TCB *) thread-> tcb)-> thread_type = master_event; ((TCB *) thread-> tcb)-> filename = NULL; } else return (-1); /* Create initial thread to manage pipes */ if ((thread = thread_create (AGENT_NAME, "__pipe_manager")) != NULL) { SEND (&thread-> queue-> qid, "_PIPE_MANAGER", ""); ((TCB *) thread-> tcb)-> thread_type = pipe_manager_event; ((TCB *) thread-> tcb)-> filename = NULL; } else return (-1); /* Clear & prepare request queue */ node_reset (&requests); /* Create pipe lookup table */ pipes = sym_create_table (); /* Signal okay to caller that we initialised okay */ return (0); } /************************* INITIALISE THE THREAD *************************/ MODULE initialise_the_thread (THREAD *thread) { /* We don't set the_next_event because we expect an argument event */ /* to supply these. */ tcb = thread-> tcb; /* Point to thread's context */ tcb-> stream = -1; tcb-> block_data = NULL; tcb-> block_text = NULL; tcb-> send_text = FALSE; } /*********************** GET NEXT EVENT FROM QUEUE ***********************/ MODULE get_next_event_from_queue (THREAD *thread) { AGENT *agent; /* This agent */ QUEUE *queue; /* Thread's event queue */ EVENT *event; /* Event information block */ METHOD *method; /* Method information block */ tcb = thread-> tcb; /* Point to thread's context */ /* Get next event off queue */ queue = thread-> queue; agent = queue-> agent; event = event_iterate (queue, NULL); if (event) { method = method_lookup (agent, event-> name); if (method == NULL) /* Not a method we accept */ event_reject (queue, event); else { if (thread-> event) /* If thread was sitting on an */ { /* event, release it */ event_destroy (thread-> event); thread-> event = NULL; } /* Get event off queue; it now belongs to the thread */ thread-> event = event_accept (queue, event); the_next_event = method-> event_number; } } else the_next_event = empty_event; } /************************** CREATE TRANSFER PIPE *************************/ MODULE create_transfer_pipe (THREAD *thread) { SYMBOL *pipe_sym; PIPE *pipe; struct_smt_pipe_create *args; tcb = thread-> tcb; /* Point to thread's context */ if (pipes) { get_smt_pipe_create (thread-> event-> body, &args); pipe_sym = sym_lookup_symbol (pipes, args-> name); if (pipe_sym) pipe = (PIPE *) pipe_sym-> value; else { pipe_sym = sym_create_symbol (pipes, args-> name, NULL); pipe = mem_alloc (sizeof (PIPE)); pipe_sym-> value = (char *) pipe; } pipe-> input_rate = args-> input_rate; pipe-> output_rate = args-> output_rate; pipe-> input_credit = 0; pipe-> output_credit = 0; free_smt_pipe_create (&args); } } /************************* WAIT FOR PIPE REFRESH *************************/ MODULE wait_for_pipe_refresh (THREAD *thread) { tcb = thread-> tcb; /* Point to thread's context */ /* Ask timer to send us an event after one second */ send_alarm (&timeq, 0, 100, 0, NULL); } /*************************** REFRESH ALL PIPES ***************************/ MODULE refresh_all_pipes (THREAD *thread) { SYMBOL *pipe_sym; PIPE *pipe; tcb = thread-> tcb; /* Point to thread's context */ for (pipe_sym = pipes-> symbols; pipe_sym; pipe_sym = pipe_sym-> next) { pipe = (PIPE *) pipe_sym-> value; /* Credits can be negative, but never greater than the rate */ pipe-> input_credit += pipe-> input_rate; if (pipe-> input_credit > pipe-> input_rate) pipe-> input_credit = pipe-> input_rate; pipe-> output_credit += pipe-> output_rate; if (pipe-> output_credit > pipe-> output_rate) pipe-> output_credit = pipe-> output_rate; } } /*********************** DESTROY ALL TRANSFER PIPES **********************/ MODULE destroy_all_transfer_pipes (THREAD *thread) { tcb = thread-> tcb; /* Point to thread's context */ sym_delete_table (pipes); pipes = NULL; /* Pipes no longer available */ } /************************ CREATE PUT BLOCK REQUEST ***********************/ MODULE create_put_block_request (THREAD *thread) { dbyte block_size; /* Size of data block to write */ char *block_data = NULL, /* Block of data to write */ *pipe_name = NULL; /* Pipe for i/o */ sock_t handle; /* Socket for output */ TRANREQ *request; /* Newly-created request */ tcb = thread-> tcb; /* Point to thread's context */ /* Get arguments from message */ exdr_read (thread-> event-> body, SMT_TRAN_PUTB, &handle, &block_size, &block_data, &pipe_name); if ((request = request_create (thread, handle, &pipe_name)) != NULL) { request-> block_size = block_size; request-> block_data = block_data; request-> sending = TRUE; } } /* ------------------------------------------------------------------------- * request_create * * Creates a new request, and initialises it to empty. */ static TRANREQ * request_create (THREAD *thread, sock_t handle, char **pipe_name) { static qbyte reqid = 0; /* Cyclic request id */ TRANREQ *request; /* Request we create */ SYMBOL *pipe_sym; request = node_create (requests.prev, sizeof (TRANREQ)); if (request) { /* Initialise the request with default values */ request-> reply_to = thread-> event-> sender; request-> handle = handle; request-> block_size = 0; request-> block_data = NULL; request-> in_header = TRUE; request-> in_progress = FALSE; request-> reqid = reqid++; request-> huge_block = FALSE; request-> pipe = NULL; /* Get pipe to use, if any */ if (pipes && **pipe_name) { pipe_sym = sym_lookup_symbol (pipes, *pipe_name); if (pipe_sym) request-> pipe = (PIPE *) pipe_sym-> value; } } mem_free (*pipe_name); return (request); } /************************ CREATE GET BLOCK REQUEST ***********************/ MODULE create_get_block_request (THREAD *thread) { sock_t handle; /* Socket for output */ TRANREQ *request; /* Newly-created request */ char *pipe_name = NULL; /* Pipe for i/o */ tcb = thread-> tcb; /* Point to thread's context */ /* Get arguments from message */ exdr_read (thread-> event-> body, SMT_TRAN_GETB, &handle, &pipe_name); if ((request = request_create (thread, handle, &pipe_name)) != NULL) request-> sending = FALSE; } /************************ CREATE PUT HUGE REQUEST ************************/ MODULE create_put_huge_request (THREAD *thread) { qbyte block_size; /* Size of data block to write */ char *block_data = NULL, /* Block of data to write */ *pipe_name = NULL; /* Pipe for i/o */ sock_t handle; /* Socket for output */ TRANREQ *request; /* Newly-created request */ tcb = thread-> tcb; /* Point to thread's context */ /* Get arguments from message */ exdr_read (thread-> event-> body, SMT_TRAN_PUTH, &handle, &block_size, &block_data, &pipe_name); if ((request = request_create (thread, handle, &pipe_name)) != NULL) { request-> block_size = block_size; request-> block_data = block_data; request-> sending = TRUE; request-> huge_block = TRUE; } } /************************ CREATE GET HUGE REQUEST ************************/ MODULE create_get_huge_request (THREAD *thread) { sock_t handle; /* Socket for output */ TRANREQ *request; /* Newly-created request */ char *pipe_name = NULL; /* Pipe for i/o */ tcb = thread-> tcb; /* Point to thread's context */ /* Get arguments from message */ exdr_read (thread-> event-> body, SMT_TRAN_GETH, &handle, &pipe_name); if ((request = request_create (thread, handle, &pipe_name)) != NULL) { request-> sending = FALSE; request-> huge_block = TRUE; } } /*********************** PROCESS TRANSFER REQUESTS ***********************/ MODULE process_transfer_requests (THREAD *thread) { TRANREQ *request; /* Request we process */ Bool did_io = FALSE; /* True if we did a put or get */ tcb = thread-> tcb; /* Point to thread's context */ for (request = requests.next; request != (TRANREQ *) &requests; request = request-> next) { if (request-> in_progress) continue; /* Ignore requests in progress */ if (request-> sending) { if (request-> pipe == NULL || request-> pipe-> output_credit > 0) { /* Send header and body one after the other */ put_request_header (thread, request); put_request_body (thread, request); request-> in_progress = TRUE; did_io = TRUE; } } else /* Otherwise we want GET */ if (request-> pipe == NULL || request-> pipe-> input_credit > 0) { if (request-> in_header) get_request_header (thread, request); else get_request_body (thread, request); request-> in_progress = TRUE; did_io = TRUE; } } /* All i/o is waiting for a pipe to be freed - loop slowly */ if (!did_io) send_alarm (&timeq, 0, 50, 6, (byte *) "EMPTY"); } static void put_request_header (THREAD *thread, TRANREQ *request) { dbyte dsize; /* Block size in network order */ qbyte qsize; /* Huge size in network order */ /* Write 2/4-byte block header in network format */ if (request-> huge_block) { qsize = htonl (request-> block_size); send_writeh (&sockq, 0, request-> handle, 4, (byte *) &qsize, request-> reqid); if (request-> pipe) request-> pipe-> output_credit -= 4; } else { dsize = htons ((dbyte) request-> block_size); send_write (&sockq, 0, request-> handle, 2, (byte *) &dsize, request-> reqid); if (request-> pipe) request-> pipe-> output_credit -= 2; } } static void put_request_body (THREAD *thread, TRANREQ *request) { if (request-> huge_block) send_writeh ( &sockq, /* Socket agent queue */ 0, /* Timeout for transmission */ request-> handle, /* Socket to send to */ request-> block_size, /* Amount of data to send */ (byte *) request-> block_data, /* Address of data to send */ request-> reqid); /* Request tag */ else send_write ( &sockq, 0, request-> handle, (dbyte) request-> block_size, (byte *) request-> block_data, request-> reqid); if (request-> pipe) request-> pipe-> output_credit -= request-> block_size; } static void get_request_header (THREAD *thread, TRANREQ *request) { if (request-> huge_block) send_readh ( &sockq, /* Socket agent queue */ 0, /* Timeout for transmission */ request-> handle, /* Socket to read from */ 4, 4, /* Read exactly 4 bytes */ request-> reqid); /* Request tag */ else send_read ( &sockq, /* Socket agent queue */ 0, /* Timeout for transmission */ request-> handle, /* Socket to read from */ 2, 2, /* Read exactly 2 bytes */ request-> reqid); /* Request tag */ } static void get_request_body (THREAD *thread, TRANREQ *request) { if (request-> huge_block) send_readh ( &sockq, /* Socket agent queue */ 0, /* Timeout for transmission */ request-> handle, /* Socket to read from */ request-> block_size, /* Max amount of data to read */ request-> block_size, /* Min amount of data to read */ request-> reqid); /* Request tag */ else send_read ( &sockq, 0, request-> handle, (dbyte) request-> block_size, (dbyte) request-> block_size, request-> reqid); } /*********************** UPDATE REQUEST AFTER READ ***********************/ MODULE update_request_after_read (THREAD *thread) { dbyte netsize, /* Block size in network order */ *netsize_addr = &netsize; qbyte reqid; /* Request ID back from SMTSOCK */ TRANREQ *request; /* Request we process */ byte *body = NULL; /* Received message body */ tcb = thread-> tcb; /* Point to thread's context */ /* Pick up reqid from event body, encoded as SMT_SOCK_READ_OK */ exdr_read (thread-> event-> body, SMT_SOCK_READ_OK, NULL, NULL, NULL, NULL, &reqid); if ((request = request_lookup (reqid)) == NULL) return; /* Something garbage - ignore it */ /* We either just received the header or the body */ if (request-> in_header) { /* Pick up block size from event body encoded as SMT_SOCK_READ_OK */ exdr_read (thread-> event-> body, SMT_SOCK_READ_OK, NULL, NULL, NULL, &netsize_addr, NULL); request-> block_size = ntohs (netsize); request-> in_header = FALSE; request-> in_progress = FALSE; } else { /* Get data from socket agent reply message */ exdr_read (thread-> event-> body, SMT_SOCK_READ_OK, NULL, NULL, &request-> block_size, &body, NULL); /* Send it back to the calling program */ send_tran_getb_ok (&request-> reply_to, (dbyte) request-> block_size, body); request_destroy (request); mem_free (body); } } /* Returns in-progress request for handle, or NULL if none found */ static TRANREQ *request_lookup (qbyte reqid) { TRANREQ *request; /* Request we process */ for (request = requests.next; request != (TRANREQ *) &requests; request = request-> next) { if (request-> reqid == reqid) return (request); /* We found the request */ } return (NULL); } static void request_destroy (TRANREQ *request) { mem_free (request-> block_data); node_destroy (request); } /*********************** UPDATE REQUEST AFTER READH **********************/ MODULE update_request_after_readh (THREAD *thread) { qbyte netsize, /* Block size in network order */ *netsize_addr = &netsize, reqid; /* Request ID back from SMTSOCK */ TRANREQ *request; /* Request we process */ byte *body = NULL; /* Received message body */ tcb = thread-> tcb; /* Point to thread's context */ /* Pick up reqid from event body, encoded as SMT_SOCK_READH_OK */ exdr_read (thread-> event-> body, SMT_SOCK_READH_OK, NULL, NULL, NULL, NULL, &reqid); if ((request = request_lookup (reqid)) == NULL) return; /* Something garbage - ignore it */ /* We either just received the header or the body */ if (request-> in_header) { /* Pick up block size from event body encoded as SMT_SOCK_READH_OK */ exdr_read (thread-> event-> body, SMT_SOCK_READH_OK, NULL, NULL, NULL, &netsize_addr, NULL); request-> block_size = ntohl (netsize); request-> in_header = FALSE; request-> in_progress = FALSE; } else { /* Get data from socket agent reply message */ exdr_read (thread-> event-> body, SMT_SOCK_READH_OK, NULL, NULL, &request-> block_size, &body, NULL); /* Send it back to the calling program */ send_tran_geth_ok (&request-> reply_to, request-> block_size, body); request_destroy (request); mem_free (body); } } /*********************** UPDATE REQUEST AFTER WRITE **********************/ MODULE update_request_after_write (THREAD *thread) { qbyte reqid; /* Request ID back from SMTSOCK */ TRANREQ *request; /* Request we process */ tcb = thread-> tcb; /* Point to thread's context */ /* Pick up reqid from event body, encoded as SMT_SOCK_OK */ exdr_read (thread-> event-> body, SMT_SOCK_OK, NULL, &reqid); if ((request = request_lookup (reqid)) == NULL) return; /* Something garbage - ignore it */ /* We either just sent the header or the body */ if (request-> in_header) request-> in_header = FALSE; else { /* Send confirmation back to the calling program */ send_tran_putb_ok (&request-> reply_to, (dbyte) request-> block_size); request_destroy (request); } } /********************** UPDATE REQUEST AFTER WRITEH **********************/ MODULE update_request_after_writeh (THREAD *thread) { qbyte reqid; /* Request ID back from SMTSOCK */ TRANREQ *request; /* Request we process */ tcb = thread-> tcb; /* Point to thread's context */ /* Pick up reqid from event body, encoded as SMT_SOCK_OK */ exdr_read (thread-> event-> body, SMT_SOCK_OK, NULL, &reqid); if ((request = request_lookup (reqid)) == NULL) return; /* Something garbage - ignore it */ /* We either just sent the header or the body */ if (request-> in_header) request-> in_header = FALSE; else { /* Send confirmation back to the calling program */ send_tran_puth_ok (&request-> reply_to, request-> block_size); request_destroy (request); } } /********************** UPDATE REQUEST AFTER CLOSED **********************/ MODULE update_request_after_closed (THREAD *thread) { sock_t handle; /* Socket closed */ qbyte reqid; /* Request ID back from SMTSOCK */ TRANREQ *request; /* Request we process */ tcb = thread-> tcb; /* Point to thread's context */ /* Pick up reqid from event body encoded as SMT_SOCK_READ_OK */ exdr_read (thread-> event-> body, SMT_SOCK_READ_OK, NULL, &handle, NULL, NULL, &reqid); if ((request = request_lookup (reqid)) == NULL) return; /* Something garbage - ignore it */ send_tran_closed (&request-> reply_to); for (request = requests.next; request != (TRANREQ *) &requests; request = request-> next) { /* Destroy all requests that refer to this (closed) socket */ if (request-> handle == handle) { request = request-> prev; request_destroy (request-> next); } } } /*********************** UPDATE REQUEST AFTER ERROR **********************/ MODULE update_request_after_error (THREAD *thread) { qbyte reqid; /* Request ID back from SMTSOCK */ TRANREQ *request; /* Request we process */ tcb = thread-> tcb; /* Point to thread's context */ /* Pick up reqid from event body, encoded as SMT_SOCK_ERROR */ exdr_read (thread-> event-> body, SMT_SOCK_ERROR, NULL, NULL, &reqid); if ((request = request_lookup (reqid)) == NULL) return; /* Something garbage - ignore it */ send_tran_error (&request-> reply_to, (char *) thread-> event-> body); request_destroy (request); } /************************* CREATE PUT FILE THREAD ************************/ MODULE create_put_file_thread (THREAD *thread) { struct_smt_tran_putf *params; THREAD *child; /* Handle to child thread */ SYMBOL *pipe_sym; tcb = thread-> tcb; /* Point to thread's context */ get_smt_tran_putf (thread-> event-> body, ¶ms); child = thread_create (AGENT_NAME, ""); ASSERT (child); tcb = child-> tcb; tcb-> thread_type = put_file_event; tcb-> filename = mem_strdup (clean_path (params-> filename)); tcb-> start_slice = params-> start; tcb-> end_slice = params-> end; tcb-> filetype = params-> filetype; tcb-> handle = params-> socket; tcb-> append = FALSE; tcb-> maxsize = 0xFFFFFFFFUL; tcb-> reply_to = thread-> event-> sender; tcb-> pipe = NULL; /* Get pipe to use, if any */ if (pipes && *params-> pipe) { pipe_sym = sym_lookup_symbol (pipes, params-> pipe); if (pipe_sym) tcb-> pipe = (PIPE *) pipe_sym-> value; } #if (defined (GATES_FILESYSTEM)) /* On DOS-based file systems, text files already contain CRLFs, and * can thus be handled as binary files. */ tcb-> filetype = FILETYPE_BINARY; #endif event_send ( &child-> queue-> qid, /* Send to child thread queue */ &thread-> event-> sender, /* Queue for reply */ "PUT_FILE", /* Name of event to send */ NULL, 0, /* Event body and size */ NULL, NULL, NULL, /* No response events */ 0); /* No timeout */ fileio_threads++; free_smt_tran_putf (¶ms); } /************************* CREATE GET FILE THREAD ************************/ MODULE create_get_file_thread (THREAD *thread) { struct_smt_tran_getf *params; THREAD *child; /* Handle to child thread */ SYMBOL *pipe_sym; tcb = thread-> tcb; /* Point to thread's context */ get_smt_tran_getf (thread-> event-> body, ¶ms); child = thread_create (AGENT_NAME, ""); ASSERT (child); tcb = child-> tcb; tcb-> thread_type = get_file_event; tcb-> filename = mem_strdup (clean_path (params-> filename)); tcb-> start_slice = params-> start; tcb-> end_slice = params-> end; tcb-> filetype = params-> filetype; tcb-> handle = params-> socket; tcb-> append = params-> append; tcb-> maxsize = params-> maxsize; tcb-> reply_to = thread-> event-> sender; tcb-> pipe = NULL; /* Get pipe to use, if any */ if (pipes && *params-> pipe) { pipe_sym = sym_lookup_symbol (pipes, params-> pipe); if (pipe_sym) tcb-> pipe = (PIPE *) pipe_sym-> value; } event_send ( &child-> queue-> qid, /* Send to child thread queue */ &thread-> event-> sender, /* Queue for reply */ "GET_FILE", /* Name of event to send */ NULL, 0, /* Event body and size */ NULL, NULL, NULL, /* No response events */ 0); /* No timeout */ fileio_threads++; free_smt_tran_getf (¶ms); } /************************** OPEN FILE FOR OUTPUT *************************/ MODULE open_file_for_output (THREAD *thread) { tcb = thread-> tcb; /* Point to thread's context */ if (tcb-> append) open_transfer_file (thread, 'a'); else open_transfer_file (thread, 'o'); } static void open_transfer_file (THREAD *thread, char mode) { int open_mode; tcb = thread-> tcb; /* Point to thread's context */ if (mode == 'i') /* Input */ open_mode = O_RDONLY; else if (mode == 'o') /* Output */ open_mode = O_WRONLY | O_CREAT | O_TRUNC; else if (mode == 'a') /* Append */ open_mode = O_WRONLY | O_CREAT | O_APPEND; else return; /* Error */ tcb-> filesize = 0; tcb-> stream = lazy_open (tcb-> filename, open_mode); if (io_completed) { if (tcb-> stream < 0) /* If the open failed, send error */ { /* to console, and terminate */ sendfmt (&operq, "ERROR", "smttran: cannot open '%s' for '%c'", tcb-> filename, mode); signal_fatal_error (thread); } else if (tcb-> start_slice > 0) /* Position at start of slice? */ lseek (tcb-> stream, tcb-> start_slice, SEEK_SET); } } local signal_fatal_error (THREAD *thread) { senderr (&operq); raise_exception (exception_event); send_tran_error (&tcb-> reply_to, strerror (errno)); } /************************** OPEN FILE FOR INPUT **************************/ MODULE open_file_for_input (THREAD *thread) { tcb = thread-> tcb; /* Point to thread's context */ open_transfer_file (thread, 'i'); } /************************* READ FILE DATA BUFFER *************************/ MODULE read_file_data_buffer (THREAD *thread) { long read_size; /* Amount of data read/to read */ tcb = thread-> tcb; /* Point to thread's context */ if (tcb-> end_slice > 0) /* End of slice defined? */ { read_size = tcb-> end_slice - tcb-> filesize + 1; if (read_size < 0) read_size = 0; else if (read_size > BUFFER_SIZE) read_size = BUFFER_SIZE; } else read_size = BUFFER_SIZE; /* If read_size is 0, we're finished reading */ if (read_size > 0) { mem_strfree (&tcb-> block_data); tcb-> block_size = read_size; tcb-> block_data = mem_alloc ((dbyte) read_size); read_size = lazy_read (tcb-> stream, tcb-> block_data, (int) read_size); } if (io_completed) { if (read_size == 0) raise_exception (end_of_file_event); else if (read_size == -1) /* If the read failed, send error */ { /* to console, and terminate */ sendfmt (&operq, "ERROR", "smttran: could not read from %s", tcb-> filename); signal_fatal_error (thread); } else tcb-> read_size = read_size; } } /********************** CONVERT ASCII TO NET IF REQD *********************/ MODULE convert_ascii_to_net_if_reqd (THREAD *thread) { qbyte text_size; /* Size of text block */ tcb = thread-> tcb; /* Point to thread's context */ if (tcb-> filetype == FILETYPE_BINARY) { tcb-> write_size = tcb-> read_size; tcb-> send_text = FALSE; } else { /* Allocate text block, in the worst case = all LFs -> CRLFs */ mem_strfree (&tcb-> block_text); text_size = tcb-> read_size * 2; tcb-> block_text = mem_alloc ((size_t) text_size); if (tcb-> block_text) { /* Convert data to 'text' */ tcb-> write_size = file_set_eoln (tcb-> block_text, tcb-> block_data, (dbyte) tcb-> read_size, TRUE); tcb-> send_text = TRUE; } else { sendfmt (&operq, "ERROR", "smttran: could not allocate text buffer (%ld bytes)", text_size); signal_fatal_error (thread); } } } /************************** PUT BUFFER TO SOCKET *************************/ MODULE put_buffer_to_socket (THREAD *thread) { tcb = thread-> tcb; /* Point to thread's context */ if (tcb-> pipe == NULL || tcb-> pipe-> output_credit > 0) { tcb-> filesize += tcb-> write_size; send_write (&sockq, 0, tcb-> handle, (word) tcb-> write_size, (byte *) (tcb-> send_text? tcb-> block_text: tcb-> block_data), 0); if (tcb-> pipe) tcb-> pipe-> output_credit -= tcb-> write_size; } else send_alarm (&timeq, 0, 50, 0, NULL); } /************************* GET BUFFER FROM SOCKET ************************/ MODULE get_buffer_from_socket (THREAD *thread) { tcb = thread-> tcb; /* Point to thread's context */ if (tcb-> pipe == NULL || tcb-> pipe-> input_credit > 0) /* Read buffer from socket, we expect up to BUFFER_MAX bytes, */ /* but anything will do. */ send_read (&sockq, 0, tcb-> handle, BUFFER_SIZE, 1, 0); else send_alarm (&timeq, 0, 50, 0, NULL); } /********************** CONVERT NET TO ASCII IF REQD *********************/ MODULE convert_net_to_ascii_if_reqd (THREAD *thread) { dbyte read_size; tcb = thread-> tcb; /* Point to thread's context */ if (tcb-> filetype == FILETYPE_BINARY) { mem_strfree (&tcb-> block_data); exdr_read (thread-> event-> body, SMT_SOCK_READ_OK, NULL, NULL, &read_size, &tcb-> block_data, NULL); tcb-> write_size = read_size; } else { mem_strfree (&tcb-> block_text); exdr_read (thread-> event-> body, SMT_SOCK_READ_OK, NULL, NULL, &read_size, &tcb-> block_text, NULL); mem_strfree (&tcb-> block_data); tcb-> block_data = mem_alloc (read_size); if (tcb-> block_text) tcb-> write_size = file_set_eoln (tcb-> block_data, tcb-> block_text, read_size, FALSE); else { sendfmt (&operq, "ERROR", "smttran: could not allocate text buffer (%ld bytes)", read_size); signal_fatal_error (thread); } } if (tcb-> pipe) tcb-> pipe-> input_credit -= read_size; } /************************* WRITE FILE DATA BUFFER ************************/ MODULE write_file_data_buffer (THREAD *thread) { int rc = 0, /* Return code from write */ write_size; /* Actual amount of data to write */ tcb = thread-> tcb; /* Point to thread's context */ write_size = (int) tcb-> write_size; if (write_size + tcb-> filesize > tcb-> maxsize) write_size = (int) (tcb-> maxsize - tcb-> filesize); tcb-> filesize += tcb-> write_size; if (write_size > 0) /* Don't exceed limits */ rc = lazy_write (tcb-> stream, tcb-> block_data, (size_t) write_size); if (io_completed) { if (rc < 0) /* If the read failed, send error */ { /* to console, and terminate */ sendfmt (&operq, "ERROR", "smttran: could not write to %s", tcb-> filename); signal_fatal_error (thread); } } } /************************** SIGNAL PUT FILE OKAY *************************/ MODULE signal_put_file_okay (THREAD *thread) { tcb = thread-> tcb; /* Point to thread's context */ send_tran_putf_ok (&tcb-> reply_to, tcb-> filesize); fileio_threads--; /* Request is finished */ } /************************** SIGNAL GET FILE OKAY *************************/ MODULE signal_get_file_okay (THREAD *thread) { tcb = thread-> tcb; /* Point to thread's context */ send_tran_getf_ok (&tcb-> reply_to, tcb-> filesize); fileio_threads--; /* Request is finished */ } /************************** SIGNAL SOCKET CLOSED *************************/ MODULE signal_socket_closed (THREAD *thread) { tcb = thread-> tcb; /* Point to thread's context */ send_tran_closed (&tcb-> reply_to); fileio_threads--; /* Request is finished */ } /************************** SIGNAL SOCKET ERROR **************************/ MODULE signal_socket_error (THREAD *thread) { tcb = thread-> tcb; /* Point to thread's context */ send_tran_error (&tcb-> reply_to, (char *) thread-> event-> body); fileio_threads--; /* Request is finished */ } /************************ OKAY IF NO REQUESTS LEFT ***********************/ MODULE okay_if_no_requests_left (THREAD *thread) { static QID commit_requestor = { 0, 0 }; tcb = thread-> tcb; /* Point to thread's context */ /* Since commit events come from various places, we will store the QID */ /* of the first one to come in, and use that for reply events. When */ /* we send an ok event, we'll reset the requestor queue id. This is */ /* not the best solution, but will have to do. */ if (commit_requestor.ident == 0) commit_requestor = thread-> event-> sender; /* If no requests left, signal all closed to sender, else send a */ /* timer alarm event to ourselves to cause the dialog to loop. */ if (requests.next == &requests && fileio_threads == 0) { send_tran_closed (&commit_requestor); commit_requestor.ident = 0; } else /* Ask timer to send us an event after one second */ send_alarm (&timeq, 0, 100, 7, (byte *) "COMMIT"); } /************************* REGENERATE ALARM EVENT ************************/ MODULE regenerate_alarm_event (THREAD *thread) { struct_smt_time_reply *reply; /* Reply from timer */ tcb = thread-> tcb; /* Point to thread's context */ get_smt_time_reply (thread-> event-> body, &reply); if (streq ((char *) reply-> tag_data, "COMMIT")) the_next_event = commit_event; else the_next_event = empty_event; free_smt_time_reply (&reply); } /*************************** CHECK THREAD TYPE ***************************/ MODULE check_thread_type (THREAD *thread) { tcb = thread-> tcb; /* Point to thread's context */ the_next_event = tcb-> thread_type; } /************************** DESTROY ALL REQUESTS *************************/ MODULE destroy_all_requests (THREAD *thread) { while (requests.next != &requests) request_destroy (requests.next); } /************************* TERMINATE THE THREAD **************************/ MODULE terminate_the_thread (THREAD *thread) { tcb = thread-> tcb; /* Point to thread's context */ if (tcb-> stream >= 0) { lazy_close (tcb-> stream); if (io_completed) tcb-> stream = -1; } if (tcb->stream < 0) { mem_strfree (&tcb-> filename); mem_strfree (&tcb-> block_data); mem_strfree (&tcb-> block_text); the_next_event = terminate_event; } }