/*=============================================================================
	CABufferQueue.cpp

	$Log: CABufferQueue.cpp,v $
	Revision 1.7  2004/12/15 23:20:49  jcm10
	fix compile error

	Revision 1.6  2004/12/15 23:19:18  dwyatt
	clear buffer in-progress flag

	Revision 1.5  2004/09/30 21:07:14  jcm10
	add missing include

	Revision 1.4  2004/05/26 18:34:08  dwyatt
	oops, pop_front, not pop_back

	Revision 1.3  2004/05/26 00:34:43  dwyatt
	get rid of the lock on the work thread's queue

	Revision 1.2  2004/03/02 22:13:54  dwyatt
	run at fixed priority

	Revision 1.1  2004/01/14 00:08:09  dwyatt
	moved from Source/Tests/AudioFileUtility/Utility

	Revision 1.2  2003/12/15 23:01:12  dwyatt
	[3509039] fix crash in CAPushBufferQueue::Flush

	Revision 1.1  2003/10/15 00:27:11  dwyatt
	initial checkin

	created Fri Oct 10 2003, Doug Wyatt
	Copyright (c) 2003 Apple Computer, Inc.  All Rights Reserved

	$NoKeywords: $
=============================================================================*/

#include "CABufferQueue.h"

#if TARGET_OS_WIN32
	#include "CAWindows.h"
#endif

// ____________________________________________________________________________

// The single work thread shared by every CABufferQueue; created lazily by the
// first CABufferQueue constructor that runs.
CABufferQueue::WorkThread *CABufferQueue::sWorkThread = NULL;

// Constructs the work thread at CAPThread::kMaxThreadPriority and starts it
// immediately.
CABufferQueue::WorkThread::WorkThread() :
	CAPThread(ThreadEntry, this, CAPThread::kMaxThreadPriority, true),
	mStopped(false),
	mRunGuard("mRunGuard")
{
	// prime the container to have some elements so we're not calling malloc dynamically
	// NOTE(review): this only helps if WorkQueue (or its allocator) recycles
	// storage after clear() -- confirm against the WorkQueue typedef.
	Buffer *b = NULL;
	for (int i = 0; i < 64; ++i)
		mWorkQueue.push_back(b);
	mWorkQueue.clear();

	Start();
}

// Thread body: sleeps on mRunGuard until notified, then processes buffers until
// both the hand-off stack (mBuffersToAdd) and the work queue are empty.
// mRunGuard stays locked for the whole time buffers are being processed, so
// RemoveBuffers() cannot run concurrently with ProcessBuffer().
void	CABufferQueue::WorkThread::Run()
{
	while (!mStopped) {
		CAGuard::Locker lock(mRunGuard);
		// NOTE(review): AddBuffer() notifies without holding mRunGuard, so a
		// notification that arrives just before this Wait() may presumably be
		// missed until the next AddBuffer() -- confirm against CAGuard semantics.
		mRunGuard.Wait();

		while (!mStopped) {
			Buffer *b;

			// add buffers from the other thread
			while ((b = mBuffersToAdd.pop_atomic()) != NULL)
				mWorkQueue.push_back(b);

			if (mWorkQueue.empty())
				break;
			b = mWorkQueue.front();
			mWorkQueue.pop_front();

			b->Queue()->ProcessBuffer(b);
			b->SetInProgress(false);
		}
	}
}

// Asks Run() to exit and wakes the thread so it can observe the flag.
void	CABufferQueue::WorkThread::Stop()
{
	mStopped = true;
	mRunGuard.Notify();
}

// Hands a buffer to the work thread. The buffer is flagged in-progress before
// it is published; no lock is taken here, only an atomic push and a notify.
void	CABufferQueue::WorkThread::AddBuffer(Buffer *b)
{
	b->SetInProgress(true);
	mBuffersToAdd.push_atomic(b);
	mRunGuard.Notify();
}

// Removes every buffer belonging to `owner` from the work thread.
//
// Holding mRunGuard excludes Run()'s processing loop, which only touches
// mWorkQueue and pops mBuffersToAdd while it holds the same guard.
void	CABufferQueue::WorkThread::RemoveBuffers(CABufferQueue *owner)
{
	CAGuard::Locker lock(mRunGuard);

	// Buffers handed over by AddBuffer() but not yet collected by Run() still
	// sit in mBuffersToAdd. Move them into the work queue first so the scan
	// below sees them too; otherwise a buffer could outlive its cancellation
	// and be processed after CancelAndDisposeBuffers() has deleted it.
	Buffer *pending;
	while ((pending = mBuffersToAdd.pop_atomic()) != NULL)
		mWorkQueue.push_back(pending);

	for (WorkQueue::iterator it = mWorkQueue.begin(); it != mWorkQueue.end(); ) {
		Buffer *b = *it;
		if (b->Queue() == owner) {
			// the buffer will never reach ProcessBuffer(), so nothing else
			// would clear the flag AddBuffer() set
			b->SetInProgress(false);
			// erase() returns the iterator following the removed element,
			// which stays valid regardless of the container type
			it = mWorkQueue.erase(it);
		} else
			++it;
	}
}

// ____________________________________________________________________________

// Creates an empty buffer owned by `queue`, with nBytes of storage allocated
// for format `fmt`.
CABufferQueue::Buffer::Buffer(CABufferQueue *queue, const CAStreamBasicDescription &fmt, UInt32 nBytes) :
	mQueue(queue)
{
	mMemory = CABufferList::New("", fmt);
	mMemory->AllocateBuffers(nBytes);
	mByteSize = nBytes;

	mInProgress = false;
	mStartFrame = mEndFrame = 0;
	mEndOfStream = false;
}

// Copies up to framesRequired frames out of this buffer into destBufferList,
// starting framesProduced frames into the destination. framesProduced and
// framesRequired are updated by the number of frames copied, and the buffer's
// read position (mStartFrame) advances.
//
// return true if buffer emptied AND we're not at end-of-stream
bool CABufferQueue::Buffer::CopyInto(AudioBufferList *destBufferList, int bytesPerFrame, UInt32 &framesProduced, UInt32 &framesRequired)
{
	UInt32 framesInBuffer = mEndFrame - mStartFrame;
	UInt32 framesToCopy = std::min(framesInBuffer, framesRequired);
	if (framesToCopy > 0) {
		const CABufferList *bufMemory = mMemory;
		const AudioBufferList &srcBufferList = bufMemory->GetBufferList();
		const AudioBuffer *srcbuf = srcBufferList.mBuffers;
		AudioBuffer *dstbuf = destBufferList->mBuffers;
		// only walk the buffers both lists actually have, so a mismatch in
		// buffer count cannot run off the end of either array
		const UInt32 nBuffers = std::min(destBufferList->mNumberBuffers, srcBufferList.mNumberBuffers);
		for (UInt32 i = 0; i < nBuffers; ++i, ++srcbuf, ++dstbuf) {
			memcpy(
				(Byte *)dstbuf->mData + framesProduced * bytesPerFrame,
				(Byte *)srcbuf->mData + mStartFrame * bytesPerFrame,
				framesToCopy * bytesPerFrame);
		}
		framesProduced += framesToCopy;
		framesRequired -= framesToCopy;
		mStartFrame += framesToCopy;
	}
	return (framesToCopy == framesInBuffer) && !mEndOfStream;
}

// Appends up to framesRequired frames from srcBufferList (starting
// framesProduced frames in) to this buffer. framesProduced and framesRequired
// are updated by the number of frames copied, and mEndFrame advances.
// NOTE(review): the write offset is (mEndFrame - mStartFrame), which equals
// mEndFrame only while mStartFrame is 0 -- presumably always the case for a
// buffer being filled; confirm.
//
// return true if buffer filled
bool CABufferQueue::Buffer::CopyFrom(const AudioBufferList *srcBufferList, int bytesPerFrame, UInt32 &framesProduced, UInt32 &framesRequired)
{
	UInt32 framesInBuffer = mEndFrame - mStartFrame;
	UInt32 freeFramesInBuffer = (mByteSize / bytesPerFrame) - framesInBuffer;
	UInt32 framesToCopy = std::min(freeFramesInBuffer, framesRequired);
	if (framesToCopy > 0) {
		const AudioBuffer *srcbuf = srcBufferList->mBuffers;
		const CABufferList *bufMemory = mMemory;
		const AudioBufferList &destBufferList = bufMemory->GetBufferList();
		const AudioBuffer *dstbuf = destBufferList.mBuffers;
		// only walk the buffers both lists actually have, so a mismatch in
		// buffer count cannot run off the end of either array
		const UInt32 nBuffers = std::min(srcBufferList->mNumberBuffers, destBufferList.mNumberBuffers);
		for (UInt32 i = 0; i < nBuffers; ++i, ++srcbuf, ++dstbuf) {
			memcpy(
				(Byte *)dstbuf->mData + framesInBuffer * bytesPerFrame,
				(Byte *)srcbuf->mData + framesProduced * bytesPerFrame,
				framesToCopy * bytesPerFrame);
		}
		framesProduced += framesToCopy;
		framesRequired -= framesToCopy;
		mEndFrame += framesToCopy;
	}
	return (framesToCopy == freeFramesInBuffer);
}

// ____________________________________________________________________________

// Records the buffer count and per-buffer size in frames. No buffers exist
// until SetFormat() is called. Creates the shared work thread on first use.
CABufferQueue::CABufferQueue(int nBuffers, UInt32 bufferSizeFrames) :
	mNumberBuffers(nBuffers),
	mBuffers(NULL),
	mBufferSizeFrames(bufferSizeFrames),
	mBufferList(NULL)
{
	mCurrentBuffer = 0;
	mErrorCount = 0;
	mBytesPerFrame = 0;		// not known until SetFormat(); don't leave it indeterminate
	if (sWorkThread == NULL)
		sWorkThread = new WorkThread();
	mWorkThread = sWorkThread;	// for now
}

// Withdraws any queued work and frees the buffers.
CABufferQueue::~CABufferQueue()
{
	CancelAndDisposeBuffers();
}

// Removes this queue's buffers from the work thread.
void	CABufferQueue::CancelBuffers()
{
	mWorkThread->RemoveBuffers(this);
}

// Cancels queued work first, so the work thread no longer holds pointers to
// the buffers, then deletes the buffers and the scratch buffer list.
void	CABufferQueue::CancelAndDisposeBuffers()
{
	CancelBuffers();
	if (mBuffers) {
		for (int i = 0; i < mNumberBuffers; ++i)
			delete mBuffers[i];
		delete[] mBuffers;
		mBuffers = NULL;
	}
	delete mBufferList;
	mBufferList = NULL;
}

// Discards any existing buffers and allocates mNumberBuffers new ones, each
// holding mBufferSizeFrames frames of format `fmt`.
void	CABufferQueue::SetFormat(const CAStreamBasicDescription &fmt)
{
	CancelAndDisposeBuffers();

	mBytesPerFrame = fmt.mBytesPerFrame;
	mBuffers = new Buffer*[mNumberBuffers];

	int i;
	// Clear every slot before creating anything: if CreateBuffer() throws part
	// way through, CancelAndDisposeBuffers() must only ever delete NULL or a
	// real buffer, never an uninitialized pointer.
	for (i = 0; i < mNumberBuffers; ++i)
		mBuffers[i] = NULL;
	for (i = 0; i < mNumberBuffers; ++i)
		mBuffers[i] = CreateBuffer(fmt, mBufferSizeFrames * mBytesPerFrame);

	mBufferList = CABufferList::New("", fmt);
}

// ____________________________________________________________________________

// Appends inNumberFrames frames from inBufferList to the queue. Each buffer
// that becomes full is handed to the work thread and the next buffer becomes
// current. If the current buffer is still in progress on the work thread, or
// no buffers have been allocated, the remaining frames are dropped and
// mErrorCount is incremented.
void	CAPushBufferQueue::PushBuffer(UInt32 inNumberFrames, const AudioBufferList *inBufferList)
{
	if (mBuffers == NULL) {
		// SetFormat() has not been called (same guard as Flush())
		++mErrorCount;
		return;
	}

	UInt32 framesRequired = inNumberFrames;
	UInt32 framesProduced = 0;

	do {
		Buffer *b = mBuffers[mCurrentBuffer];

		if (b->InProgress()) {
			++mErrorCount;
			break;
		}

		if (b->CopyFrom(inBufferList, mBytesPerFrame, framesProduced, framesRequired)) {
			// buffer was filled, we're done with it
			sWorkThread->AddBuffer(b);
			if (++mCurrentBuffer == mNumberBuffers)
				mCurrentBuffer = 0;
		}
	} while (framesRequired > 0);
}

// Processes the partially filled current buffer synchronously on the calling
// thread, provided it holds frames and is not in progress on the work thread.
void	CAPushBufferQueue::Flush()
{
	if (mBuffers != NULL) {
		Buffer *b = mBuffers[mCurrentBuffer];
		if (b->FrameCount() > 0 && !b->InProgress())
			ProcessBuffer(b);
	}
}

// ____________________________________________________________________________

// Copies up to ioFrames frames from the queue into outBufferList; on return
// ioFrames holds the number of frames actually produced. Each buffer that is
// emptied is handed to the work thread and the next buffer becomes current.
// Produces 0 frames once end-of-stream has been reached. If the current buffer
// is still in progress on the work thread, or no buffers have been allocated,
// mErrorCount is incremented and fewer frames than requested are produced.
void	CAPullBufferQueue::PullBuffer(UInt32 &ioFrames, AudioBufferList *outBufferList)
{
	if (mEndOfStream) {
		ioFrames = 0;
		return;
	}
	if (mBuffers == NULL) {
		// SetFormat() has not been called; nothing can be produced
		++mErrorCount;
		ioFrames = 0;
		return;
	}

	UInt32 framesRequired = ioFrames;
	UInt32 framesProduced = 0;

	do {
		Buffer *b = mBuffers[mCurrentBuffer];

		if (b->InProgress()) {
			++mErrorCount;
			break;
		}

		if (b->CopyInto(outBufferList, mBytesPerFrame, framesProduced, framesRequired)) {
			// buffer emptied
			sWorkThread->AddBuffer(b);
			if (++mCurrentBuffer == mNumberBuffers)
				mCurrentBuffer = 0;
		}
		else if (b->ReachedEndOfStream()) {
			mEndOfStream = true;
			break;
		}
	} while (framesRequired > 0);

	ioFrames = framesProduced;
}

// Processes every buffer synchronously on the calling thread, clears each
// buffer's in-progress flag, and restarts reading from the first buffer.
// Does nothing to the buffers if none have been allocated.
void	CAPullBufferQueue::Prime()
{
	mEndOfStream = false;
	if (mBuffers != NULL) {
		for (int i = 0; i < mNumberBuffers; ++i) {
			Buffer *b = mBuffers[i];
			ProcessBuffer(b);
			b->SetInProgress(false);
		}
	}
	mCurrentBuffer = 0;
}