/* ====================================================================
* The Kannel Software License, Version 1.0
*
* Copyright (c) 2001-2005 Kannel Group
* Copyright (c) 1998-2001 WapIT Ltd.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution,
* if any, must include the following acknowledgment:
* "This product includes software developed by the
* Kannel Group (http://www.kannel.org/)."
* Alternately, this acknowledgment may appear in the software itself,
* if and wherever such third-party acknowledgments normally appear.
*
* 4. The names "Kannel" and "Kannel Group" must not be used to
* endorse or promote products derived from this software without
* prior written permission. For written permission, please
* contact org@kannel.org.
*
* 5. Products derived from this software may not be called "Kannel",
* nor may "Kannel" appear in their name, without prior written
* permission of the Kannel Group.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE KANNEL GROUP OR ITS CONTRIBUTORS
* BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
* OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
* OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
* BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
* OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
* EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Kannel Group. For more information on
* the Kannel Group, please see .
*
* Portions of this software are based upon software originally written at
* WapIT Ltd., Helsinki, Finland for the Kannel project.
*/
/*
* fdset.c - module for managing a large collection of file descriptors
*/
#include "gw-config.h"
#include
#include
#include
#include "gwlib/gwlib.h"
struct FDSet
{
/* Thread ID of the set's internal thread, which will spend most
* of its time blocking on poll(). This is set when the thread
* is created, and not changed after that. It's not protected
* by any lock. */
long poll_thread;
/* The following fields are for use by the polling thread only.
* No-one else may touch them. It's not protected by any lock. */
/* Array for use with poll(). Elements 0 through size-1 are allocated.
* Elements 0 through entries-1 are in use. */
struct pollfd *pollinfo;
int size;
int entries;
/* Array of times when appropriate fd got any event or events bitmask changed */
time_t *times;
/* timeout for this fdset */
long timeout;
/* Arrays of callback and data fields. They are kept in sync with
* the pollinfo array, and are basically extra fields that we couldn't
* put in struct pollfd because that structure is defined externally. */
fdset_callback_t **callbacks;
void **datafields;
/* The poller function loops over the table after poll() returns,
* and calls callback functions that may modify the table that is
* being scanned. We can't just copy the table to avoid interference,
* because fdset_unregister and fdset_listen guarantee that their
* operations are complete when they return -- that does not work
* if poller() is scanning an outdated copy of the table.
* To solve this, we have a field that marks when the table is
* being scanned. If this field is true, fdset_unregister merely
* sets the fd to -1 instead of deleting the whole entry.
* fdset_listen will takes care to modify revents as well as
* events. fdset_register always adds to the end of the table,
* so it does not have to do anything special.
*/
int scanning;
/* This field keeps track of how many fds were set to -1 by
* fdset_unregister while "scanning" is true. That way we can
* efficiently check if we need to scan the table to really
* delete those entries. */
int deleted_entries;
/* The following fields are for general use, and are of types that
* have internal locks. */
/* List of struct action. Used by other threads to make requests
* of the polling thread. */
List *actions;
};
/* Datatype to describe changes to the fdset fields that only the polling
* thread may touch. Other threads use this type to submit requests to
* change those fields. */
/* Action life cycle: Created, then pushed on set->actions list by
* action_submit. Poller thread wakes up and takes it from the list,
* then calls handle_action, which performs the action and pushes it
* on the action's done list. action_submit then takes it back and
* destroys it. */
/* If no synchronization is needed, action_submit_nosync can be used.
* In that case handle_action will destroy the action itself instead
* of putting it on any list. */
struct action
{
enum { REGISTER, LISTEN, UNREGISTER, DESTROY } type;
int fd; /* Used by REGISTER, LISTEN, and UNREGISTER */
int mask; /* Used by LISTEN */
int events; /* Used by REGISTER and LISTEN */
fdset_callback_t *callback; /* Used by REGISTER */
void *data; /* Used by REGISTER */
/* When the request has been handled, an element is produced on this
* list, so that the submitter can synchronize. Can be left NULL. */
List *done; /* Used by LISTEN, UNREGISTER, and DESTROY */
};
/* Return a new action structure of the given type, with all fields empty. */
static struct action *action_create(int type)
{
struct action *new;
new = gw_malloc(sizeof(*new));
new->type = type;
new->fd = -1;
new->mask = 0;
new->events = 0;
new->callback = NULL;
new->data = NULL;
new->done = NULL;
return new;
}
static void action_destroy(struct action *action)
{
if (action == NULL)
return;
gwlist_destroy(action->done, NULL);
gw_free(action);
}
/* For use with gwlist_destroy */
static void action_destroy_item(void *action)
{
action_destroy(action);
}
/*
* Submit an action for this set, and wait for the polling thread to
* confirm that it's been done, by pushing the action on its done list.
*/
static void submit_action(FDSet *set, struct action *action)
{
List *done;
void *sync;
gw_assert(set != NULL);
gw_assert(action != NULL);
done = gwlist_create();
gwlist_add_producer(done);
action->done = done;
gwlist_append(set->actions, action);
gwthread_wakeup(set->poll_thread);
sync = gwlist_consume(done);
gw_assert(sync == action);
action_destroy(action);
}
/*
* As above, but don't wait for confirmation.
*/
static void submit_action_nosync(FDSet *set, struct action *action)
{
gwlist_append(set->actions, action);
gwthread_wakeup(set->poll_thread);
}
/* Do one action for this thread and confirm that it's been done by
* appending the action to its done list. May only be called by
* the polling thread. Returns 0 normally, and returns -1 if the
* action destroyed the set. */
static int handle_action(FDSet *set, struct action *action)
{
int result;
gw_assert(set != NULL);
gw_assert(set->poll_thread == gwthread_self());
gw_assert(action != NULL);
result = 0;
switch (action->type) {
case REGISTER:
fdset_register(set, action->fd, action->events,
action->callback, action->data);
break;
case LISTEN:
fdset_listen(set, action->fd, action->mask, action->events);
break;
case UNREGISTER:
fdset_unregister(set, action->fd);
break;
case DESTROY:
fdset_destroy(set);
result = -1;
break;
default:
panic(0, "fdset: handle_action got unknown action type %d.",
action->type);
}
if (action->done == NULL)
action_destroy(action);
else
gwlist_produce(action->done, action);
return result;
}
/* Look up the entry number in the pollinfo array for this fd.
* Right now it's a linear search, this may have to be improved. */
static int find_entry(FDSet *set, int fd)
{
int i;
gw_assert(set != NULL);
gw_assert(gwthread_self() == set->poll_thread);
for (i = 0; i < set->entries; i++) {
if (set->pollinfo[i].fd == fd)
return i;
}
return -1;
}
static void remove_entry(FDSet *set, int entry)
{
if (entry != set->entries - 1) {
/* We need to keep the array contiguous, so move the last element
* to fill in the hole. */
set->pollinfo[entry] = set->pollinfo[set->entries - 1];
set->callbacks[entry] = set->callbacks[set->entries - 1];
set->datafields[entry] = set->datafields[set->entries - 1];
set->times[entry] = set->times[set->entries - 1];
}
set->entries--;
}
static void remove_deleted_entries(FDSet *set)
{
int i;
i = 0;
while (i < set->entries && set->deleted_entries > 0) {
if (set->pollinfo[i].fd < 0) {
remove_entry(set, i);
set->deleted_entries--;
} else {
i++;
}
}
}
/* Main function for polling thread. Most its time is spent blocking
* in poll(). No-one else is allowed to change the fields it uses,
* so other threads just put something on the actions list and wake
* up this thread. That's why it checks the actions list every time
* it goes through the loop.
*/
static void poller(void *arg)
{
FDSet *set = arg;
struct action *action;
int ret;
int i;
time_t now;
gw_assert(set != NULL);
for (;;) {
while ((action = gwlist_extract_first(set->actions)) != NULL) {
/* handle_action returns -1 if the set was destroyed. */
if (handle_action(set, action) < 0)
return;
}
/* Block for defined timeout, waiting for activity */
ret = gwthread_poll(set->pollinfo, set->entries, set->timeout);
if (ret < 0) {
if (errno != EINTR) {
error(errno, "Poller: can't handle error; sleeping 1 second.");
gwthread_sleep(1.0);
}
continue;
}
time(&now);
/* Callbacks may modify the table while we scan it, so be careful. */
set->scanning = 1;
for (i = 0; i < set->entries; i++) {
if (set->pollinfo[i].revents != 0) {
set->callbacks[i](set->pollinfo[i].fd,
set->pollinfo[i].revents,
set->datafields[i]);
/* update event time */
time(&set->times[i]);
} else if (set->timeout > 0 && difftime(set->times[i] + set->timeout, now) <= 0) {
debug("gwlib.fdset", 0, "Timeout for fd:%d appeares.", set->pollinfo[i].fd);
set->callbacks[i](set->pollinfo[i].fd, POLLERR, set->datafields[i]);
}
}
set->scanning = 0;
if (set->deleted_entries > 0)
remove_deleted_entries(set);
}
}
FDSet *fdset_create_real(long timeout)
{
FDSet *new;
new = gw_malloc(sizeof(*new));
/* Start off with space for one element because we can't malloc 0 bytes
* and we don't want to worry about these pointers being NULL. */
new->size = 1;
new->entries = 0;
new->pollinfo = gw_malloc(sizeof(new->pollinfo[0]) * new->size);
new->callbacks = gw_malloc(sizeof(new->callbacks[0]) * new->size);
new->datafields = gw_malloc(sizeof(new->datafields[0]) * new->size);
new->times = gw_malloc(sizeof(new->times[0]) * new->size);
new->timeout = timeout > 0 ? timeout : -1;
new->scanning = 0;
new->deleted_entries = 0;
new->actions = gwlist_create();
new->poll_thread = gwthread_create(poller, new);
if (new->poll_thread < 0) {
error(0, "Could not start internal thread for fdset.");
fdset_destroy(new);
return NULL;
}
return new;
}
void fdset_destroy(FDSet *set)
{
if (set == NULL)
return;
if (set->poll_thread < 0 || gwthread_self() == set->poll_thread) {
if (set->entries > 0) {
warning(0, "Destroying fdset with %d active entries.",
set->entries);
}
gw_free(set->pollinfo);
gw_free(set->callbacks);
gw_free(set->datafields);
gw_free(set->times);
if (gwlist_len(set->actions) > 0) {
error(0, "Destroying fdset with %ld pending actions.",
gwlist_len(set->actions));
}
gwlist_destroy(set->actions, action_destroy_item);
gw_free(set);
} else {
long thread = set->poll_thread;
submit_action(set, action_create(DESTROY));
gwthread_join(thread);
}
}
void fdset_register(FDSet *set, int fd, int events,
fdset_callback_t callback, void *data)
{
int new;
gw_assert(set != NULL);
if (gwthread_self() != set->poll_thread) {
struct action *action;
action = action_create(REGISTER);
action->fd = fd;
action->events = events;
action->callback = callback;
action->data = data;
submit_action_nosync(set, action);
return;
}
gw_assert(set->entries <= set->size);
if (set->entries >= set->size) {
int newsize = set->entries + 1;
set->pollinfo = gw_realloc(set->pollinfo,
sizeof(set->pollinfo[0]) * newsize);
set->callbacks = gw_realloc(set->callbacks,
sizeof(set->callbacks[0]) * newsize);
set->datafields = gw_realloc(set->datafields,
sizeof(set->datafields[0]) * newsize);
set->times = gw_realloc(set->times, sizeof(set->times[0]) * newsize);
set->size = newsize;
}
/* We don't check set->scanning. Adding new entries is not harmful
* because their revents fields are 0. */
new = set->entries++;
set->pollinfo[new].fd = fd;
set->pollinfo[new].events = events;
set->pollinfo[new].revents = 0;
set->callbacks[new] = callback;
set->datafields[new] = data;
time(&set->times[new]);
}
void fdset_listen(FDSet *set, int fd, int mask, int events)
{
int entry;
gw_assert(set != NULL);
if (gwthread_self() != set->poll_thread) {
struct action *action;
action = action_create(LISTEN);
action->fd = fd;
action->mask = mask;
action->events = events;
submit_action(set, action);
return;
}
entry = find_entry(set, fd);
if (entry < 0) {
warning(0, "fdset_listen called on unregistered fd %d.", fd);
return;
}
/* Copy the bits from events specified by the mask, and preserve the
* bits not specified by the mask. */
set->pollinfo[entry].events =
(set->pollinfo[entry].events & ~mask) | (events & mask);
/* If poller is currently scanning the array, then change the
* revents field so that the callback function will not be called
* for events we should no longer listen for. The idea is the
* same as for the events field, except that we only turn bits off. */
if (set->scanning) {
set->pollinfo[entry].revents =
set->pollinfo[entry].revents & (events | ~mask);
}
time(&set->times[entry]);
}
void fdset_unregister(FDSet *set, int fd)
{
int entry;
gw_assert(set != NULL);
if (gwthread_self() != set->poll_thread) {
struct action *action;
action = action_create(UNREGISTER);
action->fd = fd;
submit_action(set, action);
return;
}
/* Remove the entry from the pollinfo array */
entry = find_entry(set, fd);
if (entry < 0) {
warning(0, "fdset_listen called on unregistered fd %d.", fd);
return;
}
if (entry == set->entries - 1) {
/* It's the last entry. We can safely remove it even while
* the array is being scanned, because the scan checks set->entries. */
set->entries--;
} else if (set->scanning) {
/* We can't remove entries because the array is being
* scanned. Mark it as deleted. */
set->pollinfo[entry].fd = -1;
set->deleted_entries++;
} else {
remove_entry(set, entry);
}
}