/**************************************************************************** * * Copyright (C) 2000-2001 RealNetworks, Inc. All rights reserved. * * This program is free software. It may be distributed under the terms * in the file LICENSE, found in the top level of the source distribution. * */ #include #ifdef _UNIX #include #include #endif #include "thread.h" #include "dbg.h" #ifdef _UNIX pthread_key_t g_keyself; void* thread_start( void* pvoid ) { CThread* pth; pthread_setspecific( g_keyself, pvoid ); pth = (CThread*)pvoid; if( pth->Init() ) { pth->Run(); } pth->m_retval = pth->Exit(); return NULL; } #endif #ifdef _WIN32 DWORD g_tlsself; DWORD WINAPI thread_start( LPVOID pvoid ) { CThread* pth; ::TlsSetValue( g_tlsself, pvoid ); pth = (CThread*)pvoid; if( pth->Init() ) { pth->Run(); } pth->m_retval = pth->Exit(); return 0; } #endif /**************************************************************************** * * CMutex * ****************************************************************************/ CMutex::CMutex( void ) { #ifdef _UNIX // We could use PTHREAD_MUTEX_ERRORCHECK_NP for Linux debug builds pthread_mutex_init( &m_mutex, NULL ); #endif #ifdef _WIN32 m_mutex = ::CreateMutex( NULL, TRUE, NULL ); #endif } CMutex::~CMutex( void ) { #ifdef _UNIX pthread_mutex_destroy( &m_mutex ); #endif #ifdef _WIN32 ::CloseHandle( m_mutex ); #endif } void CMutex::Lock( void ) { #ifdef _UNIX pthread_mutex_lock( &m_mutex ); #endif #ifdef _WIN32 ::WaitForSingleObject( m_mutex, INFINITE ); #endif } void CMutex::Unlock( void ) { #ifdef _UNIX pthread_mutex_unlock( &m_mutex ); #endif #ifdef _WIN32 ::ReleaseMutex( m_mutex ); #endif } /**************************************************************************** * * CSemaphore * ****************************************************************************/ CSemaphore::CSemaphore( UINT nCount ) { #ifdef _UNIX // We could use PTHREAD_MUTEX_ERRORCHECK_NP for Linux debug builds pthread_mutex_init( &m_mutex, NULL ); pthread_cond_init( &m_cond, NULL ); m_count = nCount; #endif #ifdef _WIN32 m_semaphore = ::CreateSemaphore( NULL, (LONG)nCount, MAXLONG, NULL ); #endif } CSemaphore::~CSemaphore( void ) { #ifdef _UNIX pthread_cond_destroy( &m_cond ); pthread_mutex_destroy( &m_mutex ); #endif #ifdef _WIN32 ::CloseHandle( m_semaphore ); #endif } void CSemaphore::Lock( void ) { #ifdef _UNIX pthread_mutex_lock( &m_mutex ); while( m_count == 0 ) { pthread_cond_wait( &m_cond, &m_mutex ); } m_count--; pthread_mutex_unlock( &m_mutex ); #endif #ifdef _WIN32 ::WaitForSingleObject( m_semaphore, INFINITE ); #endif } void CSemaphore::Unlock( void ) { #ifdef _UNIX pthread_mutex_lock( &m_mutex ); m_count++; pthread_mutex_unlock( &m_mutex ); pthread_cond_signal( &m_cond ); #endif #ifdef _WIN32 ::ReleaseSemaphore( m_semaphore, 1, NULL ); #endif } /**************************************************************************** * * CThread * ****************************************************************************/ CThread::CThread( void ) { // Empty } CThread::~CThread( void ) { // Empty } void CThread::Create( void ) { #ifdef _UNIX pthread_create( &m_thread, NULL, thread_start, this ); #endif #ifdef _WIN32 DWORD dwThreadId = 0; ::CreateThread( NULL, 0, thread_start, this, 0, &dwThreadId ); #endif } bool CThread::Init( void ) { return false; } int CThread::Exit( void ) { return 0; } void CThread::Run( void ) { } CThread* CThread::This( void ) { #ifdef _UNIX return (CThread*)pthread_getspecific( g_keyself ); #endif #ifdef _WIN32 return (CThread*)::TlsGetValue( g_tlsself ); #endif } /**************************************************************************** * * CEventThread * ****************************************************************************/ #define MIN_TIMER_ALLOC 16 #define MIN_STREAM_ALLOC 4 #define LEFT(n) (2*n) #define RIGHT(n) (2*n+1) #define PARENT(n) (n/2) #define SWAP(t,a,b) { t=a; a=b; b=t; } CEventThread::CEventThread( void ) : m_nSocks(0), m_nSockAlloc(0), m_ppSocks(NULL), m_pWaitObjs(NULL), m_nTimers(0), m_nTimerAlloc(0), m_ppTimers(NULL) { m_nSockAlloc = MIN_STREAM_ALLOC; m_ppSocks = new CSocket*[ MIN_STREAM_ALLOC ]; memset( m_ppSocks, 0, MIN_STREAM_ALLOC * sizeof(CStream*) ); m_pWaitObjs = new waitobj_t[ MIN_STREAM_ALLOC ]; memset( m_pWaitObjs, 0, MIN_STREAM_ALLOC * sizeof(waitobj_t) ); m_nTimerAlloc = MIN_TIMER_ALLOC; m_ppTimers = new CTimer*[ MIN_TIMER_ALLOC ]; memset( m_ppTimers, 0, MIN_TIMER_ALLOC * sizeof(CTimer*) ); } CEventThread::~CEventThread( void ) { delete[] m_ppTimers; delete[] m_pWaitObjs; delete[] m_ppSocks; } void CEventThread::Run( void ) { if( Init() ) { bool running = true; while( running ) { // Figure out how long we can remain in poll/WFMO, possibly forever waittimer_t nTimeout = INFTIM; if( m_nTimers ) { assert( m_ppTimers[1] ); nTimeout = m_ppTimers[1]->GetTimeout() - CTimer::CurrentTime(); if( nTimeout < 0 || nTimeout > 0x7FFFFFFF ) nTimeout = 0; // Wrap - it's late } #ifdef _UNIX int rc = poll( m_pWaitObjs, m_nSocks, nTimeout ); if( rc < 0 ) { dbgout( "poll() failed: error = %i (%s)", errno, strerror(errno) ); break; } #endif #ifdef _WIN32 DWORD rc = ::WaitForMultipleObjects( m_nSocks, m_pWaitObjs, FALSE, nTimeout ); if( rc == WAIT_FAILED ) { dbgout( "WFMO failed: error = %u", ::GetLastError() ); break; } rc = (rc == WAIT_TIMEOUT) ? 0 : 1; #endif if( rc == 0 && m_nTimers ) { assert( m_nTimers && m_nTimerAlloc >= 2 && m_ppTimers[1] ); CTimer* pTimer = m_ppTimers[1]; if( pTimer->GetMode() == CTimer::Repeating ) { pTimer->m_next += pTimer->m_interval; } else { pTimer->m_mode = CTimer::Disabled; m_ppTimers[1] = m_ppTimers[m_nTimers--]; } Heapify( CTimer::CurrentTime(), 1 ); pTimer->GetResponse()->OnTimer(); } if( rc > 0 ) { UINT n; for( n = 0; n < m_nSocks; n++ ) { assert( WAITOBJ_IS_VALID( m_pWaitObjs[n] ) && NULL != m_ppSocks[n] ); int err = SOCKERR_NONE; waitevents_t wevt; CSocket* pSock = m_ppSocks[n]; #ifdef _UNIX wevt = m_pWaitObjs[n].revents; if( (wevt & (POLLIN|POLLERR)) && (pSock->m_uSelectFlags & SF_ACCEPT) == SF_ACCEPT ) { wevt = XPOLLACC; } if( (wevt & (POLLOUT|POLLERR)) && (pSock->m_uSelectFlags & SF_CONNECT) == SF_CONNECT ) { socklen_t errlen = sizeof(err); #if defined(_SOLARIS) && (_SOLARIS < 58) getsockopt( pSock->GetHandle(), SOL_SOCKET, SO_ERROR, (char*)&err, &errlen ); #else getsockopt( pSock->GetHandle(), SOL_SOCKET, SO_ERROR, &err, &errlen ); #endif wevt = XPOLLCNX; } if( (wevt & POLLERR) ) { wevt = pSock->m_uSelectFlags; } #endif #ifdef _WIN32 ::WSAEnumNetworkEvents( pSock->GetHandle(), m_pWaitObjs[n], &wevt ); if( wevt.lNetworkEvents & FD_CONNECT ) { err = wevt.iErrorCode[FD_CONNECT_BIT]; } if( wevt.lNetworkEvents & FD_CLOSE ) { wevt.lNetworkEvents = FD_READ; } #endif if( WAIT_EVENT_READ( wevt ) ) { pSock->GetResponse()->OnReadReady(); } else if( WAIT_EVENT_WRITE( wevt ) ) { pSock->GetResponse()->OnWriteReady(); } else if( WAIT_EVENT_ACCEPT( wevt ) ) { CListenSocket* pListen = (CListenSocket*)pSock; sockaddr_in sa; socklen_t salen = sizeof(sa); sockobj_t sock = accept( pListen->GetHandle(), (sockaddr*)&sa, &salen ); if( INVALID_SOCKET != sock ) { #ifdef _WIN32 // Cancel selections (they are inherited in Win32) ::WSAEventSelect( sock, 0, 0 ); #endif CTcpSocket* pNew = new CTcpSocket(); pNew->m_sock = sock; pListen->m_pAcceptResponse->OnConnection( pNew ); } } else if( WAIT_EVENT_CONNECT( wevt ) ) { pSock->GetResponse()->OnConnectDone( err ); } else if( WAIT_EVENT_EXCEPT( wevt ) ) { pSock->GetResponse()->OnExceptReady(); } } } } } Exit(); } bool CEventThread::Init( void ) { return false; } int CEventThread::Exit( void ) { return 0; } bool CEventThread::AddStream( CSocket* pSock ) { assert_or_retv( false, pSock ); if( m_nSocks >= WAITOBJ_MAX ) { return false; } if( m_nSocks == m_nSockAlloc ) { UINT nNewAlloc = m_nSocks*2; CSocket** ppSocks = new CSocket*[ nNewAlloc ]; memset( ppSocks, 0xDD, nNewAlloc * sizeof(CSocket*) ); memcpy( ppSocks, m_ppSocks, m_nSocks * sizeof(CSocket*) ); delete[] m_ppSocks; m_ppSocks = ppSocks; waitobj_t* pWaitObjs = new waitobj_t[ nNewAlloc ]; memset( pWaitObjs, 0xDD, nNewAlloc * sizeof(waitobj_t) ); memcpy( pWaitObjs, m_pWaitObjs, m_nSocks * sizeof(waitobj_t) ); delete[] m_pWaitObjs; m_pWaitObjs = pWaitObjs; m_nSockAlloc = nNewAlloc; } m_ppSocks[m_nSocks] = pSock; #ifdef _UNIX m_pWaitObjs[m_nSocks].fd = pSock->GetHandle(); m_pWaitObjs[m_nSocks].events = 0; m_pWaitObjs[m_nSocks].revents = 0; #endif #ifdef _WIN32 m_pWaitObjs[m_nSocks] = ::CreateEvent( NULL, FALSE, FALSE, NULL ); #endif m_nSocks++; return true; } void CEventThread::DelStream( CSocket* pSock ) { assert_or_ret( pSock ); UINT n; for( n = 0; n < m_nSocks; n++ ) { if( m_ppSocks[n] == pSock ) { #ifdef _WIN32 ::CloseHandle( m_pWaitObjs[n] ); #endif m_nSocks--; m_ppSocks[n] = m_ppSocks[m_nSocks]; m_pWaitObjs[n] = m_pWaitObjs[m_nSocks]; break; } } if( m_nSocks <= m_nSockAlloc/4 && m_nSockAlloc > MIN_STREAM_ALLOC ) { UINT nNewAlloc = m_nSockAlloc/2; CSocket** ppSocks = new CSocket*[ nNewAlloc ]; memset( ppSocks, 0xDD, nNewAlloc * sizeof(CSocket*) ); memcpy( ppSocks, m_ppSocks, m_nSocks * sizeof(CSocket*) ); delete[] m_ppSocks; m_ppSocks = ppSocks; waitobj_t* pWaitObjs = new waitobj_t[ nNewAlloc ]; memset( pWaitObjs, 0xDD, nNewAlloc * sizeof(waitobj_t) ); memcpy( pWaitObjs, m_pWaitObjs, m_nSocks * sizeof(waitobj_t) ); delete[] m_pWaitObjs; m_pWaitObjs = pWaitObjs; m_nSockAlloc = nNewAlloc; } } void CEventThread::SetStreamSelect( CSocket* pSock, UINT nWhich ) { assert_or_ret( pSock && pSock->IsOpen() ); UINT n; for( n = 0; n < m_nSocks; n++ ) { if( m_ppSocks[n] == pSock ) { #ifdef _UNIX m_pWaitObjs[n].events = nWhich; #endif #ifdef _WIN32 nWhich |= FD_CLOSE; ::WSAEventSelect( pSock->GetHandle(), m_pWaitObjs[n], nWhich ); #endif break; } } } // O( lg(n) ) void CEventThread::AddTimer( CTimer* pTimer ) { assert_or_ret( pTimer ); if( 1+m_nTimers == m_nTimerAlloc ) { UINT nNewAlloc = m_nTimerAlloc*2; CTimer** ppTimers = new CTimer*[ nNewAlloc ]; memset( ppTimers, 0xDD, nNewAlloc * sizeof(CTimer*) ); memcpy( ppTimers, m_ppTimers, (1+m_nTimers) * sizeof(CTimer*) ); delete[] m_ppTimers; m_nTimerAlloc = nNewAlloc; m_ppTimers = ppTimers; } m_nTimers++; UINT n = m_nTimers; UINT32 t = pTimer->GetTimeout(); while( n > 1 && t < m_ppTimers[ PARENT(n) ]->GetTimeout() ) { m_ppTimers[n] = m_ppTimers[ PARENT(n) ]; n = PARENT(n); } m_ppTimers[n] = pTimer; } // O( n ) void CEventThread::DelTimer( CTimer* pTimer ) { assert_or_ret( pTimer ); UINT n; for( n = 1; n <= m_nTimers; n++ ) { if( m_ppTimers[n] == pTimer ) { m_ppTimers[n] = m_ppTimers[m_nTimers]; m_nTimers--; Heapify( CTimer::CurrentTime(), n ); break; } } // Shrink heap if we are using less than 1/4 and more than minimum if( m_nTimers <= m_nTimerAlloc/4 && m_nTimerAlloc > MIN_TIMER_ALLOC ) { UINT nNewAlloc = m_nTimerAlloc/2; CTimer** ppTimers = new CTimer*[ nNewAlloc ]; memset( ppTimers, 0xDD, nNewAlloc * sizeof(CTimer*) ); memcpy( ppTimers, m_ppTimers, (1+m_nTimers) * sizeof(CTimer*) ); delete[] m_ppTimers; m_nTimerAlloc = nNewAlloc; m_ppTimers = ppTimers; } } void CEventThread::Heapify( UINT32 now, UINT n ) { UINT lnode = LEFT(n); UINT rnode = RIGHT(n); UINT low = n; UINT32 dn, dl, dr; dn = m_ppTimers[n]->GetTimeout() - now; if( dn > 0x7FFFFFFF ) dn = 0; if( lnode <= m_nTimers ) { dl = m_ppTimers[lnode]->GetTimeout() - now; if( dl > 0x7FFFFFFF ) dl = 0; if( dl < dn ) { low = lnode; dn = dl; } } if( rnode <= m_nTimers ) { dr = m_ppTimers[rnode]->GetTimeout() - now; if( dr > 0x7FFFFFFF ) dr = 0; if( dr < dn ) { low = rnode; dn = dr; } } if( low != n ) { CTimer* tmp; SWAP( tmp, m_ppTimers[n], m_ppTimers[low] ); Heapify( now, low ); } }