#include "sdk_precomp.h"
#include "cbthreadpool.h"
#include "sdk_events.h"
#include "manager.h"
#include "messagemanager.h"
#include <wx/log.h>
#include <wx/listimpl.cpp>
WX_DEFINE_LIST(cbTaskList);
/// Base thread class
class PrivateThread : public wxThread
{
public:
enum State
{
Idle,
Busy
};
PrivateThread(cbThreadPool* pool)
: wxThread(wxTHREAD_JOINABLE),
m_pPool(pool),
m_Abort(false)
{
}
~PrivateThread(){}
void Abort(bool abort = true){ m_Abort = abort; }
virtual ExitCode Entry()
{
// continuous loop, until we abort
while (1)
{
if (TestDestroy())
break;
// wait for signal from pool
m_pPool->m_Semaphore.Wait();
// should we abort?
if (m_Abort)
break;
// this is our main iteration:
// if we have a task assigned, launch it
// else wait again for signal...
bool doneWork = false;
cbTaskElement elem;
m_pPool->GetNextElement(elem);
if (elem.task)
{
// increment the "busy" counter
m_pPool->m_CounterCriticalSection.Enter();
++m_pPool->m_Counter;
m_pPool->m_CounterCriticalSection.Leave();
elem.task->Execute();
doneWork = true;
// decrement the "busy" counter
m_pPool->m_CounterCriticalSection.Enter();
--m_pPool->m_Counter;
m_pPool->m_CounterCriticalSection.Leave();
}
if (elem.autoDelete)
delete elem.task;
if (doneWork)
{
// tell the pool we 're done
m_pPool->OnThreadTaskDone(this);
}
}
return 0;
}
cbThreadPool* m_pPool;
bool m_Abort;
};
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
cbThreadPool::cbThreadPool(wxEvtHandler* owner, int id, int concurrentThreads)
: m_pOwner(owner),
m_ID(id),
m_Done(true),
m_Batching(false),
m_Counter(0)
{
m_Threads.Clear();
SetConcurrentThreads(concurrentThreads);
}
cbThreadPool::~cbThreadPool()
{
FreeThreads();
}
void cbThreadPool::SetConcurrentThreads(int concurrentThreads)
{
// if == -1, means auto i.e. same as number of CPUs
if (concurrentThreads == -1)
m_ConcurrentThreads = wxThread::GetCPUCount();
else
m_ConcurrentThreads = concurrentThreads;
// if still == -1, something's wrong; reset to 1
if (m_ConcurrentThreads == -1)
m_ConcurrentThreads = 1;
LOGSTREAM << _T("Concurrent threads for pool set to ") << m_ConcurrentThreads << _T('\n');
// alloc (or dealloc) based on new thread count
AllocThreads();
}
// called by PrivateThread when it's done running a task
void cbThreadPool::OnThreadTaskDone(PrivateThread* thread)
{
m_CriticalSection.Enter();
// notify the owner that the task has ended
CodeBlocksEvent evt(cbEVT_THREADTASK_ENDED, m_ID);
wxPostEvent(m_pOwner, evt);
if (m_TaskQueue.IsEmpty())
{
// check no running threads are busy
m_CounterCriticalSection.Enter();
bool reallyDone = m_Counter == 0;
m_CounterCriticalSection.Leave();
if (reallyDone)
{
m_Done = true;
// notify the owner that all tasks are done
CodeBlocksEvent evt(cbEVT_THREADTASK_ALLDONE, m_ID);
wxPostEvent(m_pOwner, evt);
}
}
m_CriticalSection.Leave();
// make sure any waiting threads "wake-up"
m_Semaphore.Post();
}
void cbThreadPool::BatchBegin()
{
m_Batching = true;
}
void cbThreadPool::BatchEnd()
{
m_Batching = false;
// launch the thread (if there's room in the pool)
m_Semaphore.Post();
}
bool cbThreadPool::AddTask(cbThreadPoolTask* task, bool autoDelete)
{
// add task to the pool
cbTaskElement* elem = new cbTaskElement(task, autoDelete);
m_CriticalSection.Enter();
m_TaskQueue.Append(elem);
m_Done = false;
m_CriticalSection.Leave();
if (!m_Batching)
{
// launch the thread (if there's room in the pool)
m_Semaphore.Post();
}
return true;
}
// called by the threads
// picks the first waiting cbTaskElement and removes it from the queue
void cbThreadPool::GetNextElement(cbTaskElement& element)
{
m_CriticalSection.Enter();
cbTaskList::Node* node = m_TaskQueue.GetFirst();
if (node)
{
cbTaskElement* elem = node->GetData();
if (elem)
element = *elem;
m_TaskQueue.DeleteNode(node);
}
m_CriticalSection.Leave();
}
void cbThreadPool::AbortAllTasks()
{
ClearTaskQueue();
FreeThreads();
AllocThreads();
}
void cbThreadPool::ClearTaskQueue()
{
// delete all pending tasks set to autoDelete
m_CriticalSection.Enter();
for (cbTaskList::Node* node = m_TaskQueue.GetFirst(); node; node = node->GetNext())
{
cbTaskElement* elem = node->GetData();
if (elem->autoDelete)
delete elem->task;
}
m_TaskQueue.Clear();
m_CriticalSection.Leave();
}
void cbThreadPool::AllocThreads()
{
FreeThreads();
for (int i = 0; i < m_ConcurrentThreads; ++i)
{
PrivateThread* thr = new PrivateThread(this);
thr->Create(); // start the thread; it 'll wait for our signal ;)
thr->Run(); // start the thread; it 'll wait for our signal ;)
m_Threads.Add(thr);
}
}
void cbThreadPool::FreeThreads()
{
// delete allocated threads
unsigned int i;
for (i = 0; i < m_Threads.GetCount(); ++i)
{
PrivateThread* thread = m_Threads[i];
thread->Abort(); // set m_Abort on *every* thread first
}
for (i = 0; i < m_Threads.GetCount(); ++i)
m_Semaphore.Post(); // now it does not matter which thread wakes up
m_Semaphore.Post(); // let's do one extra, does not harm
// actually give them CPU time to die, too
#if wxCHECK_VERSION(2,6,0)
wxMilliSleep(1);
#else
wxUSleep(1);
#endif
wxLogNull logNo;
for (i = 0; i < m_Threads.GetCount(); ++i)
{
unsigned int count = 0;
PrivateThread* thread = m_Threads[i];
while(thread->IsRunning())
{
m_Semaphore.Post();
#if wxCHECK_VERSION(2,6,0)
wxMilliSleep(1);
#else
wxUSleep(1);
#endif
if(++count > 10)
break;
}
if(count > 10) // a bit brute, but if it did not wake up until now
thread->Kill(); // then it will never
}
m_Threads.Clear();
}
syntax highlighted by Code2HTML, v. 0.9.1