///////////////////////////////////////////////////////////////////////////////
// MQ4CPP - Message queuing for C++
// Copyright (C) 2004-2007 Riccardo Pompeo (Italy)
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
//
#define SILENT
#include "Trace.h"
#include "Thread.h"
#ifdef WIN32
const int Thread::P_ABOVE_NORMAL = THREAD_PRIORITY_ABOVE_NORMAL;
const int Thread::P_BELOW_NORMAL = THREAD_PRIORITY_BELOW_NORMAL;
const int Thread::P_HIGHEST = THREAD_PRIORITY_HIGHEST;
const int Thread::P_IDLE = THREAD_PRIORITY_IDLE;
const int Thread::P_LOWEST = THREAD_PRIORITY_LOWEST;
const int Thread::P_NORMAL = THREAD_PRIORITY_NORMAL;
const int Thread::P_CRITICAL = THREAD_PRIORITY_TIME_CRITICAL;
#define THREAD_NULL NULL
#define ASSIGN_LONG(dest,val) InterlockedExchange(&dest,val)
#define ASSIGN_BOOL(dest,val) InterlockedExchange(&dest,(LONG)val)
#define ASSIGN_PTR(dest,val) InterlockedExchangePointer(&dest,val)
#else
#include <sys/time.h>
const int Thread::P_ABOVE_NORMAL = 0;
const int Thread::P_BELOW_NORMAL = 1;
const int Thread::P_HIGHEST = 2;
const int Thread::P_IDLE = 3;
const int Thread::P_LOWEST = 4;
const int Thread::P_NORMAL = 5;
const int Thread::P_CRITICAL = 6;
#define THREAD_NULL 0
#define ASSIGN_LONG(dest,val) dest=val
#define ASSIGN_BOOL(dest,val) dest=val
#define ASSIGN_PTR(dest,val) dest=val
#endif
#define SUSPENDWAITMS 10
bool Thread::itsShutdownInProgress=false;
void Thread::running()
{
ASSIGN_BOOL(itsRunningFlag,true);
};
bool Thread::isRunning()
{
return (itsRunningFlag!=0) ? true : false;
};
bool Thread::isSuspended()
{
return (itsSuspendedFlag!=0) ? true : false;
};
void Thread::shutdownInProgress()
{
TRACE("-----------------------SHUTDOWN-----------------------")
itsShutdownInProgress=true;
}
unsigned long Thread::threadID()
{
#ifdef WIN32
return GetCurrentThreadId();
#else
return (unsigned long)pthread_self();
#endif
}
/** Thread(const char* nm)
* overloaded constructor
* creates a Thread object identified by "nm"
**/
Thread::Thread(const char* nm)
{
TRACE("Thread constructor - start")
TRACE("Name=" << nm)
m_hThread = THREAD_NULL;
m_strName = nm;
ASSIGN_BOOL(itsRunningFlag,false);
ASSIGN_BOOL(itsSuspendedFlag,false);
ASSIGN_LONG(itsWorkingThreadID,0);
#ifdef WIN32
InitializeCriticalSection(&m_hMutex);
#else
pthread_mutex_init(&m_hMutex,NULL);
#endif
TRACE("Thread constructor - end")
}
Thread::~Thread()
{
TRACE("Thread destructor - start")
TRACE("Name=" << getName())
if(m_hThread != THREAD_NULL)
{
stop(true);
}
#ifdef WIN32
DeleteCriticalSection(&m_hMutex);
#else
pthread_mutex_destroy(&m_hMutex);
#endif
TRACE("Thread destructor - end")
}
/** getName()
* return the Thread object's name as a string
**/
const char* Thread::getName() const
{
return m_strName.c_str();
}
bool Thread::is(const char* theName)
{
return (m_strName==theName);
}
/** run()
* called by the thread callback _ou_thread_proc()
* to be overridden by child classes of Thread
**/
void Thread::run()
{
// Base run
}
/** sleep(long ms)
* holds back the thread's execution for
* "ms" milliseconds
**/
void Thread::sleep(long ms)
{
TRACE("Thread::sleep(static) - start")
#ifdef WIN32
Sleep(ms);
#else
struct timespec abs_ts;
struct timespec rm_ts;
rm_ts.tv_sec = ms/1000;
rm_ts.tv_nsec = ms%1000 *1000000;
do
{
abs_ts.tv_sec = rm_ts.tv_sec;
abs_ts.tv_nsec = rm_ts.tv_nsec;
} while(nanosleep(&abs_ts,&rm_ts) < 0);
#endif
TRACE("Thread::sleep(static) - stop")
}
/** start()
* creates a low-level thread object and calls the
* run() function
**/
void Thread::start()
{
TRACE("Thread::start - start")
TRACE("Name=" << getName())
#ifdef WIN32
DWORD tid = 0;
m_hThread = CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)_ou_thread_proc,(Thread*)this,0,&tid);
if(m_hThread == NULL)
{
TRACE("Fail to create thread")
throw ThreadException(string("Failed to create thread ->")+m_strName);
}
else
{
setPriority(Thread::P_NORMAL);
}
#else
pthread_mutex_init(&m_hSuspendMutex,NULL);
pthread_cond_init(&m_SuspendCondition,NULL);
int iret = pthread_create( &m_hThread, NULL, _ou_thread_proc,this);
if(iret!=0)
{
TRACE("Fail to create thread")
throw ThreadException(string("Failed to create thread ->")+m_strName);
}
#endif
TRACE("Thread::start - end")
}
/** stop()
* stops the running thread and frees the thread handle
**/
void Thread::stop(bool cancel)
{
TRACE("Thread::stop - start")
TRACE("Name=" << getName())
if(itsRunningFlag)
{
ASSIGN_BOOL(itsRunningFlag,false);
#ifdef WIN32
if(isSuspended()==true)
resume();
if(cancel==true)
TerminateThread(m_hThread,NULL);
else
WaitForSingleObject(m_hThread,INFINITE);
CloseHandle(m_hThread);
#else
TRACE("Joining thread")
if(cancel==true)
pthread_cancel(m_hThread);
else
pthread_join(m_hThread,NULL);
TRACE("Thread cleanup")
pthread_mutex_destroy(&m_hSuspendMutex);
pthread_cond_destroy(&m_SuspendCondition);
#endif
m_hThread = THREAD_NULL;
}
TRACE("Thread::stop - end")
}
/** setPriority(int tp)
* sets the priority of the thread to "tp"
* "tp" must be a valid priority defined in the
* Thread class
**/
void Thread::setPriority(int tp)
{
if(m_hThread == THREAD_NULL)
{
throw ThreadException("Thread object is null");
}
else
{
#ifdef WIN32
if(SetThreadPriority(m_hThread,tp) == 0)
{
throw ThreadException("Failed to set priority");
}
#endif
}
}
void Thread::setAffinity(unsigned cpu)
{
#ifdef WIN32
DWORD mask=1;
mask <<= cpu;
if(SetThreadAffinityMask(m_hThread,mask)==0)
throw ThreadException("Failed to set affinity");
#else
#ifdef HAVE_PTHREAD_SETAFFINITY_NP
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(cpu, &cpuset);
#ifndef P2_PTHREAD_SETAFFINITY
if(pthread_setaffinity_np(m_hThread, sizeof(cpuset), &cpuset)!=0)
throw ThreadException("Failed to set affinity");
#else
if(pthread_setaffinity_np(m_hThread, &cpuset)!=0)
throw ThreadException("Failed to set affinity");
#endif
#else
DISPLAY("Thread affinity not supported on this system")
#endif
#endif
sleep(0);
}
/** suspend()
* suspends the thread
**/
void Thread::suspend()
{
TRACE("Thread::suspend - start")
TRACE("Name=" << getName())
if(m_hThread == THREAD_NULL)
{
throw ThreadException(string("Thread object is null ->")+m_strName);
}
else
{
#ifdef WIN32
ASSIGN_BOOL(itsSuspendedFlag,true);
if(SuspendThread(m_hThread) < 0)
{
TRACE("Failed to suspend thread")
throw ThreadException(string("Failed to suspend thread ->")+m_strName);
}
#else
TRACE("Tread suspended")
pthread_mutex_lock(&m_hSuspendMutex);
TRACE("Cond mutex count=" << m_hSuspendMutex.__m_count)
TRACE("Cond lock status=" << m_hSuspendMutex.__m_lock.__status)
itsSuspendedFlag=true;
while(itsSuspendedFlag==true)
{
int ms=SUSPENDWAITMS;
struct timespec abs_ts;
struct timeval cur_tv;
gettimeofday(&cur_tv, NULL);
abs_ts.tv_sec = cur_tv.tv_sec + ms/1000;
abs_ts.tv_nsec = (cur_tv.tv_usec + ms%1000*1000)*1000;
// FIX by Steve Rodgers
if(abs_ts.tv_nsec > 999999999)
{
abs_ts.tv_sec++;
abs_ts.tv_nsec -= 1000000000;
}
// End fix
pthread_cond_timedwait(&m_SuspendCondition,&m_hSuspendMutex,&abs_ts);
TRACE("Cond condition lock status=" << m_SuspendCondition.__c_lock.__status)
if(itsRunningFlag==false)
{
TRACE("Resume thread for shutdown cleanup")
break;
}
}
pthread_mutex_unlock(&m_hSuspendMutex);
TRACE("Cond mutex count=" << m_hSuspendMutex.__m_count)
TRACE("Cond lock status=" << m_hSuspendMutex.__m_lock.__status)
TRACE("Thread resumed")
#endif
}
TRACE("Thread::suspend - end")
}
/** resume()
* resumes a suspended thread
**/
void Thread::resume()
{
TRACE("Thread::resume - start")
TRACE("Name=" << getName())
if(m_hThread == THREAD_NULL)
{
throw ThreadException(string("Thread object is null ->")+m_strName);
}
else
{
#ifdef WIN32
if(ResumeThread(m_hThread) < 0)
{
TRACE("Failed to resume thread")
throw ThreadException(string("Failed to resume thread ->")+m_strName);
}
else
{
ASSIGN_BOOL(itsSuspendedFlag,false);
}
#else
pthread_mutex_lock(&m_hSuspendMutex);
TRACE("Cond mutex count=" << m_hSuspendMutex.__m_count)
TRACE("Cond lock status=" << m_hSuspendMutex.__m_lock.__status)
itsSuspendedFlag=false;
pthread_cond_signal(&m_SuspendCondition);
TRACE("Cond condition lock status=" << m_SuspendCondition.__c_lock.__status)
pthread_mutex_unlock(&m_hSuspendMutex);
TRACE("Cond mutex count=" << m_hSuspendMutex.__m_count)
TRACE("Cond lock status=" << m_hSuspendMutex.__m_lock.__status)
#endif
}
TRACE("Thread::resume - end")
}
/** wait(long ms)
* makes the thread suspend execution until the
* mutex is released by another thread.
* "ms" specifies a time-out for the wait operation.
* "ms" defaults to 5000 milli-seconds
**/
bool Thread::wait(long ms)
{
TRACE("Thread::wait - start")
TRACE("Name=" << getName())
if(itsWorkingThreadID!=0)
{
TRACE(getName() << ": thread " << threadID() << " is waiting for thread " << itsWorkingThreadID)
}
#ifdef WIN32
Sleep(0);
EnterCriticalSection(&m_hMutex);
ASSIGN_LONG(itsWorkingThreadID,threadID());
#else
struct timespec abs_ts;
struct timeval cur_tv;
gettimeofday(&cur_tv, NULL);
abs_ts.tv_sec = cur_tv.tv_sec + ms/1000;
abs_ts.tv_nsec = (cur_tv.tv_usec + ms%1000*1000)*1000;
int res=pthread_mutex_timedlock(&m_hMutex,&abs_ts);
TRACE("Mutex count=" << m_hMutex.__m_count)
TRACE("Lock status=" << m_hMutex.__m_lock.__status)
switch(res)
{
case 0:
itsWorkingThreadID=threadID(); //++ v1.5
TRACE("Thread::wait - end")
return true;
case EINVAL:
throw ThreadException(string("pthread_mutex_timedlock: Invalid value ->")+m_strName);
break;
case ETIMEDOUT:
throw ThreadException(string("pthread_mutex_timedlock: Wait timed out ->")+m_strName);
break;
default:
throw ThreadException(string("pthread_mutex_timedlock: Unexpected value ->")+m_strName);
}
#endif
return false;
}
/** release()
* releases the mutex "m" and makes it
* available for other threads
**/
void Thread::release()
{
TRACE("Thread::release - start")
TRACE("Name=" << getName())
#ifdef WIN32
ASSIGN_LONG(itsWorkingThreadID,0L);
LeaveCriticalSection(&m_hMutex);
#else
pthread_mutex_unlock(&m_hMutex);
TRACE("Mutex count=" << m_hMutex.__m_count)
TRACE("Lock status=" << m_hMutex.__m_lock.__status)
#endif
TRACE("Thread::release - end")
}
// ThreadException
ThreadException::ThreadException(const char* m)
{
msg = m;
}
ThreadException::ThreadException(string m)
{
msg = m;
}
string ThreadException::getMessage() const
{
return msg;
}
// global thread callback
#ifdef WIN32
unsigned int _ou_thread_proc(void* param)
{
TRACE("Start _ou_thread_proc")
try
{
Thread* tp = (Thread*)param;
tp->running();
TRACE("Thread " << tp->getName() << "started")
tp->run();
}
catch(...)
{
DISPLAY("Unhandled exception in thread callback")
}
TRACE("End _ou_thread_proc")
return 0;
}
#else
void* _ou_thread_proc(void* param)
{
TRACE("Start _ou_thread_proc")
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS,NULL);
try
{
Thread* tp = (Thread*)param;
tp->running();
TRACE("Thread " << tp->getName() << " started")
tp->run();
}
catch(...)
{
DISPLAY("Unhandled exception in thread callback")
}
TRACE("End _ou_thread_proc")
pthread_exit(NULL);
return NULL;
}
#endif
syntax highlighted by Code2HTML, v. 0.9.1