Programming Notes


Porting

The State Threads library uses OS concepts that are available in some form on most UNIX platforms, making the library very portable across many flavors of UNIX. However, there are several parts of the library that rely on platform-specific features. Here is the list of such parts:

All machine-dependent feature test macros should be defined in the md.h header file. The assembly code for setjmp/longjmp replacement functions for all CPU architectures should be placed in the md.S file.

The current version of the library is ported to:

Signals

Signal handling in an application using State Threads should be treated the same way as in a classical UNIX process application. There is no such thing as a per-thread signal mask; all threads share the same signal handlers, and only async-signal-safe functions can be used in signal handlers. However, there is a way to process signals synchronously by converting a signal event to an I/O event: a signal catching function writes to a pipe, and a dedicated signal handling thread reads from that pipe and processes the signal synchronously. The following code demonstrates this technique (error handling is omitted for clarity):

#include <errno.h>
#include <signal.h>
#include <unistd.h>
#include <st.h>

/* Per-process pipe which is used as a signal queue. */
/* Up to PIPE_BUF/sizeof(int) signals can be queued up. */
int sig_pipe[2];

/* Signal catching function. */
/* Converts signal event to I/O event. */
void sig_catcher(int signo)
{
  int err;

  /* Save errno to restore it after the write() */
  err = errno;
  /* write() is async-signal-safe */
  write(sig_pipe[1], &signo, sizeof(int));
  errno = err;
}

/* Signal processing function. */
/* This is the "main" function of the signal processing thread. */
void *sig_process(void *arg)
{
  st_netfd_t nfd;
  int signo;

  nfd = st_netfd_open(sig_pipe[0]);

  for ( ; ; ) {
    /* Read the next signal from the pipe */
    st_read(nfd, &signo, sizeof(int), -1);

    /* Process signal synchronously */
    switch (signo) {
    case SIGHUP:
      /* do something here - reread config files, etc. */
      break;
    case SIGTERM:
      /* do something here - cleanup, etc. */
      break;
      /*      .
              .
         Other signals
              .
              .
      */
    }
  }

  return NULL;
}

int main(int argc, char *argv[])
{
  struct sigaction sa;
        .
        .
        .

  /* Create signal pipe */
  pipe(sig_pipe);

  /* Create signal processing thread */
  st_thread_create(sig_process, NULL, 0, 0);

  /* Install sig_catcher() as a signal handler */
  sa.sa_handler = sig_catcher;
  sigemptyset(&sa.sa_mask);
  sa.sa_flags = 0;
  sigaction(SIGHUP, &sa, NULL);

  /* The same sigaction settings are reused for SIGTERM */
  sigaction(SIGTERM, &sa, NULL);

        .
        .
        .
      
}

Note that if multiple processes are used (see below), the signal pipe should be initialized after the fork(2) call so that each process has its own private pipe.
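
The note above can be sketched as a small helper that each child process calls after fork(2) and before it installs its signal handlers. The name sig_pipe_init() is hypothetical, and making the write end non-blocking is an added assumption that is not part of the original example: it keeps the handler's write() from blocking the whole process if the pipe ever fills up, at the cost of dropping that signal notification.

```c
#include <fcntl.h>
#include <unistd.h>

/* Hypothetical helper: create a private signal pipe for the calling process.
   Intended to be called in each child after fork(2).
   Returns 0 on success, -1 on error (errno is set by the failing call). */
int sig_pipe_init(int fds[2])
{
  int flags;

  if (pipe(fds) < 0)
    return -1;

  /* Assumption, not in the original example: a non-blocking write end makes
     the handler's write() fail with EAGAIN instead of blocking when the
     pipe is full. */
  flags = fcntl(fds[1], F_GETFL, 0);
  if (flags < 0 || fcntl(fds[1], F_SETFL, flags | O_NONBLOCK) < 0) {
    close(fds[0]);
    close(fds[1]);
    return -1;
  }

  return 0;
}
```

With this helper, a child would call sig_pipe_init(sig_pipe) in place of the bare pipe(sig_pipe) call shown in main() above.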

Intra-Process Synchronization

Due to the event-driven nature of the library scheduler, a thread context switch (process state change) can only happen in a well-known set of library functions. This set includes functions in which a thread may "block": I/O functions (st_read(), st_write(), etc.), sleep functions (st_sleep(), etc.), and thread synchronization functions (st_thread_join(), st_cond_wait(), etc.). As a result, process-specific global data need not be protected by locks, since a thread cannot be rescheduled while in a critical section (and only one thread at a time can access the same memory location). By the same token, functions that are not thread-safe in the traditional sense can be safely used with State Threads. The library's mutex facilities are practically useless for a correctly written application (no blocking functions in a critical section) and are provided mostly for completeness. This absence of locking greatly simplifies application design and provides a foundation for scalability.
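
As an illustration of the point above, the following minimal sketch updates process-wide counters without any lock. All names are hypothetical. The sketch assumes the functions are called only from State Threads threads within one process and that no signal handler touches the counters.

```c
/* Hypothetical per-process statistics shared by all threads in the process. */
static long active_sessions = 0;
static long peak_sessions = 0;

/* Called by a thread when it starts serving a connection.
   The read-modify-write sequence needs no lock: it contains no call that
   may block (st_read(), st_sleep(), st_cond_wait(), etc.), so no other
   thread in this process can run until the function returns. */
void session_begin(void)
{
  active_sessions++;
  if (active_sessions > peak_sessions)
    peak_sessions = active_sessions;
}

/* Called by a thread when it finishes serving a connection. */
void session_end(void)
{
  active_sessions--;
}

/* Accessors for the current and the highest observed session counts. */
long sessions_active(void)
{
  return active_sessions;
}

long sessions_peak(void)
{
  return peak_sessions;
}
```

If a blocking call such as st_read() were placed between the increment and the comparison in session_begin(), another thread could run in between, and the sequence would no longer be a critical section in the sense described above.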

Inter-Process Synchronization

The State Threads library makes it possible to multiplex a large number of simultaneous connections onto a much smaller number of separate processes, where each process uses a many-to-one user-level threading implementation (N of M:1 mappings rather than one M:N mapping used in native threading libraries on some platforms). This design is key to the application's scalability. One can think about it as if a set of all threads is partitioned into separate groups (processes) where each group has a separate pool of resources (virtual address space, file descriptors, etc.). An application designer has full control of how many groups (processes) an application creates and what resources, if any, are shared among different groups via standard UNIX inter-process communication (IPC) facilities.

There are several reasons for creating multiple processes:

Ideally all user sessions are completely independent, so there is no need for inter-process communication. It is always better to have several separate smaller process-specific resources (e.g., data caches) than to have one large resource shared (and modified) by all processes. Sometimes, however, there is a need to share a common resource among different processes. In that case, standard UNIX IPC facilities can be used. In addition to that, there is a way to synchronize different processes so that only the thread accessing the shared resource will be suspended (but not the entire process) if that resource is unavailable. In the following code fragment a pipe is used as a counting semaphore for inter-process synchronization:

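#include <limits.h>  /* PIPE_BUF, where the platform defines it */
#include <stdlib.h>
#include <unistd.h>
#include <st.h>

#ifndef PIPE_BUF
#define PIPE_BUF 512  /* POSIX */
#endif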

/* Semaphore data structure */
typedef struct ipc_sem {
  st_netfd_t rdfd;  /* read descriptor */
  st_netfd_t wrfd;  /* write descriptor */
} ipc_sem_t;

/* Create and initialize the semaphore. Should be called before fork(2). */
/* 'value' must be less than PIPE_BUF. */
/* If 'value' is 1, the semaphore works as mutex. */
ipc_sem_t *ipc_sem_create(int value)
{
  ipc_sem_t *sem;
  int p[2];
  char b[PIPE_BUF] = {0};  /* token bytes; their values are irrelevant */

  /* Error checking is omitted for clarity */
  sem = malloc(sizeof(ipc_sem_t));

  /* Create the pipe */
  pipe(p);
  sem->rdfd = st_netfd_open(p[0]);
  sem->wrfd = st_netfd_open(p[1]);

  /* Initialize the semaphore: put 'value' bytes into the pipe */
  write(p[1], b, value);

  return sem;
}

/* Try to decrement the "value" of the semaphore. */
/* If "value" is 0, the calling thread blocks on the semaphore. */
int ipc_sem_wait(ipc_sem_t *sem)
{
  char c;

  /* Read one byte from the pipe */
  if (st_read(sem->rdfd, &c, 1, -1) != 1)
    return -1;

  return 0;
}

/* Increment the "value" of the semaphore. */
int ipc_sem_post(ipc_sem_t *sem)
{
  char c = 0;  /* token byte; its value is irrelevant */

  if (st_write(sem->wrfd, &c, 1, -1) != 1)
    return -1;

  return 0;
}
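
To make the counting behavior concrete, here is a minimal sketch of the same idea that uses plain read(2) and write(2) instead of st_read() and st_write(), so it can be tried without the library. The token_pipe_* names and the 64-token limit are hypothetical. The read end is made non-blocking only so that an empty semaphore can be observed; in the State Threads version above, an empty pipe suspends just the calling thread.

```c
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Create a pipe preloaded with 'value' tokens (0 <= value <= 64 in this
   sketch). Returns 0 on success, -1 on error. */
int token_pipe_create(int fds[2], int value)
{
  char tokens[64] = {0};
  int flags;

  if (value < 0 || value > (int)sizeof(tokens))
    return -1;
  if (pipe(fds) < 0)
    return -1;

  /* Non-blocking read end: an empty pipe yields EAGAIN instead of blocking */
  flags = fcntl(fds[0], F_GETFL, 0);
  if (flags < 0 || fcntl(fds[0], F_SETFL, flags | O_NONBLOCK) < 0)
    return -1;

  /* The number of bytes in the pipe is the semaphore's value */
  if (value > 0 && write(fds[1], tokens, (size_t)value) != value)
    return -1;

  return 0;
}

/* Try to take one token. Returns 0 if a token was taken, 1 if the
   semaphore is currently at zero, -1 on any other error. */
int token_pipe_trywait(int fds[2])
{
  char c;
  ssize_t n = read(fds[0], &c, 1);

  if (n == 1)
    return 0;
  if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK))
    return 1;
  return -1;
}

/* Put one token back. Returns 0 on success, -1 on error. */
int token_pipe_post(int fds[2])
{
  char c = 0;

  return write(fds[1], &c, 1) == 1 ? 0 : -1;
}
```

Created with a value of 2, the pipe lets two waits succeed and reports zero on the third; a post then allows exactly one more wait.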

Generally, the following steps should be followed when writing an application using the State Threads library:

  1. Initialize the library (st_init()).
  2. Create resources that will be shared among different processes: create and bind listening sockets, create shared memory segments, IPC channels, synchronization primitives, etc.
  3. Create several processes (fork(2)). The parent process should either exit or become a "watchdog" (e.g., it starts a new process when an existing one crashes, does a cleanup upon application termination, etc.).
  4. In each child process create a pool of threads (st_thread_create()) to handle user connections.
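
Step 3 with the parent acting as a watchdog might look like the sketch below. It uses only fork(2) and wait(2); steps 1 and 2 (st_init() and creation of shared resources) are assumed to have happened before run_watchdog() is called, and the worker function stands in for step 4. The names worker_fn, idle_worker, spawn_worker, and run_watchdog, as well as the restart limit, are hypothetical.

```c
#include <errno.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical worker entry point. In a real application it would create
   threads with st_thread_create() and serve connections (step 4). */
typedef int (*worker_fn)(void);

/* Placeholder worker used for illustration: exits immediately. */
int idle_worker(void)
{
  return 0;
}

/* Fork one worker. Returns the child's pid in the parent, -1 on error. */
static pid_t spawn_worker(worker_fn fn)
{
  pid_t pid = fork();

  if (pid == 0)
    _exit(fn());  /* child: never returns into the parent's code path */
  return pid;
}

/* Start 'nproc' workers, restart any worker killed by a signal (at most
   'max_restarts' times in total), and return once all workers are gone.
   Returns the number of restarts performed, or -1 on error. */
int run_watchdog(worker_fn fn, int nproc, int max_restarts)
{
  int alive = 0, restarts = 0, status, i;

  for (i = 0; i < nproc; i++) {
    if (spawn_worker(fn) < 0)
      return -1;
    alive++;
  }

  while (alive > 0) {
    if (wait(&status) < 0) {
      if (errno == EINTR)
        continue;  /* interrupted by a signal: wait again */
      return -1;
    }
    alive--;

    /* A worker that died from a signal is treated as crashed */
    if (WIFSIGNALED(status) && restarts < max_restarts) {
      if (spawn_worker(fn) < 0)
        return -1;
      alive++;
      restarts++;
    }
  }

  return restarts;
}
```

A real watchdog would typically also handle its own termination signals and clean up shared resources; those parts are omitted here.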

Non-Network I/O

The State Threads architecture uses non-blocking I/O on st_netfd_t objects for concurrent processing of multiple user connections. This architecture has a drawback: the entire process and all its threads may block for the duration of a disk or other non-network I/O operation, whether through State Threads I/O functions, direct system calls, or standard I/O functions. (This is applicable mostly to disk reads; disk writes are usually performed asynchronously -- data goes to the buffer cache to be written to disk later.) Fortunately, disk I/O (unlike network I/O) usually takes a finite and predictable amount of time, but this may not be true for special devices or user input devices (including stdin). Nevertheless, such I/O reduces throughput of the system and increases response times. There are several ways to design an application to overcome this drawback:

Timeouts

The timeout parameter to st_cond_timedwait() and the I/O functions, and the arguments to st_sleep() and st_usleep(), specify a maximum time to wait measured from the last context switch, not from the beginning of the function call.

The State Threads' time resolution is actually the time interval between context switches. That time interval may be large in some situations, for example, when a single thread does a lot of work continuously. Note that a steady, uninterrupted stream of network I/O qualifies for this description; a context switch occurs only when a thread blocks.

If a specified I/O timeout is less than the time interval between context switches, the function may return with a timeout error before that amount of time has elapsed since the beginning of the function call. For example, if eight milliseconds have passed since the last context switch and an I/O function with a timeout of 10 milliseconds blocks, causing a switch, the call may return with a timeout error as little as two milliseconds after it was called. (On Linux, select()'s timeout is an upper bound on the amount of time elapsed before select returns.) Similarly, if 12 ms have passed already, the function may return immediately.
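
The arithmetic in this example can be written down as a small model. This is a simplified illustration of the behavior described here, not code from the library; the function name and the use of milliseconds are assumptions.

```c
/* Simplified model (an assumption, not library code): a timeout is measured
   from the last context switch, so time the thread has already spent
   running counts against it. All values are in milliseconds.
   Returns the shortest time, measured from the moment of the call, after
   which the call may report a timeout, or -1 if there is no timeout. */
long effective_wait_ms(long timeout_ms, long ms_since_last_switch)
{
  long remaining;

  if (timeout_ms < 0)
    return -1;  /* -1 means no timeout */

  remaining = timeout_ms - ms_since_last_switch;
  return remaining > 0 ? remaining : 0;
}
```

With the numbers from the text, effective_wait_ms(10, 8) gives 2 and effective_wait_ms(10, 12) gives 0, which corresponds to the call returning immediately.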

In almost all cases, I/O timeouts should be used only for detecting a broken network connection or for preventing a peer from holding an idle connection for too long. Therefore, for most applications realistic I/O timeouts should be on the order of seconds. Furthermore, there is probably no point in retrying operations that time out. Rather than retrying, simply use a larger timeout in the first place.

The largest valid timeout value is platform-dependent and may be significantly less than INT_MAX seconds for select() or INT_MAX milliseconds for poll(). Generally, you should not use timeouts exceeding several hours. Use -1 as a special value to indicate infinite timeout or indefinite sleep.