#include <unistd.h>
#include <stdlib.h>
#include <dirent.h>
#include <sys/stat.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <pthread.h>
#include "pfq_backend.h"
#include "pfq_service.h"
#include "../pfqmessage.h"
#include "../config.h"
#include "../pfqtcp.h"
#define PF_VERSION_20 1
#define PF_VERSION_21 2
#define PF_VERSION_22 3
pthread_mutex_t socket_mutex = PTHREAD_MUTEX_INITIALIZER;
int CURQ;
int NUMMSG_THREAD;
struct sockaddr_in svra;
struct hostent *svr;
int sock;
struct pfb_conf_t pfb_conf;
void strip_nl(char* b, int l) {
int i;
for ( i=0; i<l; i++ ) {
if (*(b+i)=='\n')
*(b+i) = 0;
}
}
int w_socket ( int s, const char* b ) {
write ( s, b, strlen(b) );
return 0;
}
int r_socket ( int s, char *b, size_t l ) {
int rd;
memset ( b, 0, l );
rd = read ( s, b, l );
if ( rd>0 )
strip_nl(b, l);
else
return -2;
if ( !strncmp( b, "ERR", 3 ) )
return -1;
return 0;
}
int wr_socket ( int s, char *b, size_t l ) {
int res;
pthread_mutex_lock(&socket_mutex);
w_socket ( s, b );
res = r_socket ( s, b, l );
pthread_mutex_unlock(&socket_mutex);
return res;
}
const char* pfb_id() {
return "socket";
}
int pfb_apiversion() {
return PFQ_API_VERSION;
}
const char* pfb_version() {
return "1.0";
}
struct pfb_conf_t *pfb_getconf() {
return &pfb_conf;
}
struct msg_t* msg_from_id(const char* mid) {
int i;
for ( i=0; i<NUMMSG_THREAD; i++ ) {
if ( !strncmp(ext_queue[i].id, mid, sizeof(ext_queue[i].id) ) )
return &ext_queue[i];
}
return NULL;
}
int pfb_init() {
pfb_conf.max_char = 200;
strcpy ( pfb_conf.command_path, "" );
strcpy ( pfb_conf.config_path, "" );
pfb_conf.port = 20000;
return PFBE_OK;
}
int pfb_setup( struct msg_t *qptr1, struct be_msg_t *qptr2 ) {
sock = socket(AF_INET, SOCK_STREAM, 0 );
if ( sock<0 )
return PFBE_UNUSABLE;
svr = gethostbyname ( pfb_conf.host );
if ( svr == NULL )
return PFBE_UNUSABLE;
memset ( &svra, 0, sizeof(svra) );
svra.sin_family = AF_INET;
memcpy ( (struct sockaddr*)&svra.sin_addr.s_addr,
(struct hostent*)svr->h_addr,
(struct hostent*)svr->h_length );
svra.sin_port = htons( pfb_conf.port );
if ( connect(sock, (struct sockaddr*)&svra, sizeof(svra)) <0 )
return PFBE_UNUSABLE;
ext_queue = qptr1;
my_queue = qptr2;
pthread_mutex_unlock(&socket_mutex);
return PFBE_OK;
}
int pfb_close() {
w_socket ( sock, "QUIT\n" );
shutdown ( sock, SHUT_RDWR );
return PFBE_OK;
}
int pfb_retr_headers( const char* msgid ) {
int res;
struct msg_t *msg;
msg = msg_from_id(msgid);
if ( msg && msg->hcached )
return PFBE_OK;
res = pfb_retr_to(msgid);
res|= pfb_retr_from(msgid);
res|= pfb_retr_subj(msgid);
res|= pfb_retr_path(msgid);
res = 0;
if ( res == PFBE_OK )
msg->hcached = 1;
else
msg->hcached = 0;
return PFBE_OK;
}
int pfb_retr_id( int n, char* b, size_t len ) {
char buf[BUF_SIZE];
int res;
memset ( buf, 0, sizeof(buf) );
sprintf ( buf, "%s %d\n", CMD_MSGID, n );
res = wr_socket ( sock, buf, sizeof(buf) );
if ( res )
strncpy ( b, PFBE_SERROR, len );
else
strncpy ( b, buf+CMD_REPLY_LEN, len );
return PFBE_OK;
}
int pfb_retr_path( const char* msgid ) {
char buf[BUF_SIZE];
int res;
struct msg_t *msg;
msg = msg_from_id(msgid);
if ( !msg )
return PFBE_ERROR;
memset ( buf, 0, sizeof(buf) );
sprintf ( buf, "%s %s\n", CMD_PATH, msgid );
res = wr_socket ( sock, buf, sizeof(buf) );
if ( res )
strcpy ( msg->path, PFBE_SERROR );
else
strcpy ( msg->path, buf+CMD_REPLY_LEN );
return PFBE_OK;
}
int pfb_retr_from( const char* msgid ) {
char buf[BUF_SIZE];
int res;
struct msg_t *msg;
msg = msg_from_id(msgid);
if ( !msg )
return PFBE_ERROR;
memset ( buf, 0, sizeof(buf) );
sprintf ( buf, "%s %s\n", CMD_FROM, msgid );
res = wr_socket ( sock, buf, sizeof(buf) );
if ( res )
strcpy ( msg->from, PFBE_SERROR );
else
strcpy ( msg->from, buf+CMD_REPLY_LEN );
return PFBE_OK;
}
int pfb_retr_to( const char* msgid ) {
char buf[BUF_SIZE];
int res;
struct msg_t *msg;
msg = msg_from_id(msgid);
if ( !msg )
return PFBE_ERROR;
memset ( buf, 0, sizeof(buf) );
sprintf ( buf, "%s %s\n", CMD_TO, msgid );
res = wr_socket ( sock, buf, sizeof(buf) );
if ( res )
strcpy ( msg->to, PFBE_SERROR );
else
strcpy ( msg->to, buf+CMD_REPLY_LEN );
return PFBE_OK;
}
int pfb_retr_subj( const char* msgid ) {
char buf[BUF_SIZE];
int res;
struct msg_t *msg;
msg = msg_from_id(msgid);
if ( !msg )
return PFBE_ERROR;
memset ( buf, 0, sizeof(buf) );
sprintf ( buf, "%s %s\n", CMD_SUBJ, msgid );
res = wr_socket ( sock, buf, sizeof(buf) );
if ( res )
strcpy ( msg->subj, PFBE_SERROR );
else
strcpy ( msg->subj, buf+CMD_REPLY_LEN );
return PFBE_OK;
}
int pfb_retr_status ( const char* msgid ) {
char buf[BUF_SIZE];
int res;
struct msg_t *msg;
msg = msg_from_id(msgid);
if ( !msg )
return PFBE_ERROR;
memset ( buf, 0, sizeof(buf) );
sprintf ( buf, "%s %s\n", CMD_STATUS, msgid );
res = wr_socket ( sock, buf, sizeof(buf) );
if ( res )
strcpy ( msg->stat, PFBE_SERROR );
else
strcpy ( msg->stat, buf+CMD_REPLY_LEN );
return PFBE_OK;
}
int pfb_retr_body( const char* msgid, char *buffer, size_t buflen ) {
int res;
struct msg_t *msg;
char *buf2;
buf2 = (char*)malloc(buflen);
msg = msg_from_id(msgid);
if ( !msg )
return PFBE_ERROR;
memset ( buf2, 0, buflen );
sprintf ( buf2, "%s %s\n", CMD_BODY, msgid );
res = wr_socket ( sock, buf2, buflen );
sprintf ( buffer, "%s\n", buf2+CMD_REPLY_LEN+7 );
free ( buf2 );
return strlen(buf2);
}
int pfb_num_msg() {
char buf[BUF_SIZE];
int res;
memset ( buf, 0, sizeof(buf) );
sprintf ( buf, "%s\n", CMD_NUMMSG );
res = wr_socket ( sock, buf, sizeof(buf) );
if ( res )
return 0;
else
return ( atoi(buf+CMD_REPLY_LEN) );
}
int pfb_queue_count() {
char buf[BUF_SIZE];
int res;
memset ( buf, 0, sizeof(buf) );
sprintf ( buf, "%s\n", CMD_NUMQ );
res = wr_socket ( sock, buf, sizeof(buf) );
printf ( "res: %d\n", res );
if ( res )
return 0;
else
return atoi(buf+CMD_REPLY_LEN);
}
char* pfb_queue_name(int q) {
static char buf[BUF_SIZE];
int res;
memset ( buf, 0, sizeof(buf) );
sprintf ( buf, "%s %d\n", CMD_QNAME, q );
res = wr_socket ( sock, buf, sizeof(buf) );
if ( res )
return 0;
else
return buf+CMD_REPLY_LEN;
}
int pfb_fill_queue() {
char buf[255];
int i, j;
struct be_msg_t *msg;
j = pfb_num_msg();
for ( i=0; i<j; i++ ) {
msg = &my_queue[i];
pfb_retr_id ( i, buf, sizeof(buf) );
memcpy ( msg->id, buf, sizeof(msg->id) );
msg->changed = strncmp( msg->id, ext_queue[i].id, strlen(msg->id) );
}
NUMMSG_THREAD = j;
return j;
}
int pfb_action(int act, const char* msg) {
char b[BUF_SIZE];
char b2[BUF_SIZE];
switch ( act ) {
case MSG_DELETE:
sprintf ( b, CMD_MSGDEL );
break;
case MSG_HOLD:
sprintf ( b, CMD_MSGHOLD );
break;
case MSG_RELEASE:
sprintf ( b, CMD_MSGREL );
break;
case MSG_REQUEUE:
sprintf ( b, CMD_MSGREQ );
break;
default:
return 1;
}
sprintf ( b2, "%s %s\n", b, msg );
wr_socket ( sock, b2, sizeof(b2) );
return PFBE_OK;
}
int pfb_message_delete( const char* msg ) {
return pfb_action ( MSG_DELETE, msg );
}
int pfb_message_hold( const char* msg ) {
return pfb_action ( MSG_HOLD, msg );
}
int pfb_message_requeue( const char* msg ) {
return pfb_action ( MSG_REQUEUE, msg );
}
int pfb_message_release( const char* msg ) {
return pfb_action ( MSG_RELEASE, msg );
}
int pfb_set_queue ( int q ) {
char buf[BUF_SIZE];
int res;
memset ( buf, 0, sizeof(buf) );
sprintf ( buf, "%s %d\n", CMD_SETQ, q );
res = wr_socket ( sock, buf, sizeof(buf) );
return PFBE_OK;
}
void pfb_use_envelope ( int u ) {
pfb_using_envelope = u;
}
int pfb_get_caps() {
return pfb_caps;
}
syntax highlighted by Code2HTML, v. 0.9.1