/****************************************************************************
*
* Copyright (C) 2000-2001 RealNetworks, Inc. All rights reserved.
*
* This program is free software. It may be distributed under the terms
* in the file LICENSE, found in the top level of the source distribution.
*
*/
#include <string.h>
#ifdef _UNIX
#include <sys/types.h>
#include <sys/socket.h>
#endif
#include "thread.h"
#include "dbg.h"
#ifdef _UNIX
pthread_key_t g_keyself;
void* thread_start( void* pvoid )
{
CThread* pth;
pthread_setspecific( g_keyself, pvoid );
pth = (CThread*)pvoid;
if( pth->Init() )
{
pth->Run();
}
pth->m_retval = pth->Exit();
return NULL;
}
#endif
#ifdef _WIN32
// TLS slot index under which each started thread stores its CThread*.
// NOTE(review): no TlsAlloc() for this index is visible in this file;
// presumably it is allocated elsewhere before the first thread starts - confirm.
DWORD g_tlsself;

// Win32 thread entry point used by CThread::Create().
// pvoid is the CThread object that owns the new thread. The object is
// published through g_tlsself, then Init() is called, Run() is called only
// when Init() succeeded, and the result of Exit() is stored in m_retval.
// The thread exit code itself is always 0.
DWORD WINAPI thread_start( LPVOID pvoid )
{
    ::TlsSetValue( g_tlsself, pvoid );

    CThread* const pSelf = static_cast<CThread*>( pvoid );
    const bool bReady = pSelf->Init();
    if( bReady )
    {
        pSelf->Run();
    }
    pSelf->m_retval = pSelf->Exit();

    return 0;
}
#endif
/****************************************************************************
*
* CMutex
*
****************************************************************************/
// Creates the mutex in the unlocked state on both platforms.
CMutex::CMutex( void )
{
#ifdef _UNIX
    // We could use PTHREAD_MUTEX_ERRORCHECK_NP for Linux debug builds
    pthread_mutex_init( &m_mutex, NULL );
#endif
#ifdef _WIN32
    // bInitialOwner must be FALSE: passing TRUE would hand ownership to the
    // constructing thread, so every other thread calling Lock() would block
    // until that thread issued an unmatched Unlock(). The pthread branch
    // above creates an unlocked mutex, and this branch has to match it.
    m_mutex = ::CreateMutex( NULL, FALSE, NULL );
#endif
}
// Releases the underlying OS mutex object.
CMutex::~CMutex( void )
{
#if defined(_UNIX)
    pthread_mutex_destroy( &m_mutex );
#endif

#if defined(_WIN32)
    ::CloseHandle( m_mutex );
#endif
}
// Acquires the mutex, waiting without a timeout until it is available.
void CMutex::Lock( void )
{
#if defined(_UNIX)
    pthread_mutex_lock( &m_mutex );
#endif

#if defined(_WIN32)
    ::WaitForSingleObject( m_mutex, INFINITE );
#endif
}
// Releases the mutex previously acquired with Lock().
void CMutex::Unlock( void )
{
#if defined(_UNIX)
    pthread_mutex_unlock( &m_mutex );
#endif

#if defined(_WIN32)
    ::ReleaseMutex( m_mutex );
#endif
}
/****************************************************************************
*
* CSemaphore
*
****************************************************************************/
// Creates a counting semaphore whose initial count is nCount.
// On UNIX the semaphore is built from a mutex, a condition variable and a
// counter; on Win32 a native semaphore object with maximum MAXLONG is used.
CSemaphore::CSemaphore( UINT nCount )
{
#if defined(_UNIX)
    m_count = nCount;
    // We could use PTHREAD_MUTEX_ERRORCHECK_NP for Linux debug builds
    pthread_mutex_init( &m_mutex, NULL );
    pthread_cond_init( &m_cond, NULL );
#endif

#if defined(_WIN32)
    const LONG lInitial = (LONG)nCount;
    m_semaphore = ::CreateSemaphore( NULL, lInitial, MAXLONG, NULL );
#endif
}
// Destroys the OS objects backing the semaphore.
CSemaphore::~CSemaphore( void )
{
#if defined(_UNIX)
    // Condition variable first, then the mutex that guards the counter.
    pthread_cond_destroy( &m_cond );
    pthread_mutex_destroy( &m_mutex );
#endif

#if defined(_WIN32)
    ::CloseHandle( m_semaphore );
#endif
}
// Takes one unit from the semaphore, blocking while the count is zero.
void CSemaphore::Lock( void )
{
#if defined(_UNIX)
    pthread_mutex_lock( &m_mutex );
    for( ;; )
    {
        // Re-check the count after every wake-up before consuming a unit.
        if( m_count != 0 )
        {
            break;
        }
        pthread_cond_wait( &m_cond, &m_mutex );
    }
    --m_count;
    pthread_mutex_unlock( &m_mutex );
#endif

#if defined(_WIN32)
    ::WaitForSingleObject( m_semaphore, INFINITE );
#endif
}
// Returns one unit to the semaphore and wakes one waiter, if any.
void CSemaphore::Unlock( void )
{
#if defined(_UNIX)
    pthread_mutex_lock( &m_mutex );
    ++m_count;
    pthread_mutex_unlock( &m_mutex );

    // The signal is sent after the mutex has been released.
    pthread_cond_signal( &m_cond );
#endif

#if defined(_WIN32)
    ::ReleaseSemaphore( m_semaphore, 1, NULL );
#endif
}
/****************************************************************************
*
* CThread
*
****************************************************************************/
// Constructs a thread object; no OS thread exists until Create() is called.
CThread::CThread( void )
{
    // Nothing to do here
}
// Destroys the thread object; performs no work of its own.
CThread::~CThread( void )
{
    // Nothing to do here
}
// Starts a new OS thread that executes thread_start() with this object as
// its argument. Creation failures are reported through dbgout(); the
// function itself returns nothing.
void CThread::Create( void )
{
#ifdef _UNIX
    // pthread_create() returns the error number directly (it does not set errno).
    int rc = pthread_create( &m_thread, NULL, thread_start, this );
    if( rc != 0 )
    {
        dbgout( "pthread_create() failed: error = %i (%s)", rc, strerror(rc) );
    }
#endif
#ifdef _WIN32
    DWORD dwThreadId = 0;
    HANDLE hThread = ::CreateThread( NULL, 0, thread_start, this, 0, &dwThreadId );
    if( hThread == NULL )
    {
        dbgout( "CreateThread failed: error = %u", ::GetLastError() );
    }
    else
    {
        // The handle is not kept anywhere, so release it now. Closing a
        // thread handle does not terminate the thread; it only avoids
        // leaking one kernel handle per created thread.
        ::CloseHandle( hThread );
    }
#endif
}
// Default start-up hook: reports failure, so thread_start() skips Run().
// NOTE(review): presumably meant to be overridden by derived classes - confirm
// against the class declaration.
bool CThread::Init( void )
{
    const bool bStarted = false;
    return bStarted;
}
// Default shutdown hook: returns 0, which thread_start() stores in m_retval.
int CThread::Exit( void )
{
    const int nResult = 0;
    return nResult;
}
// Default thread body: does nothing.
void CThread::Run( void )
{
    // Intentionally empty
}
// Returns the CThread pointer that thread_start() stored for the calling
// thread in thread-local storage (g_keyself on UNIX, g_tlsself on Win32).
CThread* CThread::This( void )
{
#if defined(_UNIX)
    void* const pvSelf = pthread_getspecific( g_keyself );
    return static_cast<CThread*>( pvSelf );
#endif

#if defined(_WIN32)
    LPVOID const pvSelf = ::TlsGetValue( g_tlsself );
    return static_cast<CThread*>( pvSelf );
#endif
}
/****************************************************************************
*
* CEventThread
*
****************************************************************************/
// Initial (and minimum) capacities of the timer heap and the socket arrays.
#define MIN_TIMER_ALLOC 16
#define MIN_STREAM_ALLOC 4
// Index arithmetic for a binary heap stored in a 1-based array.
// Arguments are parenthesized so expressions such as LEFT(i+1) expand correctly.
#define LEFT(n) (2*(n))
#define RIGHT(n) (2*(n)+1)
#define PARENT(n) ((n)/2)
// Exchanges a and b using t as scratch storage. Wrapped in do/while(0) so the
// macro behaves as a single statement, including inside if/else.
#define SWAP(t,a,b) do { (t)=(a); (a)=(b); (b)=(t); } while(0)
// Constructs an event thread with empty socket and timer sets and allocates
// the initial arrays: MIN_STREAM_ALLOC entries for sockets and their wait
// objects, MIN_TIMER_ALLOC entries for the timer heap. All three arrays are
// zero-filled.
CEventThread::CEventThread( void ) :
    m_nSocks(0),
    m_nSockAlloc(0),
    m_ppSocks(NULL),
    m_pWaitObjs(NULL),
    m_nTimers(0),
    m_nTimerAlloc(0),
    m_ppTimers(NULL)
{
    m_nSockAlloc = MIN_STREAM_ALLOC;
    m_ppSocks = new CSocket*[ MIN_STREAM_ALLOC ];
    // Size by the array's real element type (CSocket*), as AddStream() and
    // DelStream() do, rather than by an unrelated pointer type.
    memset( m_ppSocks, 0, MIN_STREAM_ALLOC * sizeof(CSocket*) );
    m_pWaitObjs = new waitobj_t[ MIN_STREAM_ALLOC ];
    memset( m_pWaitObjs, 0, MIN_STREAM_ALLOC * sizeof(waitobj_t) );

    // The timer heap is 1-based (see AddTimer/Heapify); slot 0 stays unused.
    m_nTimerAlloc = MIN_TIMER_ALLOC;
    m_ppTimers = new CTimer*[ MIN_TIMER_ALLOC ];
    memset( m_ppTimers, 0, MIN_TIMER_ALLOC * sizeof(CTimer*) );
}
// Frees the timer heap and the socket / wait-object arrays. The CSocket and
// CTimer objects referenced by the arrays are not deleted here.
CEventThread::~CEventThread( void )
{
#ifdef _WIN32
    // AddStream() creates one event object per registered socket and only
    // DelStream() closes it; release the ones still registered so they are
    // not leaked when the thread object goes away.
    for( UINT n = 0; n < m_nSocks; n++ )
    {
        ::CloseHandle( m_pWaitObjs[n] );
    }
#endif
    delete[] m_ppTimers;
    delete[] m_pWaitObjs;
    delete[] m_ppSocks;
}
// Event loop: waits on all registered sockets (poll on UNIX,
// WaitForMultipleObjects on Win32) with a timeout derived from the earliest
// timer, then dispatches either the expired timer or the socket events to
// their response objects. The loop ends only when the wait call fails.
// Init() is called on entry and Exit() on return.
// NOTE(review): thread_start() in this file also calls Init() before Run()
// and Exit() after it, so for threads started through CThread::Create() both
// hooks run twice - confirm that this is intended.
void CEventThread::Run( void )
{
    if( Init() )
    {
        bool running = true;
        while( running )
        {
            // Figure out how long we can remain in poll/WFMO, possibly forever
            waittimer_t nTimeout = INFTIM;
            if( m_nTimers )
            {
                // Slot 1 is the root of the timer heap, i.e. the earliest timer.
                assert( m_ppTimers[1] );
                nTimeout = m_ppTimers[1]->GetTimeout() - CTimer::CurrentTime();
                if( nTimeout < 0 || nTimeout > 0x7FFFFFFF ) nTimeout = 0; // Wrap - it's late
            }

#ifdef _UNIX
            int rc = poll( m_pWaitObjs, m_nSocks, nTimeout );
            if( rc < 0 )
            {
                if( errno == EINTR )
                {
                    // Interrupted by a signal before any event arrived. This
                    // is not a failure of the descriptor set, so recompute
                    // the timeout and wait again instead of ending the loop.
                    continue;
                }
                dbgout( "poll() failed: error = %i (%s)", errno, strerror(errno) );
                break;
            }
#endif
#ifdef _WIN32
            // NOTE(review): WaitForMultipleObjects presumably rejects a zero
            // handle count, which would end the loop when no socket is
            // registered - confirm.
            DWORD rc = ::WaitForMultipleObjects( m_nSocks, m_pWaitObjs, FALSE, nTimeout );
            if( rc == WAIT_FAILED )
            {
                dbgout( "WFMO failed: error = %u", ::GetLastError() );
                break;
            }
            // Normalize to the poll() convention: 0 = timeout, >0 = events pending.
            rc = (rc == WAIT_TIMEOUT) ? 0 : 1;
#endif

            // Timed out: fire the earliest timer.
            if( rc == 0 && m_nTimers )
            {
                assert( m_nTimers && m_nTimerAlloc >= 2 && m_ppTimers[1] );
                CTimer* pTimer = m_ppTimers[1];
                if( pTimer->GetMode() == CTimer::Repeating )
                {
                    // Re-arm in place; Heapify below moves it to its new position.
                    pTimer->m_next += pTimer->m_interval;
                }
                else
                {
                    // One-shot: disable it and replace the root with the last heap entry.
                    pTimer->m_mode = CTimer::Disabled;
                    m_ppTimers[1] = m_ppTimers[m_nTimers--];
                }
                Heapify( CTimer::CurrentTime(), 1 );
                // The heap is consistent again before the callback runs.
                pTimer->GetResponse()->OnTimer();
            }

            // Socket activity: examine every registered socket.
            if( rc > 0 )
            {
                UINT n;
                for( n = 0; n < m_nSocks; n++ )
                {
                    assert( WAITOBJ_IS_VALID( m_pWaitObjs[n] ) && NULL != m_ppSocks[n] );
                    int err = SOCKERR_NONE;
                    waitevents_t wevt;
                    CSocket* pSock = m_ppSocks[n];
#ifdef _UNIX
                    wevt = m_pWaitObjs[n].revents;
                    // Readable/error on a socket selected for accept becomes an accept event.
                    if( (wevt & (POLLIN|POLLERR)) && (pSock->m_uSelectFlags & SF_ACCEPT) == SF_ACCEPT )
                    {
                        wevt = XPOLLACC;
                    }
                    // Writable/error on a socket selected for connect becomes a
                    // connect event; the outcome is read from SO_ERROR into err.
                    if( (wevt & (POLLOUT|POLLERR)) && (pSock->m_uSelectFlags & SF_CONNECT) == SF_CONNECT )
                    {
                        socklen_t errlen = sizeof(err);
#if defined(_SOLARIS) && (_SOLARIS < 58)
                        getsockopt( pSock->GetHandle(), SOL_SOCKET, SO_ERROR, (char*)&err, &errlen );
#else
                        getsockopt( pSock->GetHandle(), SOL_SOCKET, SO_ERROR, &err, &errlen );
#endif
                        wevt = XPOLLCNX;
                    }
                    // A remaining error condition is reported as the events the socket selected.
                    if( (wevt & POLLERR) )
                    {
                        wevt = pSock->m_uSelectFlags;
                    }
#endif
#ifdef _WIN32
                    ::WSAEnumNetworkEvents( pSock->GetHandle(), m_pWaitObjs[n], &wevt );
                    if( wevt.lNetworkEvents & FD_CONNECT )
                    {
                        err = wevt.iErrorCode[FD_CONNECT_BIT];
                    }
                    // A close notification is delivered to the response as a read event.
                    if( wevt.lNetworkEvents & FD_CLOSE )
                    {
                        wevt.lNetworkEvents = FD_READ;
                    }
#endif
                    // At most one callback is issued per socket per pass.
                    if( WAIT_EVENT_READ( wevt ) )
                    {
                        pSock->GetResponse()->OnReadReady();
                    }
                    else if( WAIT_EVENT_WRITE( wevt ) )
                    {
                        pSock->GetResponse()->OnWriteReady();
                    }
                    else if( WAIT_EVENT_ACCEPT( wevt ) )
                    {
                        CListenSocket* pListen = (CListenSocket*)pSock;
                        sockaddr_in sa;
                        socklen_t salen = sizeof(sa);
                        sockobj_t sock = accept( pListen->GetHandle(), (sockaddr*)&sa, &salen );
                        if( INVALID_SOCKET != sock )
                        {
#ifdef _WIN32
                            // Cancel selections (they are inherited in Win32)
                            ::WSAEventSelect( sock, 0, 0 );
#endif
                            // The new socket object is handed to the accept response.
                            CTcpSocket* pNew = new CTcpSocket();
                            pNew->m_sock = sock;
                            pListen->m_pAcceptResponse->OnConnection( pNew );
                        }
                    }
                    else if( WAIT_EVENT_CONNECT( wevt ) )
                    {
                        pSock->GetResponse()->OnConnectDone( err );
                    }
                    else if( WAIT_EVENT_EXCEPT( wevt ) )
                    {
                        pSock->GetResponse()->OnExceptReady();
                    }
                }
            }
        }
    }
    Exit();
}
// Default start-up hook: reports failure, in which case Run() skips its loop.
// NOTE(review): presumably meant to be overridden by derived classes - confirm
// against the class declaration.
bool CEventThread::Init( void )
{
    const bool bStarted = false;
    return bStarted;
}
// Default shutdown hook: returns 0.
int CEventThread::Exit( void )
{
    const int nResult = 0;
    return nResult;
}
// Registers pSock with the event loop.
// Returns false when pSock is NULL or WAITOBJ_MAX sockets are already
// registered, true otherwise. The socket and wait-object arrays are doubled
// when full. The new entry starts with no events selected (UNIX) or with a
// freshly created event object (Win32).
bool CEventThread::AddStream( CSocket* pSock )
{
    assert_or_retv( false, pSock );

    if( m_nSocks >= WAITOBJ_MAX )
    {
        return false;
    }

    if( m_nSockAlloc == m_nSocks )
    {
        // Arrays are full: double both, keeping the existing entries.
        const UINT nGrown = m_nSocks*2;

        CSocket** ppGrownSocks = new CSocket*[ nGrown ];
        memset( ppGrownSocks, 0xDD, nGrown * sizeof(CSocket*) ); // poison unused slots
        memcpy( ppGrownSocks, m_ppSocks, m_nSocks * sizeof(CSocket*) );
        delete[] m_ppSocks;
        m_ppSocks = ppGrownSocks;

        waitobj_t* pGrownObjs = new waitobj_t[ nGrown ];
        memset( pGrownObjs, 0xDD, nGrown * sizeof(waitobj_t) );
        memcpy( pGrownObjs, m_pWaitObjs, m_nSocks * sizeof(waitobj_t) );
        delete[] m_pWaitObjs;
        m_pWaitObjs = pGrownObjs;

        m_nSockAlloc = nGrown;
    }

    // Append at the end of both parallel arrays.
    const UINT nSlot = m_nSocks;
    m_ppSocks[nSlot] = pSock;
#if defined(_UNIX)
    m_pWaitObjs[nSlot].fd = pSock->GetHandle();
    m_pWaitObjs[nSlot].events = 0;
    m_pWaitObjs[nSlot].revents = 0;
#endif
#if defined(_WIN32)
    m_pWaitObjs[nSlot] = ::CreateEvent( NULL, FALSE, FALSE, NULL );
#endif
    m_nSocks = nSlot + 1;

    return true;
}
// Unregisters pSock from the event loop; does nothing when it is not
// registered. The vacated slot is filled with the last entry, so the order
// of the remaining sockets is not preserved. The arrays are halved when at
// most a quarter is in use and they are larger than MIN_STREAM_ALLOC.
void CEventThread::DelStream( CSocket* pSock )
{
    assert_or_ret( pSock );

    // Locate the socket.
    UINT nSlot = 0;
    while( nSlot < m_nSocks && m_ppSocks[nSlot] != pSock )
    {
        ++nSlot;
    }

    if( nSlot < m_nSocks )
    {
#if defined(_WIN32)
        ::CloseHandle( m_pWaitObjs[nSlot] );
#endif
        // Move the last entry of both parallel arrays into the hole.
        const UINT nLast = m_nSocks - 1;
        m_nSocks = nLast;
        m_ppSocks[nSlot] = m_ppSocks[nLast];
        m_pWaitObjs[nSlot] = m_pWaitObjs[nLast];
    }

    const bool bMostlyEmpty = ( m_nSocks <= m_nSockAlloc/4 );
    if( bMostlyEmpty && m_nSockAlloc > MIN_STREAM_ALLOC )
    {
        const UINT nShrunk = m_nSockAlloc/2;

        CSocket** ppShrunkSocks = new CSocket*[ nShrunk ];
        memset( ppShrunkSocks, 0xDD, nShrunk * sizeof(CSocket*) ); // poison unused slots
        memcpy( ppShrunkSocks, m_ppSocks, m_nSocks * sizeof(CSocket*) );
        delete[] m_ppSocks;
        m_ppSocks = ppShrunkSocks;

        waitobj_t* pShrunkObjs = new waitobj_t[ nShrunk ];
        memset( pShrunkObjs, 0xDD, nShrunk * sizeof(waitobj_t) );
        memcpy( pShrunkObjs, m_pWaitObjs, m_nSocks * sizeof(waitobj_t) );
        delete[] m_pWaitObjs;
        m_pWaitObjs = pShrunkObjs;

        m_nSockAlloc = nShrunk;
    }
}
// Sets the events the loop waits for on a registered, open socket.
// nWhich replaces the previous selection; on Win32 FD_CLOSE is always added.
// Does nothing when pSock is not registered.
void CEventThread::SetStreamSelect( CSocket* pSock, UINT nWhich )
{
    assert_or_ret( pSock && pSock->IsOpen() );

    // Locate the socket.
    UINT nSlot = 0;
    while( nSlot < m_nSocks && m_ppSocks[nSlot] != pSock )
    {
        ++nSlot;
    }
    if( nSlot >= m_nSocks )
    {
        return;
    }

#if defined(_UNIX)
    m_pWaitObjs[nSlot].events = nWhich;
#endif
#if defined(_WIN32)
    nWhich |= FD_CLOSE;
    ::WSAEventSelect( pSock->GetHandle(), m_pWaitObjs[nSlot], nWhich );
#endif
}
// O( lg(n) )
// Inserts pTimer into the 1-based timer heap. The array is doubled when the
// new entry would not fit. The entry starts at the end of the heap and moves
// towards the root while its timeout is smaller than its parent's.
void CEventThread::AddTimer( CTimer* pTimer )
{
    assert_or_ret( pTimer );

    // Slot 0 is unused, so m_nTimers entries occupy 1+m_nTimers slots.
    if( m_nTimerAlloc == 1+m_nTimers )
    {
        const UINT nGrown = m_nTimerAlloc*2;
        CTimer** ppGrown = new CTimer*[ nGrown ];
        memset( ppGrown, 0xDD, nGrown * sizeof(CTimer*) ); // poison unused slots
        memcpy( ppGrown, m_ppTimers, (1+m_nTimers) * sizeof(CTimer*) );
        delete[] m_ppTimers;
        m_nTimerAlloc = nGrown;
        m_ppTimers = ppGrown;
    }

    UINT nSlot = ++m_nTimers;
    const UINT32 nDue = pTimer->GetTimeout();

    // Pull parents down until the right position for the new timer is found.
    for( ; nSlot > 1; nSlot = PARENT(nSlot) )
    {
        if( !( nDue < m_ppTimers[ PARENT(nSlot) ]->GetTimeout() ) )
        {
            break;
        }
        m_ppTimers[nSlot] = m_ppTimers[ PARENT(nSlot) ];
    }
    m_ppTimers[nSlot] = pTimer;
}
// O( n )
// Removes pTimer from the timer heap; does nothing when it is not present.
// The vacated slot is filled with the last heap entry, which is then moved
// up or down as needed to restore heap order. The array is halved when at
// most a quarter is in use and it is larger than MIN_TIMER_ALLOC.
void CEventThread::DelTimer( CTimer* pTimer )
{
    assert_or_ret( pTimer );

    UINT n;
    for( n = 1; n <= m_nTimers; n++ )
    {
        if( m_ppTimers[n] != pTimer )
        {
            continue;
        }

        CTimer* pMoved = m_ppTimers[m_nTimers];
        m_nTimers--;

        // When the removed timer was the last entry there is nothing to fix up.
        if( n <= m_nTimers )
        {
            // Compare timeouts the same way Heapify() does: as a delta from
            // "now", with deltas above 0x7FFFFFFF treated as already due.
            UINT32 now = CTimer::CurrentTime();
            UINT32 dm = pMoved->GetTimeout() - now;
            if( dm > 0x7FFFFFFF ) dm = 0;

            // The moved entry comes from an arbitrary branch of the heap and
            // may be earlier than the ancestors of slot n, so sift it up
            // first. A sift-down alone would leave the heap out of order.
            UINT hole = n;
            while( hole > 1 )
            {
                UINT up = PARENT(hole);
                UINT32 dp = m_ppTimers[up]->GetTimeout() - now;
                if( dp > 0x7FFFFFFF ) dp = 0;
                if( dm >= dp )
                {
                    break;
                }
                m_ppTimers[hole] = m_ppTimers[up];
                hole = up;
            }
            m_ppTimers[hole] = pMoved;

            // If it did not move up it may still be later than its children.
            // If it did move up, slot n now holds a former ancestor, which is
            // already no later than everything below it.
            if( hole == n )
            {
                Heapify( now, n );
            }
        }
        break;
    }

    // Shrink heap if we are using less than 1/4 and more than minimum
    if( m_nTimers <= m_nTimerAlloc/4 && m_nTimerAlloc > MIN_TIMER_ALLOC )
    {
        UINT nNewAlloc = m_nTimerAlloc/2;
        CTimer** ppTimers = new CTimer*[ nNewAlloc ];
        memset( ppTimers, 0xDD, nNewAlloc * sizeof(CTimer*) );
        memcpy( ppTimers, m_ppTimers, (1+m_nTimers) * sizeof(CTimer*) );
        delete[] m_ppTimers;
        m_nTimerAlloc = nNewAlloc;
        m_ppTimers = ppTimers;
    }
}
// Restores heap order at and below slot n by moving the entry at n down
// until neither child is due earlier. Timeouts are compared as unsigned
// deltas from `now`; a delta above 0x7FFFFFFF is treated as 0.
void CEventThread::Heapify( UINT32 now, UINT n )
{
    for( ;; )
    {
        // Start by assuming the current slot holds the earliest timer.
        UINT nEarliest = n;
        UINT32 dEarliest = m_ppTimers[n]->GetTimeout() - now;
        if( dEarliest > 0x7FFFFFFF ) dEarliest = 0;

        // Check the left child first, then the right one.
        const UINT children[2] = { LEFT(n), RIGHT(n) };
        for( int k = 0; k < 2; k++ )
        {
            const UINT nChild = children[k];
            if( nChild > m_nTimers )
            {
                continue;
            }
            UINT32 dChild = m_ppTimers[nChild]->GetTimeout() - now;
            if( dChild > 0x7FFFFFFF ) dChild = 0;
            if( dChild < dEarliest )
            {
                nEarliest = nChild;
                dEarliest = dChild;
            }
        }

        if( nEarliest == n )
        {
            // Heap order holds here; done.
            return;
        }

        // Exchange with the earlier child and continue from its slot.
        CTimer* pHeld = m_ppTimers[n];
        m_ppTimers[n] = m_ppTimers[nEarliest];
        m_ppTimers[nEarliest] = pHeld;
        n = nEarliest;
    }
}
// syntax highlighted by Code2HTML, v. 0.9.1