// Copyright (C) 2001,2002,2004 Federico Montesino Pouzols // // This program is free software; you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by // the Free Software Foundation; either version 2 of the License, or // (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with this program; if not, write to the Free Software // Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. // // As a special exception, you may use this file as part of a free software // library without restriction. Specifically, if other files instantiate // templates or use macros or inline functions from this file, or you compile // this file and link it with other files to produce an executable, this // file does not by itself cause the resulting executable to be covered by // the GNU General Public License. This exception does not however // invalidate any other reasons why the executable file might be covered by // the GNU General Public License. // // This exception applies only to the code released under the name GNU // ccRTP. If you copy code from other releases into a copy of GNU // ccRTP, as the General Public License permits, the exception does // not apply to the code that you add in this way. To avoid misleading // anyone as to the status of such modified files, you must delete // this exception notice from them. // // If you write modifications of your own for GNU ccRTP, it is your choice // whether to permit this exception to apply to your modifications. // If you do not wish that, delete this exception notice. // #include "private.h" #include #ifdef CCXX_NAMESPACES namespace ost { #endif const size_t IncomingDataQueueBase::defaultMaxRecvPacketSize = 65534; ConflictHandler::ConflictingTransportAddress:: ConflictingTransportAddress(InetAddress na,tpport_t dtp, tpport_t ctp): networkAddress(na), dataTransportPort(dtp), controlTransportPort(ctp), next(NULL) { gettimeofday(&lastPacketTime,NULL); } ConflictHandler::ConflictingTransportAddress* ConflictHandler::searchDataConflict(InetAddress na, tpport_t dtp) { ConflictingTransportAddress* result = firstConflict; while ( result->networkAddress != na || result->dataTransportPort != dtp) result = result->next; return result; } ConflictHandler::ConflictingTransportAddress* ConflictHandler::searchControlConflict(InetAddress na, tpport_t ctp) { ConflictingTransportAddress* result = firstConflict; while ( result && (result->networkAddress != na || result->controlTransportPort != ctp) ) result = result->next; return result; } void ConflictHandler::addConflict(const InetAddress& na, tpport_t dtp, tpport_t ctp) { ConflictingTransportAddress* nc = new ConflictingTransportAddress(na,dtp,ctp); if ( lastConflict ) { lastConflict->setNext(nc); lastConflict = nc; } else { firstConflict = lastConflict = nc; } } const uint8 IncomingDataQueue::defaultMinValidPacketSequence = 0; const uint16 IncomingDataQueue::defaultMaxPacketMisorder = 0; const uint16 IncomingDataQueue::defaultMaxPacketDropout = 3000; const size_t IncomingDataQueue::defaultMembersSize = MembershipBookkeeping::defaultMembersHashSize; IncomingDataQueue::IncomingDataQueue(uint32 size) : IncomingDataQueueBase(), MembershipBookkeeping(size) { recvFirst = recvLast = NULL; sourceExpirationPeriod = 5; // 5 RTCP report intervals minValidPacketSequence = getDefaultMinValidPacketSequence(); maxPacketDropout = getDefaultMaxPacketDropout(); maxPacketMisorder = getDefaultMaxPacketMisorder(); } void IncomingDataQueue::purgeIncomingQueue() { IncomingRTPPktLink* recvnext; // flush the reception queue (incoming packets not yet // retrieved) recvLock.writeLock(); while( recvFirst ) { recvnext = recvFirst->getNext(); // nullify source specific packet list SyncSourceLink *s = recvFirst->getSourceLink(); s->setFirst(NULL); s->setLast(NULL); delete recvFirst->getPacket(); delete recvFirst; recvFirst = recvnext; } recvLock.unlock(); } void IncomingDataQueue::renewLocalSSRC() { const uint32 MAXTRIES = 20; uint32 newssrc; uint16 tries = 0; do { newssrc = random32(); tries++; } while ( (tries < MAXTRIES) && isRegistered(newssrc) ); if ( MAXTRIES == tries ) { // TODO we are in real trouble. } } bool IncomingDataQueue::isWaiting(const SyncSource* src) const { bool w; recvLock.readLock(); if ( NULL == src ) w = ( NULL != recvFirst); else w = isMine(*src) && ( NULL != getLink(*src)->getFirst() ); recvLock.unlock(); return w; } uint32 IncomingDataQueue::getFirstTimestamp(const SyncSource* src) const { recvLock.readLock(); // get the first packet IncomingRTPPktLink* packetLink; if ( NULL == src ) packetLink = recvFirst; else packetLink = isMine(*src) ? getLink(*src)->getFirst() : NULL; // get the timestamp of the first packet uint32 ts; if ( packetLink ) ts = packetLink->getTimestamp(); else ts = 0l; recvLock.unlock(); return ts; } size_t IncomingDataQueue::takeInDataPacket(void) { InetHostAddress network_address; tpport_t transport_port; uint32 nextSize = (uint32)getNextDataPacketSize(); unsigned char* buffer = new unsigned char[nextSize]; int32 rtn = (int32)recvData(buffer,nextSize,network_address,transport_port); if ( (rtn < 0) || ((uint32)rtn > getMaxRecvPacketSize()) ){ delete buffer; return 0; } // get time of arrival struct timeval recvtime; gettimeofday(&recvtime,NULL); // build a packet. It will link itself to its source IncomingRTPPkt* packet = new IncomingRTPPkt(buffer,rtn); // Generic header validity check. if ( !packet->isHeaderValid() ) { delete packet; return 0; } CryptoContext* pcc = getInQueueCryptoContext( packet->getSSRC()); if (pcc != NULL) { int32 ret = packet->unprotect(pcc); if (ret < 0) { if (!onSRTPPacketError(*packet, ret)) { delete packet; return 0; } } } // virtual for profile-specific validation and processing. if ( !onRTPPacketRecv(*packet) ) { delete packet; return 0; } bool source_created; SyncSourceLink* sourceLink = getSourceBySSRC(packet->getSSRC(),source_created); SyncSource* s = sourceLink->getSource(); if ( source_created ) { // Set data transport address. setDataTransportPort(*s,transport_port); // Network address is assumed to be the same as the control one setNetworkAddress(*s,network_address); sourceLink->initStats(); // First packet arrival time. sourceLink->setInitialDataTime(recvtime); sourceLink->setProbation(getMinValidPacketSequence()); if ( sourceLink->getHello() ) onNewSyncSource(*s); } else if ( 0 == s->getDataTransportPort() ) { // Test if RTCP packets had been received but this is the // first data packet from this source. setDataTransportPort(*s,transport_port); } // Before inserting in the queue, // 1) check for collisions and loops. If the packet cannot be // assigned to a source, it will be rejected. // 2) check the source is a sufficiently well known source // TODO: also check CSRC identifiers. if ( checkSSRCInIncomingRTPPkt(*sourceLink,source_created, network_address,transport_port) && recordReception(*sourceLink,*packet,recvtime) ) { // now the packet link is linked in the queues IncomingRTPPktLink* packetLink = new IncomingRTPPktLink(packet, sourceLink, recvtime, packet->getTimestamp() - sourceLink->getInitialDataTimestamp(), NULL,NULL,NULL,NULL); insertRecvPacket(packetLink); } else { // must be discarded due to collision or loop or // invalid source delete packet; } // ccRTP keeps packets from the new source, but avoids // flip-flopping. This allows losing less packets and for // mobile telephony applications or other apps that may change // the source transport address during the session. return rtn; } bool IncomingDataQueue::checkSSRCInIncomingRTPPkt(SyncSourceLink& sourceLink, bool is_new, InetAddress& network_address, tpport_t transport_port) { bool result = true; // Test if the source is new and it is not the local one. if ( is_new && sourceLink.getSource()->getID() != getLocalSSRC() ) return result; SyncSource *s = sourceLink.getSource(); if ( s->getDataTransportPort() != transport_port || s->getNetworkAddress() != network_address ) { // SSRC collision or a loop has happened if ( s->getID() != getLocalSSRC() ) { // TODO: Optional error counter. // Note this differs from the default in the RFC. // Discard packet only when the collision is // repeating (to avoid flip-flopping) if ( sourceLink.getPrevConflict() && ( (network_address == sourceLink.getPrevConflict()->networkAddress) && (transport_port == sourceLink.getPrevConflict()->dataTransportPort) ) ) { // discard packet and do not flip-flop result = false; } else { // Record who has collided so that in // the future we can how if the // collision repeats. sourceLink.setPrevConflict(network_address, transport_port,0); // Change sync source transport address setDataTransportPort(*s,transport_port); setNetworkAddress(*s,network_address); } } else { // Collision or loop of own packets. ConflictingTransportAddress* conflicting = searchDataConflict(network_address, transport_port); if ( conflicting ) { // Optional error counter. updateConflict(*conflicting); result = false; } else { // New collision addConflict(s->getNetworkAddress(), s->getDataTransportPort(), s->getControlTransportPort()); dispatchBYE("SSRC collision detected when receiving data packet."); renewLocalSSRC(); setNetworkAddress(*s,network_address); setDataTransportPort(*s,transport_port); setControlTransportPort(*s,0); sourceLink.initStats(); sourceLink.setProbation(getMinValidPacketSequence()); } } } return result; } bool IncomingDataQueue::insertRecvPacket(IncomingRTPPktLink* packetLink) { SyncSourceLink *srcLink = packetLink->getSourceLink(); unsigned short seq = packetLink->getPacket()->getSeqNum(); recvLock.writeLock(); IncomingRTPPktLink* plink = srcLink->getLast(); if ( plink && (seq < plink->getPacket()->getSeqNum()) ) { // a disordered packet, so look for its place while ( plink && (seq < plink->getPacket()->getSeqNum()) ){ // the packet is a duplicate if ( seq == plink->getPacket()->getSeqNum() ) { recvLock.unlock(); VDL(("Duplicated disordered packet: seqnum %d, SSRC:", seq,srcLink->getSource()->getID())); delete packetLink->getPacket(); delete packetLink; return false; } plink = plink->getSrcPrev(); } if ( !plink ) { // we have scanned the whole (and non empty) // list, so this must be the older (first) // packet from this source. // insert into the source specific queue IncomingRTPPktLink* srcFirst = srcLink->getFirst(); srcFirst->setSrcPrev(packetLink); packetLink->setSrcNext(srcFirst); // insert into the global queue IncomingRTPPktLink* prevFirst = srcFirst->getPrev(); if ( prevFirst ){ prevFirst->setNext(packetLink); packetLink->setPrev(prevFirst); } srcFirst->setPrev(packetLink); packetLink->setNext(srcFirst); srcLink->setFirst(packetLink); } else { // (we are in the middle of the source list) // insert into the source specific queue plink->getSrcNext()->setSrcPrev(packetLink); packetLink->setSrcNext(plink->getSrcNext()); // -- insert into the global queue, with the // minimum priority compared to packets from // other sources plink->getSrcNext()->getPrev()->setNext(packetLink); packetLink->setPrev(plink->getSrcNext()->getPrev()); plink->getSrcNext()->setPrev(packetLink); packetLink->setNext(plink->getSrcNext()); // ------ plink->setSrcNext(packetLink); packetLink->setSrcPrev(plink); // insert into the global queue (giving // priority compared to packets from other sources) //list->getNext->setPrev(packetLink); //packetLink->setNext(list->getNext); //list->setNext(packet); //packet->setPrev(list); } } else { // An ordered packet if ( !plink ) { // the only packet in the source specific queue srcLink->setLast(packetLink); srcLink->setFirst(packetLink); // the last packet in the global queue if ( recvLast ) { recvLast->setNext(packetLink); packetLink->setPrev(recvLast); } recvLast = packetLink; if ( !recvFirst ) recvFirst = packetLink; } else { // there are already more packets from this source. // this ignores duplicate packets if ( plink && (seq == plink->getPacket()->getSeqNum()) ) { VDL(("Duplicated packet: seqnum %d, SSRC:", seq,srcLink->getSource->getID())); recvLock.unlock(); delete packetLink->getPacket(); delete packetLink; return false; } // the last packet in the source specific queue srcLink->getLast()->setSrcNext(packetLink); packetLink->setSrcPrev(srcLink->getLast()); srcLink->setLast(packetLink); // the last packet in the global queue recvLast->setNext(packetLink); packetLink->setPrev(recvLast); recvLast = packetLink; } } // account the insertion of this packet into the queue srcLink->recordInsertion(*packetLink); recvLock.unlock(); // packet successfully inserted return true; } const AppDataUnit* IncomingDataQueue::getData(uint32 stamp, const SyncSource* src) { IncomingRTPPktLink* pl; // unsigned count = 0; AppDataUnit* result; if ( NULL != (pl = getWaiting(stamp,src)) ) { IncomingRTPPkt* packet = pl->getPacket(); // size_t len = packet->getPayloadSize(); SyncSource &src = *(pl->getSourceLink()->getSource()); result = new AppDataUnit(*packet,src); // delete the packet link, but not the packet delete pl; // count += len; } else { result = NULL; } return result; } // FIX: try to merge and organize IncomingDataQueue::IncomingRTPPktLink* IncomingDataQueue::getWaiting(uint32 timestamp, const SyncSource* src) { if ( src && !isMine(*src) ) return NULL; IncomingRTPPktLink *result; recvLock.writeLock(); if ( src != NULL ) { // process source specific queries: // we will modify the queue of this source SyncSourceLink* srcm = getLink(*src); // first, delete all older packets. The while loop // down here counts how many older packets are there; // then the for loop deletes them and advances l till // the first non older packet. int nold = 0; IncomingRTPPktLink* l = srcm->getFirst(); if ( !l ) { result = NULL; recvLock.unlock(); return result; } while ( l && (l->getTimestamp() < timestamp) || end2EndDelayed(*l) ) { nold++; l = l->getSrcNext(); } // to know whether the global queue gets empty bool nonempty = false; for ( int i = 0; i < nold; i++) { l = srcm->getFirst(); srcm->setFirst(srcm->getFirst()->getSrcNext());; // unlink from the global queue nonempty = false; if ( l->getPrev() ){ nonempty = true; l->getPrev()->setNext(l->getNext()); } if ( l->getNext() ) { nonempty = true; l->getNext()->setPrev(l->getPrev()); } // now, delete it onExpireRecv(*(l->getPacket()));// notify packet discard delete l->getPacket(); delete l; } // return the packet, if found if ( !srcm->getFirst() ) { // threre are no more packets from this source srcm->setLast(NULL); if ( !nonempty ) recvFirst = recvLast = NULL; result = NULL; } else if ( srcm->getFirst()->getTimestamp() > timestamp ) { // threre are only newer packets from this source srcm->getFirst()->setSrcPrev(NULL); result = NULL; } else { // (src->getFirst()->getTimestamp() == stamp) is true result = srcm->getFirst(); // unlink the selected packet from the global queue if ( result->getPrev() ) result->getPrev()->setNext(result->getNext()); else recvFirst = result->getNext(); if ( result->getNext() ) result->getNext()->setPrev(result->getPrev()); else recvLast = result->getPrev(); // unlink the selected packet from the source queue srcm->setFirst(result->getSrcNext()); if ( srcm->getFirst() ) srcm->getFirst()->setPrev(NULL); else srcm->setLast(NULL); } } else { // process source unspecific queries int nold = 0; IncomingRTPPktLink* l = recvFirst; while ( l && (l->getTimestamp() < timestamp || end2EndDelayed(*l) ) ){ nold++; l = l->getNext(); } for (int i = 0; i < nold; i++) { IncomingRTPPktLink* l = recvFirst; recvFirst = recvFirst->getNext(); // unlink the packet from the queue of its source SyncSourceLink* src = l->getSourceLink(); src->setFirst(l->getSrcNext()); if ( l->getSrcNext() ) l->getSrcNext()->setSrcPrev(NULL); else src->setLast(NULL); // now, delete it onExpireRecv(*(l->getPacket()));// notify packet discard delete l->getPacket(); delete l; } // return the packet, if found if ( !recvFirst ) { // there are no more packets in the queue recvLast = NULL; result = NULL; } else if ( recvFirst->getTimestamp() > timestamp ) { // there are only newer packets in the queue l->setPrev(NULL); result = NULL; } else { // (recvFirst->getTimestamp() == stamp) is true result = recvFirst; // unlink the selected packet from the global queue recvFirst = recvFirst->getNext(); if ( recvFirst ) recvFirst->setPrev(NULL); else recvLast = NULL; // unlink the selected packet from the queue // of its source SyncSourceLink* src = result->getSourceLink(); src->setFirst(result->getSrcNext()); if ( src->getFirst() ) src->getFirst()->setSrcPrev(NULL); else src->setLast(NULL); } } recvLock.unlock(); return result; } bool IncomingDataQueue::recordReception(SyncSourceLink& srcLink, const IncomingRTPPkt& pkt, const timeval recvtime) { bool result = true; // Source validation. SyncSource* src = srcLink.getSource(); if ( !(srcLink.isValid()) ) { // source is not yet valid. if ( pkt.getSeqNum() == srcLink.getMaxSeqNum() + 1 ) { // packet in sequence. srcLink.decProbation(); if ( srcLink.isValid() ) { // source has become valid. // TODO: avoid this the first time. srcLink.initSequence(pkt.getSeqNum()); } else { result = false; } } else { // packet not in sequence. srcLink.probation = getMinValidPacketSequence() - 1; result = false; } srcLink.setMaxSeqNum(pkt.getSeqNum()); } else { // source was already valid. uint16 step = pkt.getSeqNum() - srcLink.getMaxSeqNum(); if ( step < getMaxPacketDropout() ) { // Ordered, with not too high step. if ( pkt.getSeqNum() < srcLink.getMaxSeqNum() ) { // sequene number wrapped. srcLink.incSeqNumAccum(); } srcLink.setMaxSeqNum(pkt.getSeqNum()); } else if ( step <= (SEQNUMMOD - getMaxPacketMisorder()) ) { // too high step of the sequence number. if ( pkt.getSeqNum() == srcLink.getBadSeqNum() ) { srcLink.initSequence(pkt.getSeqNum()); } else { srcLink.setBadSeqNum((pkt.getSeqNum() + 1) & (SEQNUMMOD - 1) ); //This additional check avoids that //the very first packet from a source //be discarded. if ( 0 < srcLink.getObservedPacketCount() ) { result = false; } else { srcLink.setMaxSeqNum(pkt.getSeqNum()); } } } else { // duplicate or reordered packet } } if ( result ) { // the packet is considered valid. srcLink.incObservedPacketCount(); srcLink.incObservedOctetCount(pkt.getPayloadSize()); srcLink.lastPacketTime = recvtime; if ( srcLink.getObservedPacketCount() == 1 ) { // ooops, it's the first packet from this source setSender(*src,true); srcLink.setInitialDataTimestamp(pkt.getTimestamp()); } // we record the last time a packet from this source // was received, this has statistical interest and is // needed to time out old senders that are no sending // any longer. // compute the interarrival jitter estimation. timeval tarrival; timeval lastT = srcLink.getLastPacketTime(); timeval initial = srcLink.getInitialDataTime(); timersub(&lastT,&initial,&tarrival); uint32 arrival = timeval2microtimeout(tarrival) * getCurrentRTPClockRate(); uint32 transitTime = arrival - pkt.getTimestamp(); int32 delta = transitTime - srcLink.getLastPacketTransitTime(); srcLink.setLastPacketTransitTime(transitTime); if ( delta < 0 ) delta = -delta; srcLink.setJitter( srcLink.getJitter() + (1.0f / 16.0f) * (static_cast(delta) - srcLink.getJitter())); } return result; } void IncomingDataQueue::recordExtraction(const IncomingRTPPkt&) { } void IncomingDataQueue::setInQueueCryptoContext(CryptoContext* cc) { std::list::iterator i; MutexLock lock(cryptoMutex); // check if a CryptoContext for a SSRC already exists. If yes // remove it from list before inserting the new one. for( i = cryptoContexts.begin(); i!= cryptoContexts.end(); i++ ){ if( (*i)->getSsrc() == cc->getSsrc() ) { CryptoContext* tmp = *i; cryptoContexts.erase(i); delete tmp; break; } } cryptoContexts.push_back(cc); } void IncomingDataQueue::removeInQueueCryptoContext(CryptoContext* cc) { std::list::iterator i; MutexLock lock(cryptoMutex); if (cc == NULL) { // Remove any incoming crypto contexts for (i = cryptoContexts.begin(); i != cryptoContexts.end(); ) { CryptoContext* tmp = *i; i = cryptoContexts.erase(i); delete tmp; } } else { for( i = cryptoContexts.begin(); i!= cryptoContexts.end(); i++ ){ if( (*i)->getSsrc() == cc->getSsrc() ) { CryptoContext* tmp = *i; cryptoContexts.erase(i); delete tmp; return; } } } } CryptoContext* IncomingDataQueue::getInQueueCryptoContext(uint32 ssrc) { std::list::iterator i; MutexLock lock(cryptoMutex); for( i = cryptoContexts.begin(); i!= cryptoContexts.end(); i++ ){ if( (*i)->getSsrc() == ssrc) { return (*i); } } return NULL; } #ifdef CCXX_NAMESPACES } #endif /** EMACS ** * Local variables: * mode: c++ * c-basic-offset: 8 * End: */