/*
 * Socket backend: implements the pfb_* backend entry points by sending
 * text commands over a connected socket (`sock`) and parsing the text
 * replies.
 *
 * NOTE(review): in this copy of the file the header names of the first
 * group of #include directives are missing, and three spans of code further
 * down are truncated (marked where they occur). The code is reproduced
 * as-is; the lost text must be restored from the original before this
 * file can compile.
 */
#include #include #include #include #include #include #include #include #include #include #include #include
#include "pfq_backend.h"
#include "pfq_service.h"
#include "../pfqmessage.h"
#include "../config.h"
#include "../pfqtcp.h"

#define PF_VERSION_20 1
#define PF_VERSION_21 2
#define PF_VERSION_22 3

/* Held by wr_socket() around each command/reply exchange on the socket. */
pthread_mutex_t socket_mutex = PTHREAD_MUTEX_INITIALIZER;

int CURQ;
/* Set by pfb_fill_queue() to the message count it obtained. */
int NUMMSG_THREAD;

/* Peer address passed to connect(). */
struct sockaddr_in svra;
struct hostent *svr;
/* Socket descriptor used by every command in this file. */
int sock;

/* Backend configuration; exposed through pfb_getconf(). */
struct pfb_conf_t pfb_conf;

/*
 * NOTE(review): the text between the start of strip_nl()'s loop and the
 * tail of the reply-reading routine is missing in this copy. What follows
 * the gap calls strip_nl(b, l) when some value is > 0, returns -2
 * otherwise, returns -1 when the buffer starts with "ERR", and 0 in all
 * other cases. wr_socket() below calls w_socket() and r_socket(), whose
 * definitions presumably sat in the lost span - confirm against the
 * original.
 */
void strip_nl(char* b, int l) {
int i;
	for ( i=0; i0 ) strip_nl(b, l);
	else
		return -2;
	if ( !strncmp( b, "ERR", 3 ) )
		return -1;
	return 0;
}

/*
 * Send the command in `b` on socket `s`, then read the reply back into
 * the same buffer (capacity `l`), holding socket_mutex for the whole
 * exchange. Returns whatever r_socket() returned; callers in this file
 * treat any non-zero value as failure.
 */
int wr_socket ( int s, char *b, size_t l ) {
int res;
	pthread_mutex_lock(&socket_mutex);
	w_socket ( s, b );
	res = r_socket ( s, b, l );
	pthread_mutex_unlock(&socket_mutex);
	return res;
}

/* Backend identifier string. */
const char* pfb_id() {
	return "socket";
}

/* Backend API version, as given by PFQ_API_VERSION. */
int pfb_apiversion() {
	return PFQ_API_VERSION;
}

/* Backend version string. */
const char* pfb_version() {
	return "1.0";
}

/* Returns a pointer to the file-level configuration structure. */
struct pfb_conf_t *pfb_getconf() {
	return &pfb_conf;
}

/*
 * NOTE(review): the body of msg_from_id() and the beginning of the
 * function that sets up the connection are missing in this copy; the
 * text resumes in the middle of a call that uses svr->h_addr and
 * svr->h_length. The visible tail sets the port from pfb_conf.port,
 * connects `sock`, stores qptr1/qptr2 into ext_queue/my_queue, unlocks
 * socket_mutex and returns PFBE_OK.
 */
struct msg_t* msg_from_id(const char* mid) {
int i;
	for ( i=0; ih_addr, (struct hostent*)svr->h_length );
	svra.sin_port = htons( pfb_conf.port );
	/*
	 * NOTE(review): this failure return does not reach the
	 * pthread_mutex_unlock() below. Whether socket_mutex is held at this
	 * point is not visible here - confirm, and unlock on this path if so.
	 */
	if ( connect(sock, (struct sockaddr*)&svra, sizeof(svra)) <0 )
		return PFBE_UNUSABLE;
	ext_queue = qptr1;
	my_queue = qptr2;
	pthread_mutex_unlock(&socket_mutex);
	return PFBE_OK;
}

/*
 * Sends "QUIT\n" on the socket and shuts it down in both directions.
 * Always returns PFBE_OK. The write is done without taking socket_mutex.
 */
int pfb_close() {
	w_socket ( sock, "QUIT\n" );
	shutdown ( sock, SHUT_RDWR );
	return PFBE_OK;
}

/*
 * Fetches the to/from/subject/path fields for `msgid` unless the message
 * is already marked as cached (msg->hcached). Always returns PFBE_OK.
 */
int pfb_retr_headers( const char* msgid ) {
int res;
struct msg_t *msg;
	msg = msg_from_id(msgid);
	if ( msg && msg->hcached )
		return PFBE_OK;
	res = pfb_retr_to(msgid);
	res|= pfb_retr_from(msgid);
	res|= pfb_retr_subj(msgid);
	res|= pfb_retr_path(msgid);
	/*
	 * NOTE(review): res is overwritten with 0 here, so the OR-ed results
	 * above are discarded and the test below no longer depends on them.
	 */
	res = 0;
	/*
	 * NOTE(review): msg is dereferenced without a NULL check; the sibling
	 * pfb_retr_* functions do test msg_from_id()'s result for NULL.
	 */
	if ( res == PFBE_OK )
		msg->hcached = 1;
	else
		msg->hcached = 0;
	return PFBE_OK;
}

/*
 * Sends "<CMD_MSGID> <n>\n" and copies the reply, starting CMD_REPLY_LEN
 * bytes into it, to `b` (at most `len` bytes). On a non-zero wr_socket()
 * result PFBE_SERROR is copied instead. Always returns PFBE_OK.
 *
 * NOTE(review): strncpy() does not NUL-terminate `b` when the source is
 * `len` bytes or longer.
 */
int pfb_retr_id( int n, char* b, size_t len ) {
char buf[BUF_SIZE];
int res;
	memset ( buf, 0, sizeof(buf) );
	sprintf ( buf, "%s %d\n", CMD_MSGID, n );
	res = wr_socket ( sock, buf, sizeof(buf) );
	if ( res )
		strncpy ( b, PFBE_SERROR, len );
	else
		strncpy ( b, buf+CMD_REPLY_LEN, len );
	return PFBE_OK;
}

/*
 * Sends "<CMD_PATH> <msgid>\n" and stores the reply (past the first
 * CMD_REPLY_LEN bytes) in msg->path, or PFBE_SERROR when the exchange
 * fails. Returns PFBE_ERROR when the message id is unknown, PFBE_OK
 * otherwise (including when the exchange failed).
 *
 * NOTE(review): sprintf()/strcpy() are unbounded; the size of msg->path
 * is not visible here - confirm it can hold a BUF_SIZE reply.
 */
int pfb_retr_path( const char* msgid ) {
char buf[BUF_SIZE];
int res;
struct msg_t *msg;
	msg = msg_from_id(msgid);
	if ( !msg )
		return PFBE_ERROR;
	memset ( buf, 0, sizeof(buf) );
	sprintf ( buf, "%s %s\n", CMD_PATH, msgid );
	res = wr_socket ( sock, buf, sizeof(buf) );
	if ( res )
		strcpy ( msg->path, PFBE_SERROR );
	else
		strcpy ( msg->path, buf+CMD_REPLY_LEN );
	return PFBE_OK;
}

/*
 * Same exchange as pfb_retr_path(), using CMD_FROM and storing the
 * result in msg->from. The same unbounded-copy caveat applies.
 */
int pfb_retr_from( const char* msgid ) {
char buf[BUF_SIZE];
int res;
struct msg_t *msg;
	msg = msg_from_id(msgid);
	if ( !msg )
		return PFBE_ERROR;
	memset ( buf, 0, sizeof(buf) );
	sprintf ( buf, "%s %s\n", CMD_FROM, msgid );
	res = wr_socket ( sock, buf, sizeof(buf) );
	if ( res )
		strcpy ( msg->from, PFBE_SERROR );
	else
		strcpy ( msg->from, buf+CMD_REPLY_LEN );
	return PFBE_OK;
}

/*
 * Same exchange as pfb_retr_path(), using CMD_TO and storing the result
 * in msg->to. The same unbounded-copy caveat applies.
 */
int pfb_retr_to( const char* msgid ) {
char buf[BUF_SIZE];
int res;
struct msg_t *msg;
	msg = msg_from_id(msgid);
	if ( !msg )
		return PFBE_ERROR;
	memset ( buf, 0, sizeof(buf) );
	sprintf ( buf, "%s %s\n", CMD_TO, msgid );
	res = wr_socket ( sock, buf, sizeof(buf) );
	if ( res )
		strcpy ( msg->to, PFBE_SERROR );
	else
		strcpy ( msg->to, buf+CMD_REPLY_LEN );
	return PFBE_OK;
}

/*
 * Same exchange as pfb_retr_path(), using CMD_SUBJ and storing the
 * result in msg->subj. The same unbounded-copy caveat applies.
 */
int pfb_retr_subj( const char* msgid ) {
char buf[BUF_SIZE];
int res;
struct msg_t *msg;
	msg = msg_from_id(msgid);
	if ( !msg )
		return PFBE_ERROR;
	memset ( buf, 0, sizeof(buf) );
	sprintf ( buf, "%s %s\n", CMD_SUBJ, msgid );
	res = wr_socket ( sock, buf, sizeof(buf) );
	if ( res )
		strcpy ( msg->subj, PFBE_SERROR );
	else
		strcpy ( msg->subj, buf+CMD_REPLY_LEN );
	return PFBE_OK;
}

/*
 * Same exchange as pfb_retr_path(), using CMD_STATUS and storing the
 * result in msg->stat. The same unbounded-copy caveat applies.
 */
int pfb_retr_status ( const char* msgid ) {
char buf[BUF_SIZE];
int res;
struct msg_t *msg;
	msg = msg_from_id(msgid);
	if ( !msg )
		return PFBE_ERROR;
	memset ( buf, 0, sizeof(buf) );
	sprintf ( buf, "%s %s\n", CMD_STATUS, msgid );
	res = wr_socket ( sock, buf, sizeof(buf) );
	if ( res )
		strcpy ( msg->stat, PFBE_SERROR );
	else
		strcpy ( msg->stat, buf+CMD_REPLY_LEN );
	return PFBE_OK;
}

/*
 * Sends "<CMD_BODY> <msgid>\n" using a heap scratch buffer of `buflen`
 * bytes and writes the reply, starting CMD_REPLY_LEN+7 bytes into it and
 * followed by a newline, to `buffer`.
 *
 * NOTE(review): several problems are visible in this function:
 *  - the malloc() result is not checked before use;
 *  - the `!msg` return happens after the allocation and leaks buf2;
 *  - the wr_socket() result is stored in `res` but never examined;
 *  - sprintf() into `buffer` is not bounded by `buflen`, and appends a
 *    newline to up to `buflen` bytes of reply;
 *  - the return value is strlen(buf2) evaluated AFTER free(buf2), which
 *    reads freed memory.
 * The meaning of the extra 7-byte offset is not visible here - confirm
 * against the protocol definition.
 */
int pfb_retr_body( const char* msgid, char *buffer, size_t buflen ) {
int res;
struct msg_t *msg;
char *buf2;
	buf2 = (char*)malloc(buflen);
	msg = msg_from_id(msgid);
	if ( !msg )
		return PFBE_ERROR;
	memset ( buf2, 0, buflen );
	sprintf ( buf2, "%s %s\n", CMD_BODY, msgid );
	res = wr_socket ( sock, buf2, buflen );
	sprintf ( buffer, "%s\n", buf2+CMD_REPLY_LEN+7 );
	free ( buf2 );
	return strlen(buf2);
}

/*
 * Sends "<CMD_NUMMSG>\n" and returns the reply (past CMD_REPLY_LEN
 * bytes) converted with atoi(); returns 0 when the exchange fails.
 */
int pfb_num_msg() {
char buf[BUF_SIZE];
int res;
	memset ( buf, 0, sizeof(buf) );
	sprintf ( buf, "%s\n", CMD_NUMMSG );
	res = wr_socket ( sock, buf, sizeof(buf) );
	if ( res )
		return 0;
	else
		return ( atoi(buf+CMD_REPLY_LEN) );
}

/*
 * Sends "<CMD_NUMQ>\n" and returns the reply (past CMD_REPLY_LEN bytes)
 * converted with atoi(); returns 0 when the exchange fails.
 *
 * NOTE(review): the printf() below writes "res: <n>" to stdout on every
 * call; it looks like leftover debugging output - confirm.
 */
int pfb_queue_count() {
char buf[BUF_SIZE];
int res;
	memset ( buf, 0, sizeof(buf) );
	sprintf ( buf, "%s\n", CMD_NUMQ );
	res = wr_socket ( sock, buf, sizeof(buf) );
	printf ( "res: %d\n", res );
	if ( res )
		return 0;
	else
		return atoi(buf+CMD_REPLY_LEN);
}

/*
 * Sends "<CMD_QNAME> <q>\n" and returns a pointer CMD_REPLY_LEN bytes
 * into the reply, or 0 when the exchange fails. The returned pointer
 * refers to a function-local static buffer, so its contents are
 * overwritten by the next call.
 */
char* pfb_queue_name(int q) {
static char buf[BUF_SIZE];
int res;
	memset ( buf, 0, sizeof(buf) );
	sprintf ( buf, "%s %d\n", CMD_QNAME, q );
	res = wr_socket ( sock, buf, sizeof(buf) );
	if ( res )
		return 0;
	else
		return buf+CMD_REPLY_LEN;
}

/*
 * Obtains the message count from pfb_num_msg(), processes one entry per
 * message, records the count in NUMMSG_THREAD and returns it.
 *
 * NOTE(review): most of the loop body is missing in this copy. The
 * visible remainder copies `buf` into msg->id (bounded by
 * sizeof(msg->id)) and sets msg->changed to the strncmp() result of
 * msg->id against ext_queue[i].id.
 */
int pfb_fill_queue() {
char buf[255];
int i, j;
struct be_msg_t *msg;
	j = pfb_num_msg();
	for ( i=0; iid, buf, sizeof(msg->id) );
		msg->changed = strncmp( msg->id, ext_queue[i].id, strlen(msg->id) );
	}
	NUMMSG_THREAD = j;
	return j;
}

/*
 * Sends the command selected by `act` (delete, hold, release, requeue)
 * followed by the message id `msg`. Returns 1 for an unrecognised
 * action and PFBE_OK otherwise; the wr_socket() result is ignored.
 *
 * NOTE(review): the CMD_* macros are passed as the sprintf() format
 * string, and the second sprintf() can exceed BUF_SIZE because `b` may
 * already be that long and `msg` is unbounded.
 */
int pfb_action(int act, const char* msg) {
char b[BUF_SIZE];
char b2[BUF_SIZE];
	switch ( act ) {
	case MSG_DELETE:
		sprintf ( b, CMD_MSGDEL );
		break;
	case MSG_HOLD:
		sprintf ( b, CMD_MSGHOLD );
		break;
	case MSG_RELEASE:
		sprintf ( b, CMD_MSGREL );
		break;
	case MSG_REQUEUE:
		sprintf ( b, CMD_MSGREQ );
		break;
	default:
		return 1;
	}
	sprintf ( b2, "%s %s\n", b, msg );
	wr_socket ( sock, b2, sizeof(b2) );
	return PFBE_OK;
}

/* Delete message `msg`; forwards to pfb_action() with MSG_DELETE. */
int pfb_message_delete( const char* msg ) {
	return pfb_action ( MSG_DELETE, msg );
}

/* Hold message `msg`; forwards to pfb_action() with MSG_HOLD. */
int pfb_message_hold( const char* msg ) {
	return pfb_action ( MSG_HOLD, msg );
}

/* Requeue message `msg`; forwards to pfb_action() with MSG_REQUEUE. */
int pfb_message_requeue( const char* msg ) {
	return pfb_action ( MSG_REQUEUE, msg );
}

/* Release message `msg`; forwards to pfb_action() with MSG_RELEASE. */
int pfb_message_release( const char* msg ) {
	return pfb_action ( MSG_RELEASE, msg );
}

/*
 * Sends "<CMD_SETQ> <q>\n". Always returns PFBE_OK; the wr_socket()
 * result is stored in `res` but not examined.
 */
int pfb_set_queue ( int q ) {
char buf[BUF_SIZE];
int res;
	memset ( buf, 0, sizeof(buf) );
	sprintf ( buf, "%s %d\n", CMD_SETQ, q );
	res = wr_socket ( sock, buf, sizeof(buf) );
	return PFBE_OK;
}

/* Stores `u` in pfb_using_envelope (declared outside this file's view). */
void pfb_use_envelope ( int u ) {
	pfb_using_envelope = u;
}

/* Returns pfb_caps (declared outside this file's view). */
int pfb_get_caps() {
	return pfb_caps;
}