/* $Id: msg-thread.cpp,v 1.14 2006/03/30 14:16:34 adam Exp $
Copyright (c) 1998-2006, Index Data.
This file is part of the yazproxy.
YAZ proxy is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free
Software Foundation; either version 2, or (at your option) any later
version.
YAZ proxy is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
for more details.
You should have received a copy of the GNU General Public License
along with YAZ proxy; see the file LICENSE. If not, write to the
Free Software Foundation, 59 Temple Place - Suite 330, Boston, MA
02111-1307, USA.
*/
#if YAZ_POSIX_THREADS
#include <pthread.h>
#endif
#if HAVE_UNISTD_H
#include <unistd.h>
#endif
#include <ctype.h>
#include <stdio.h>
#include <yazpp/socket-observer.h>
#include <yaz/log.h>
#include "msg-thread.h"
using namespace yazpp_1;
class Msg_Thread::Private {
public:
int m_no_threads;
Msg_Thread_Queue m_input;
Msg_Thread_Queue m_output;
#if YAZ_POSIX_THREADS
int m_fd[2];
yazpp_1::ISocketObservable *m_SocketObservable;
pthread_t *m_thread_id;
pthread_mutex_t m_mutex_input_data;
pthread_cond_t m_cond_input_data;
pthread_mutex_t m_mutex_output_data;
bool m_stop_flag;
#endif
};
IMsg_Thread::~IMsg_Thread()
{
}
Msg_Thread_Queue::Msg_Thread_Queue()
{
m_list = 0;
}
int Msg_Thread_Queue::size()
{
int no = 0;
Msg_Thread_Queue_List *l;
for (l = m_list; l; l = l->m_next)
no++;
return no;
}
void Msg_Thread_Queue::enqueue(IMsg_Thread *m)
{
Msg_Thread_Queue_List *l = new Msg_Thread_Queue_List;
l->m_next = m_list;
l->m_item = m;
m_list = l;
}
IMsg_Thread *Msg_Thread_Queue::dequeue()
{
Msg_Thread_Queue_List **l = &m_list;
if (!*l)
return 0;
while ((*l)->m_next)
l = &(*l)->m_next;
IMsg_Thread *m = (*l)->m_item;
delete *l;
*l = 0;
return m;
}
#if YAZ_POSIX_THREADS
static void *tfunc(void *p)
{
Msg_Thread *pt = (Msg_Thread *) p;
pt->run(0);
return 0;
}
#endif
Msg_Thread::Msg_Thread(ISocketObservable *obs, int no_threads)
{
m_p = new Private;
#if YAZ_POSIX_THREADS
m_p->m_SocketObservable = obs;
pipe(m_p->m_fd);
obs->addObserver(m_p->m_fd[0], this);
obs->maskObserver(this, SOCKET_OBSERVE_READ);
m_p->m_stop_flag = false;
pthread_mutex_init(&m_p->m_mutex_input_data, 0);
pthread_cond_init(&m_p->m_cond_input_data, 0);
pthread_mutex_init(&m_p->m_mutex_output_data, 0);
m_p->m_no_threads = no_threads;
m_p->m_thread_id = new pthread_t[no_threads];
int i;
for (i = 0; i<m_p->m_no_threads; i++)
pthread_create(&m_p->m_thread_id[i], 0, tfunc, this);
#endif
}
Msg_Thread::~Msg_Thread()
{
#if YAZ_POSIX_THREADS
pthread_mutex_lock(&m_p->m_mutex_input_data);
m_p->m_stop_flag = true;
pthread_cond_broadcast(&m_p->m_cond_input_data);
pthread_mutex_unlock(&m_p->m_mutex_input_data);
int i;
for (i = 0; i<m_p->m_no_threads; i++)
pthread_join(m_p->m_thread_id[i], 0);
delete [] m_p->m_thread_id;
m_p->m_SocketObservable->deleteObserver(this);
pthread_cond_destroy(&m_p->m_cond_input_data);
pthread_mutex_destroy(&m_p->m_mutex_input_data);
pthread_mutex_destroy(&m_p->m_mutex_output_data);
close(m_p->m_fd[0]);
close(m_p->m_fd[1]);
#endif
delete m_p;
}
void Msg_Thread::socketNotify(int event)
{
#if YAZ_POSIX_THREADS
if (event & SOCKET_OBSERVE_READ)
{
char buf[2];
read(m_p->m_fd[0], buf, 1);
pthread_mutex_lock(&m_p->m_mutex_output_data);
IMsg_Thread *out = m_p->m_output.dequeue();
pthread_mutex_unlock(&m_p->m_mutex_output_data);
if (out)
out->result();
}
#endif
}
#if YAZ_POSIX_THREADS
void Msg_Thread::run(void *p)
{
while(1)
{
pthread_mutex_lock(&m_p->m_mutex_input_data);
while (!m_p->m_stop_flag && m_p->m_input.size() == 0)
pthread_cond_wait(&m_p->m_cond_input_data, &m_p->m_mutex_input_data);
if (m_p->m_stop_flag)
{
pthread_mutex_unlock(&m_p->m_mutex_input_data);
break;
}
IMsg_Thread *in = m_p->m_input.dequeue();
pthread_mutex_unlock(&m_p->m_mutex_input_data);
IMsg_Thread *out = in->handle();
pthread_mutex_lock(&m_p->m_mutex_output_data);
m_p->m_output.enqueue(out);
write(m_p->m_fd[1], "", 1);
pthread_mutex_unlock(&m_p->m_mutex_output_data);
}
}
#endif
void Msg_Thread::put(IMsg_Thread *m)
{
#if YAZ_POSIX_THREADS
pthread_mutex_lock(&m_p->m_mutex_input_data);
m_p->m_input.enqueue(m);
pthread_cond_signal(&m_p->m_cond_input_data);
pthread_mutex_unlock(&m_p->m_mutex_input_data);
#else
IMsg_Thread *out = m->handle();
if (out)
out->result();
#endif
}
/*
* Local variables:
* c-basic-offset: 4
* indent-tabs-mode: nil
* End:
* vim: shiftwidth=4 tabstop=8 expandtab
*/
syntax highlighted by Code2HTML, v. 0.9.1