/*
* event_msgqueue.c
*
* Libevent threaded message passing primitives
*
* Andrew Danforth <acd@weirdness.net>, October 2006
*
* Copyright (c) Andrew Danforth, 2006
*
*/
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
//#include "config.h"
#include "event_msgqueue.h"
struct circqueue {
unsigned int head;
unsigned int tail;
unsigned int count;
unsigned int max_entries;
unsigned int array_elements;
void **entries;
};
#define DEFAULT_UNBOUNDED_QUEUE_SIZE 1024
struct event_msgqueue {
int push_fd;
int pop_fd;
int unlock_between_callbacks;
struct event queue_ev;
pthread_mutex_t lock;
void (*callback)(void *, void *);
void *cbarg;
struct circqueue *queue;
};
static unsigned int nextpow2(unsigned int num) {
--num;
num |= num >> 1;
num |= num >> 2;
num |= num >> 4;
num |= num >> 8;
num |= num >> 16;
return ++num;
}
#define circqueue_get_length(q) ((q)->count)
#define circqueue_is_empty(q) (!circqueue_get_length(q))
#define circqueue_is_full(q) ((q)->count == (q)->array_elements)
static struct circqueue *circqueue_new(unsigned int size) {
struct circqueue *cq;
if (!(cq = calloc(1, sizeof(struct circqueue))))
return(NULL);
cq->max_entries = size;
if (!size || !(cq->array_elements = nextpow2(size)))
cq->array_elements = DEFAULT_UNBOUNDED_QUEUE_SIZE;
cq->entries = malloc(sizeof(void *) * cq->array_elements);
if (!cq->entries) {
free(cq);
return(NULL);
}
return(cq);
}
static void circqueue_destroy(struct circqueue *cq) {
free(cq->entries);
free(cq);
}
static int circqueue_grow(struct circqueue *cq) {
void **newents;
unsigned int newsize = cq->array_elements << 1;
unsigned int headchunklen = 0, tailchunklen = 0;
if (!(newents = malloc(sizeof(void *) * newsize)))
return(-1);
if (cq->head < cq->tail)
headchunklen = cq->tail - cq->head;
else {
headchunklen = cq->array_elements - cq->head;
tailchunklen = cq->tail;
}
memcpy(newents, &cq->entries[cq->head], sizeof(void *) * headchunklen);
if (tailchunklen)
memcpy(&newents[headchunklen], cq->entries, sizeof(void *) * tailchunklen);
cq->head = 0;
cq->tail = headchunklen + tailchunklen;
cq->array_elements = newsize;
free(cq->entries);
cq->entries = newents;
return(0);
}
static int circqueue_push_tail(struct circqueue *cq, void *elem) {
if (cq->max_entries) {
if (cq->count == cq->max_entries)
return(-1);
} else if (circqueue_is_full(cq) && circqueue_grow(cq) != 0)
return(-1);
cq->count++;
cq->entries[cq->tail++] = elem;
cq->tail &= cq->array_elements - 1;
return(0);
}
static void *circqueue_pop_head(struct circqueue *cq) {
void *data;
if (!cq->count)
return(NULL);
cq->count--;
data = cq->entries[cq->head++];
cq->head &= cq->array_elements - 1;
return(data);
}
static void msgqueue_pop(int fd, short flags, void *arg) {
struct event_msgqueue *msgq = arg;
char buf[64];
read(fd, buf, sizeof(buf));
pthread_mutex_lock(&msgq->lock);
while(!circqueue_is_empty(msgq->queue)) {
void *qdata;
qdata = circqueue_pop_head(msgq->queue);
if (msgq->unlock_between_callbacks)
pthread_mutex_unlock(&msgq->lock);
msgq->callback(qdata, msgq->cbarg);
if (msgq->unlock_between_callbacks)
pthread_mutex_lock(&msgq->lock);
}
pthread_mutex_unlock(&msgq->lock);
}
struct event_msgqueue *msgqueue_new(struct event_base *base, unsigned int max_size, void (*callback)(void *, void *), void *cbarg) {
struct event_msgqueue *msgq;
struct circqueue *cq;
int fds[2];
if (!(cq = circqueue_new(max_size)))
return(NULL);
if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) != 0) {
circqueue_destroy(cq);
return(NULL);
}
if (!(msgq = malloc(sizeof(struct event_msgqueue)))) {
circqueue_destroy(cq);
close(fds[0]);
close(fds[1]);
return(NULL);
}
msgq->push_fd = fds[0];
msgq->pop_fd = fds[1];
msgq->queue = cq;
msgq->callback = callback;
msgq->cbarg = cbarg;
pthread_mutex_init(&msgq->lock, NULL);
event_set(&msgq->queue_ev, msgq->pop_fd, EV_READ | EV_PERSIST, msgqueue_pop, msgq);
event_base_set(base, &msgq->queue_ev);
event_add(&msgq->queue_ev, NULL);
msgq->unlock_between_callbacks = 1;
return(msgq);
}
void msgqueue_destroy(struct event_msgqueue *msgq)
{
for( ; msgqueue_length(msgq) > 0; ) {
sleep( 1 );
}
event_del(&msgq->queue_ev);
circqueue_destroy(msgq->queue);
close(msgq->push_fd);
close(msgq->pop_fd);
free(msgq);
}
int msgqueue_push(struct event_msgqueue *msgq, void *msg) {
const char buf[1] = { 0 };
int r = 0;
pthread_mutex_lock(&msgq->lock);
if ((r = circqueue_push_tail(msgq->queue, msg)) == 0) {
if (circqueue_get_length(msgq->queue) == 1)
write(msgq->push_fd, buf, 1);
}
pthread_mutex_unlock(&msgq->lock);
return(r);
}
unsigned int msgqueue_length(struct event_msgqueue *msgq) {
unsigned int len;
pthread_mutex_lock(&msgq->lock);
len = circqueue_get_length(msgq->queue);
pthread_mutex_unlock(&msgq->lock);
return(len);
}
syntax highlighted by Code2HTML, v. 0.9.1