// Copyright (C) 2001,2002,2004,2005 Federico Montesino Pouzols <fedemp@altern.org>
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
//
// As a special exception, you may use this file as part of a free software
// library without restriction.  Specifically, if other files instantiate
// templates or use macros or inline functions from this file, or you compile
// this file and link it with other files to produce an executable, this
// file does not by itself cause the resulting executable to be covered by
// the GNU General Public License.  This exception does not however
// invalidate any other reasons why the executable file might be covered by
// the GNU General Public License.
//
// This exception applies only to the code released under the name GNU
// ccRTP.  If you copy code from other releases into a copy of GNU
// ccRTP, as the General Public License permits, the exception does
// not apply to the code that you add in this way.  To avoid misleading
// anyone as to the status of such modified files, you must delete
// this exception notice from them.
//
// If you write modifications of your own for GNU ccRTP, it is your choice
// whether to permit this exception to apply to your modifications.
// If you do not wish that, delete this exception notice.
//

#include "private.h"
#include <ccrtp/oqueue.h>

#ifdef  CCXX_NAMESPACES
namespace ost {
#endif

/// Default maximum size, in octets, of the segments outgoing data is
/// split into (installed by the OutgoingDataQueueBase constructor).
const size_t OutgoingDataQueueBase::defaultMaxSendSegmentSize = 65536;

OutgoingDataQueueBase::OutgoingDataQueueBase()
{
	// Start out with the class-wide default limit for the size of
	// the segments data is split into (65536 octets).
	const size_t segmentLimit = getDefaultMaxSendSegmentSize();
	setMaxSendSegmentSize(segmentLimit);
}

DestinationListHandler::DestinationListHandler()
	: destList(),
	  destinationLock()
{
	// Nothing else to do: the destination list starts out empty.
}

DestinationListHandler::~DestinationListHandler()
{
	// Free every TransportAddress still stored in the list. The
	// list is write-locked for the whole traversal.
	writeLockDestinationList();
	std::list<TransportAddress*>::iterator it = destList.begin();
	while ( it != destList.end() ) {
		TransportAddress* doomed = *it;
#ifdef	CCXX_EXCEPTIONS
		// a destructor must not let an exception escape.
		try {
#endif
			delete doomed;
#ifdef	CCXX_EXCEPTIONS
		} catch (...) {}
#endif
		it++;
	}
	unlockDestinationList();
}

bool
DestinationListHandler::addDestinationToList(const InetAddress& ia,
					     tpport_t data, tpport_t control)
{
	TransportAddress* addr = new TransportAddress(ia,data,control);
	writeLockDestinationList();
	destList.push_back(addr);
	unlockDestinationList();
	return true;
}

/**
 * Remove the first destination matching the given network address
 * and data/control ports from the destination list, and free it.
 *
 * @param ia network address of the destination to remove.
 * @param dataPort data transport port of the destination.
 * @param controlPort control transport port of the destination.
 * @return true if a matching destination was found and removed,
 *         false otherwise.
 */
bool
DestinationListHandler::removeDestinationFromList(const InetAddress& ia,
						  tpport_t dataPort,
						  tpport_t controlPort)
{
	bool result = false;
	writeLockDestinationList();
	for (std::list<TransportAddress*>::iterator i = destList.begin();
	     destList.end() != i; ++i) {
		TransportAddress* tmp = *i;
		if ( ia == tmp->getNetworkAddress() &&
		     dataPort == tmp->getDataTransportPort() &&
		     controlPort == tmp->getControlTransportPort() ) {
			// matches. -> remove it. erase() invalidates
			// the iterator, so leave the loop right away
			// instead of evaluating the loop condition
			// on it again.
			destList.erase(i);
			delete tmp;
			result = true;
			break;
		}
	}
	unlockDestinationList();
	return result;
}

#ifdef	CCXX_IPV6

DestinationListHandlerIPV6::DestinationListHandlerIPV6()
	: destListIPV6(),
	  destinationLock()
{
	// Nothing else to do: the IPv6 destination list starts out empty.
}

DestinationListHandlerIPV6::~DestinationListHandlerIPV6()
{
	// Free every TransportAddressIPV6 still stored in the list.
	// The list is write-locked for the whole traversal.
	writeLockDestinationListIPV6();
	std::list<TransportAddressIPV6*>::iterator it = destListIPV6.begin();
	while ( it != destListIPV6.end() ) {
		TransportAddressIPV6* doomed = *it;
#ifdef	CCXX_EXCEPTIONS
		// a destructor must not let an exception escape.
		try {
#endif
			delete doomed;
#ifdef	CCXX_EXCEPTIONS
		} catch (...) {}
#endif
		it++;
	}
	unlockDestinationListIPV6();
}

bool
DestinationListHandlerIPV6::addDestinationToListIPV6(const IPV6Address& ia,
					     tpport_t data, tpport_t control)
{
	TransportAddressIPV6* addr = new TransportAddressIPV6(ia,data,control);
	writeLockDestinationListIPV6();
	destListIPV6.push_back(addr);
	unlockDestinationListIPV6();
	return true;
}

/**
 * Remove the first IPv6 destination matching the given network
 * address and data/control ports from the destination list, and
 * free it.
 *
 * @param ia network address of the destination to remove.
 * @param dataPort data transport port of the destination.
 * @param controlPort control transport port of the destination.
 * @return true if a matching destination was found and removed,
 *         false otherwise.
 */
bool
DestinationListHandlerIPV6::removeDestinationFromListIPV6(const IPV6Address& ia,
						  tpport_t dataPort,
						  tpport_t controlPort)
{
	bool result = false;
	writeLockDestinationListIPV6();
	for (std::list<TransportAddressIPV6*>::iterator i = destListIPV6.begin();
	     destListIPV6.end() != i; ++i) {
		TransportAddressIPV6* tmp = *i;
		if ( ia == tmp->getNetworkAddress() &&
		     dataPort == tmp->getDataTransportPort() &&
		     controlPort == tmp->getControlTransportPort() ) {
			// matches. -> remove it. erase() invalidates
			// the iterator, so leave the loop right away
			// instead of evaluating the loop condition
			// on it again.
			destListIPV6.erase(i);
			delete tmp;
			result = true;
			break;
		}
	}
	unlockDestinationListIPV6();
	return result;
}


#endif

/// Default scheduling timeout: schedule at 8 ms (the value is
/// expressed in microseconds).
const microtimeout_t OutgoingDataQueue::defaultSchedulingTimeout = 8000;
/// Default expiration timeout: packets unsent will expire after 40 ms
/// (the value is expressed in microseconds).
const microtimeout_t OutgoingDataQueue::defaultExpireTimeout = 40000;

OutgoingDataQueue::OutgoingDataQueue():
	OutgoingDataQueueBase(),
#ifdef	CCXX_IPV6
	DestinationListHandlerIPV6(),
#endif
	DestinationListHandler(),
	sendLock(),
	sendFirst(NULL),
	sendLast(NULL)
{
	// Timing set-up: random origin for the timestamps, default
	// scheduling and expiration timeouts.
	setInitialTimestamp(random32());
	setSchedulingTimeout(getDefaultSchedulingTimeout());
	setExpireTimeout(getDefaultExpireTimeout());

	// Flags: no marker pending, no padding bits added.
	sendInfo.marked = false;
	sendInfo.complete = true;
	sendInfo.paddinglen = 0;

	// Accounting counters start from zero.
	sendInfo.octetCount = 0;
	sendInfo.packetCount = 0;

	// Random initial sequence number.
	sendInfo.sendSeq = random16();

	// Initially, 0 CSRC identifiers follow the fixed header; the
	// local source is the first contributing source.
	sendInfo.sendCC = 0;
	sendInfo.sendSources[0] = getLocalSSRC();

	// overflowTime accumulates the time elapsed through the
	// successive cycles of the timestamp, starting from the
	// initial time.
	sendInfo.overflowTime.tv_sec = getInitialTime().tv_sec;
	sendInfo.overflowTime.tv_usec = getInitialTime().tv_usec;
}

void
OutgoingDataQueue::purgeOutgoingQueue()
{
	OutgoingRTPPktLink* sendnext;
	// flush the sending queue (delete outgoing packets
	// unsent so far)
	sendLock.writeLock();
	while ( sendFirst )
 	{
		sendnext = sendFirst->getNext();
		delete sendFirst;
		sendFirst = sendnext;
	}
	sendLast = NULL;
	sendLock.unlock();
}

bool
OutgoingDataQueue::addDestination(const InetHostAddress& ia,
				  tpport_t dataPort,
				  tpport_t controlPort)
{
	if ( 0 == controlPort )
		controlPort = dataPort + 1;
	bool result = addDestinationToList(ia,dataPort,controlPort);
	if ( result && isSingleDestination() ) {
		setDataPeer(ia,dataPort);
		setControlPeer(ia,controlPort);
	}
	return result;
}

bool
OutgoingDataQueue::addDestination(const InetMcastAddress& ia,
				  tpport_t dataPort,
				  tpport_t controlPort)
{
	if ( 0 == controlPort )
		controlPort = dataPort + 1;
	bool result = addDestinationToList(ia,dataPort,controlPort);
	if ( result && isSingleDestination() ) {
		setDataPeer(ia,dataPort);
		setControlPeer(ia,controlPort);
	}
	return result;
}

bool
OutgoingDataQueue::forgetDestination(const InetHostAddress& ia,
				     tpport_t dataPort,
				     tpport_t controlPort)
{
	if ( 0 == controlPort )
		controlPort = dataPort + 1;
	return DestinationListHandler::
		removeDestinationFromList(ia,dataPort,controlPort);
}

bool
OutgoingDataQueue::forgetDestination(const InetMcastAddress& ia,
				     tpport_t dataPort,
				     tpport_t controlPort)
{
	if ( 0 == controlPort )
		controlPort = dataPort + 1;
	return DestinationListHandler::
		removeDestinationFromList(ia,dataPort,controlPort);
}

#ifdef	CCXX_IPV6
bool
OutgoingDataQueue::addDestination(const IPV6Address& ia,
				  tpport_t dataPort,
				  tpport_t controlPort)
{
	if ( 0 == controlPort )
		controlPort = dataPort + 1;
	bool result = addDestinationToListIPV6(ia,dataPort,controlPort);
	if ( result && isSingleDestinationIPV6() ) {
		setDataPeerIPV6(ia,dataPort);
		setControlPeerIPV6(ia,controlPort);
	}
	return result;
}

bool
OutgoingDataQueue::forgetDestination(const IPV6Address& ia,
				     tpport_t dataPort,
				     tpport_t controlPort)
{
	if ( 0 == controlPort )
		controlPort = dataPort + 1;
	return DestinationListHandlerIPV6::
		removeDestinationFromListIPV6(ia,dataPort,controlPort);
}

#endif

bool
OutgoingDataQueue::isSending(void) const
{
	// There is something being sent as long as the sending queue
	// has a first packet.
	return ( NULL != sendFirst );
}

/**
 * Compute how long to wait before the first packet in the sending
 * queue has to be sent. Packets found to be expired are removed from
 * the queue (notifying through onExpireSend()) and the computation is
 * repeated for the next one.
 *
 * @return the scheduling timeout if the queue is empty; otherwise
 *         the number of microseconds to wait before sending the
 *         first packet (limited to about one hour), or 0 if it must
 *         be sent right now.
 */
microtimeout_t
OutgoingDataQueue::getSchedulingTimeout(void)
{
	struct timeval send, now;
	uint32 rate;
	uint32 rem;

	for(;;)
	{
		// if there is no packet to send, use the default scheduling
		// timeout
		if( !sendFirst )
			return schedulingTimeout;

		uint32 stamp = sendFirst->getPacket()->getTimestamp();
		stamp -= getInitialTimestamp();
		rate = getCurrentRTPClockRate();

		// now we want to get in <code>send</code> _when_ the
		// packet is scheduled to be sent.

		// translate timestamp to timeval
		send.tv_sec = stamp / rate;
		rem = stamp % rate;
		// 10^6 * rem/rate. Computed in 64 bits so that the
		// product cannot overflow, and dividing by the rate
		// itself so that rates below 1000 Hz or not multiple
		// of 1000 Hz are handled exactly (rem < rate keeps the
		// result below 10^6).
		send.tv_usec = static_cast<long>(
			(static_cast<unsigned long long>(rem) * 1000000ul) /
			rate);

		// add timevals. Overflow holds the initial time
		// plus the time accumulated through successive
		// overflows of timestamp. See below.
		timeradd(&send,&(sendInfo.overflowTime),&send);
		gettimeofday(&now, NULL);

		// Problem: when timestamp overflows, time goes back.
		// We MUST ensure that _send_ is not too much lower
		// than _now_, otherwise, we MUST keep how much time
		// was lost because of overflow. We assume that _send_
		// 5000 seconds lower than now suggests timestamp
		// overflow.  (Remember that the 32 bits of the
		// timestamp field are 47722 seconds under a sampling
		// clock of 90000 hz.)  This is not a perfect
		// solution. Disorderedly timestamped packets coming
		// after an overflowed one will be wrongly
		// corrected. Nevertheless, this may only corrupt a
		// handful of those packets every more than 13 hours
		// (if timestamp started from 0).
		if ( now.tv_sec - send.tv_sec > 5000){
			timeval overflow;
			overflow.tv_sec =(~static_cast<uint32>(0)) / rate;
			overflow.tv_usec = (~static_cast<uint32>(0)) % rate *
				1000000ul / rate;
			do {
				timeradd(&send,&overflow,&send);
				timeradd(&(sendInfo.overflowTime),&overflow,
					 &(sendInfo.overflowTime));
			} while ( now.tv_sec - send.tv_sec > 5000 );
		}

		// This tries to solve the aforementioned problem
		// about disordered packets coming after an overflowed
		// one. Now we apply the reverse idea.
		if ( send.tv_sec - now.tv_sec > 20000 ){
			timeval overflow;
			overflow.tv_sec = (~static_cast<uint32>(0)) / rate;
			overflow.tv_usec = (~static_cast<uint32>(0)) % rate *
				1000000ul / rate;
			timersub(&send,&overflow,&send);
		}

		// A: This sets a maximum timeout of 1 hour.
		if ( send.tv_sec - now.tv_sec > 3600 ){
			return 3600000000ul;
		}

		// Difference in microseconds between the scheduled
		// sending time and now. It ranges roughly from
		// -5*10^9 to 3.6*10^9, which does not fit in 32 signed
		// bits, so it is kept in 64 bits to avoid wrapping
		// into a value of the opposite sign.
		const long long diff =
			static_cast<long long>(send.tv_sec - now.tv_sec) *
			1000000LL +
			static_cast<long long>(send.tv_usec - now.tv_usec);

		// B: wait <code>diff</code> usecs more before sending
		if ( diff >= 0 ){
			return static_cast<microtimeout_t>(diff);
		}

		// C: the packet is late, but not beyond the expire
		// timeout -> it must be sent right now
		if ( -diff <= static_cast<long long>(getExpireTimeout()) ){
			return 0;
		}

		// D: the packet has expired -> delete it.
		sendLock.writeLock();
		OutgoingRTPPktLink* packet = sendFirst;
		sendFirst = sendFirst->getNext();
		onExpireSend(*(packet->getPacket()));  // new virtual to notify
		delete packet;
		if ( sendFirst )
			sendFirst->setPrev(NULL);
		else
			sendLast = NULL;
		sendLock.unlock();
	}
	I( false );
	return 0;
}

void
OutgoingDataQueue::putData(uint32 stamp, const unsigned char *data,
			   size_t datalen)
{
	// Nothing to queue for a missing or empty buffer.
	if ( NULL == data || 0 == datalen )
		return;

	for ( size_t offset = 0; offset < datalen; ) {
		// Segmentation: each packet carries at most
		// getMaxSendSegmentSize() octets of the data left.
		size_t chunk = datalen - offset;
		if ( chunk > getMaxSendSegmentSize() )
			chunk = getMaxSendSegmentSize();

		CryptoContext* pcc = getOutQueueCryptoContext(getLocalSSRC());

		// Build the packet, with the list of contributing
		// sources only if there are CSRC identifiers to send.
		const unsigned char* const segment = data + offset;
		OutgoingRTPPkt* packet = sendInfo.sendCC ?
			new OutgoingRTPPkt(sendInfo.sendSources, 15,
					   segment, chunk,
					   sendInfo.paddinglen, pcc) :
			new OutgoingRTPPkt(segment, chunk,
					   sendInfo.paddinglen, pcc);

		packet->setPayloadType(getCurrentPayloadType());
		packet->setSeqNum(sendInfo.sendSeq++);
		packet->setTimestamp(stamp + getInitialTimestamp());
		packet->setSSRCNetwork(getLocalSSRCNetwork());

		// Only the first segment may carry the marker, and
		// doing so clears the pending mark.
		const bool marked = ( 0 == offset ) && getMark();
		packet->setMarker(marked);
		if ( marked )
			setMark(false);

		if ( NULL != pcc )
			packet->protect(getLocalSSRC(), pcc);

		// Append the packet to the tail of the sending queue.
		sendLock.writeLock();
		OutgoingRTPPktLink* const tail =
			new OutgoingRTPPktLink(packet, sendLast, NULL);
		if ( NULL == sendLast )
			sendFirst = tail;
		else
			sendLast->setNext(tail);
		sendLast = tail;
		sendLock.unlock();

		offset += chunk;
	}
}

void
OutgoingDataQueue::sendImmediate(uint32 stamp, const unsigned char *data,
				 size_t datalen)
{
	// Nothing to send for a missing or empty buffer.
	if ( NULL == data || 0 == datalen )
		return;

	for ( size_t offset = 0; offset < datalen; ) {
		// Segmentation: each packet carries at most
		// getMaxSendSegmentSize() octets of the data left.
		size_t chunk = datalen - offset;
		if ( chunk > getMaxSendSegmentSize() )
			chunk = getMaxSendSegmentSize();

		CryptoContext* pcc = getOutQueueCryptoContext(getLocalSSRC());

		// Build the packet, with the list of contributing
		// sources only if there are CSRC identifiers to send.
		const unsigned char* const segment = data + offset;
		OutgoingRTPPkt* packet = sendInfo.sendCC ?
			new OutgoingRTPPkt(sendInfo.sendSources, 15,
					   segment, chunk,
					   sendInfo.paddinglen, pcc) :
			new OutgoingRTPPkt(segment, chunk,
					   sendInfo.paddinglen, pcc);

		packet->setPayloadType(getCurrentPayloadType());
		packet->setSeqNum(sendInfo.sendSeq++);
		packet->setTimestamp(stamp + getInitialTimestamp());
		packet->setSSRCNetwork(getLocalSSRCNetwork());

		// Only the first segment may carry the marker, and
		// doing so clears the pending mark.
		const bool marked = ( 0 == offset ) && getMark();
		packet->setMarker(marked);
		if ( marked )
			setMark(false);

		if ( NULL != pcc )
			packet->protect(getLocalSSRC(), pcc);

		// Bypass the sending queue: hand the packet over for
		// sending at once and dispose of it.
		dispatchImmediate(packet);
		delete packet;

		offset += chunk;
	}
}

void OutgoingDataQueue::dispatchImmediate(OutgoingRTPPkt *packet)
{
	// Send the raw packet to the destinations in the list, which
	// stays locked meanwhile.
	lockDestinationList();
	if ( !isSingleDestination() ) {
		// Zero or several destinations: send to each one in
		// turn (nothing is sent when the list is empty).
		std::list<TransportAddress*>::iterator it = destList.begin();
		while ( it != destList.end() ) {
			TransportAddress* target = *it;
			setDataPeer(target->getNetworkAddress(),
				    target->getDataTransportPort());
			sendData(packet->getRawPacket(),
				 packet->getRawPacketSizeSrtp());
			it++;
		}
	} else {
		// Single destination: set the data peer again, in case
		// we are going from multiple destinations to a single
		// one.
		TransportAddress* only = destList.front();
		setDataPeer(only->getNetworkAddress(),
			    only->getDataTransportPort());
		sendData(packet->getRawPacket(),
			 packet->getRawPacketSizeSrtp());
	}
	unlockDestinationList();

#ifdef	CCXX_IPV6
	// Same procedure for the list of IPv6 destinations.
	lockDestinationListIPV6();
	if ( !isSingleDestinationIPV6() ) {
		// Zero or several destinations: send to each one in
		// turn (nothing is sent when the list is empty).
		std::list<TransportAddressIPV6*>::iterator it6 =
			destListIPV6.begin();
		while ( it6 != destListIPV6.end() ) {
			TransportAddressIPV6* target6 = *it6;
			setDataPeerIPV6(target6->getNetworkAddress(),
					target6->getDataTransportPort());
			sendDataIPV6(packet->getRawPacket(),
				     packet->getRawPacketSizeSrtp());
			it6++;
		}
	} else {
		// Single destination: set the data peer again, in case
		// we are going from multiple destinations to a single
		// one.
		TransportAddressIPV6* only6 = destListIPV6.front();
		setDataPeerIPV6(only6->getNetworkAddress(),
				only6->getDataTransportPort());
		sendDataIPV6(packet->getRawPacket(),
			     packet->getRawPacketSizeSrtp());
	}
	unlockDestinationListIPV6();
#endif
}

size_t
OutgoingDataQueue::dispatchDataPacket(void)
{
	sendLock.writeLock();
	OutgoingRTPPktLink* packetLink = sendFirst;

	if ( !packetLink ){
		sendLock.unlock();
		return 0;
	}

	OutgoingRTPPkt* packet = packetLink->getPacket();
	uint32 rtn = packet->getPayloadSize();
	dispatchImmediate(packet);

	// unlink the sent packet from the queue and destroy it. Also
	// record the sending.
	sendFirst = sendFirst->getNext();
	if ( sendFirst ) {
		sendFirst->setPrev(NULL);
	} else {
		sendLast = NULL;
	}
	// for general accounting and RTCP SR statistics
	sendInfo.packetCount++;
	sendInfo.octetCount += packet->getPayloadSize();
	delete packetLink;

	sendLock.unlock();
	return rtn;
}

/**
 * Overwrite part of the payload of the queued packet whose timestamp
 * equals stamp.
 *
 * The queue is searched from its head; the search gives up at the
 * first packet whose timestamp is greater than stamp.
 * NOTE(review): stamp is compared directly against the timestamp
 * stored in the packet, which putData() sets to the given stamp plus
 * the initial timestamp -- confirm which one callers are expected to
 * pass.
 *
 * @param stamp timestamp of the packet to modify.
 * @param data buffer holding the octets to copy into the payload.
 * @param offset position within the payload where copying starts.
 * @param max maximum number of octets to copy; it is reduced so that
 *        the copy does not go past the end of the payload.
 * @return number of octets copied; 0 if data is NULL, no packet
 *         matches or offset is beyond the payload.
 */
size_t
OutgoingDataQueue::setPartial(uint32 stamp, unsigned char *data,
			      size_t offset, size_t max)
{
	// nothing to copy from; same guard putData() applies.
	if ( !data )
		return 0;

	size_t copied = 0;
	sendLock.writeLock();

	// look for the packet with exactly this timestamp.
	OutgoingRTPPkt* packet = NULL;
	for ( OutgoingRTPPktLink* packetLink = sendFirst; packetLink;
	      packetLink = packetLink->getNext() ) {
		uint32 pstamp = packetLink->getPacket()->getTimestamp();
		if ( pstamp < stamp )
			continue;
		if ( pstamp == stamp )
			packet = packetLink->getPacket();
		// either found, or gone past the requested timestamp.
		break;
	}

	if ( packet && offset < packet->getPayloadSize() ) {
		// do not write past the end of the payload.
		const size_t room = packet->getPayloadSize() - offset;
		copied = ( max > room ) ? room : max;
		memcpy((unsigned char*)(packet->getPayload()) + offset,
		       data, copied);
	}

	// single exit point: the lock is released on every path (an
	// early return here used to leave it locked).
	sendLock.unlock();
	return copied;
}

/**
 * Register a crypto context for outgoing packets, replacing (and
 * deleting) any context previously registered for the same SSRC. The
 * pointer itself is stored in the list of contexts.
 *
 * @param cc crypto context to register; NULL is ignored.
 */
void
OutgoingDataQueue::setOutQueueCryptoContext(CryptoContext* cc)
{
    // a NULL context has no SSRC to register it under.
    if ( NULL == cc )
        return;

    MutexLock lock(cryptoMutex);
    // check if a CryptoContext for a SSRC already exists. If yes
    // remove it from list before inserting the new one.
    for( std::list<CryptoContext *>::iterator i = cryptoContexts.begin();
         i != cryptoContexts.end(); ++i ){
        if( (*i)->getSsrc() == cc->getSsrc() ) {
            CryptoContext* tmp = *i;
            cryptoContexts.erase(i);
            // when the very same context is being registered again,
            // deleting it would leave a dangling pointer in the list.
            if ( tmp != cc )
                delete tmp;
            break;
        }
    }
    cryptoContexts.push_back(cc);
}

void
OutgoingDataQueue::removeOutQueueCryptoContext(CryptoContext* cc)
{
    MutexLock lock(cryptoMutex);

    if ( NULL != cc ) {
        // Remove and delete the registered context, if any, whose
        // SSRC is the same as the SSRC of the given one.
        std::list<CryptoContext *>::iterator it = cryptoContexts.begin();
        for ( ; it != cryptoContexts.end(); it++ ) {
            if ( (*it)->getSsrc() != cc->getSsrc() )
                continue;
            CryptoContext* victim = *it;
            cryptoContexts.erase(it);
            delete victim;
            return;
        }
        return;
    }

    // NULL given: remove and delete all outgoing crypto contexts.
    while ( !cryptoContexts.empty() ) {
        CryptoContext* victim = cryptoContexts.front();
        cryptoContexts.pop_front();
        delete victim;
    }
}

CryptoContext*
OutgoingDataQueue::getOutQueueCryptoContext(uint32 ssrc)
{
    MutexLock lock(cryptoMutex);

    // Linear search for the first context registered for this SSRC;
    // NULL is returned if there is none.
    CryptoContext* found = NULL;
    for ( std::list<CryptoContext *>::iterator it = cryptoContexts.begin();
          it != cryptoContexts.end(); it++ ) {
        if ( ssrc == (*it)->getSsrc() ) {
            found = *it;
            break;
        }
    }
    return found;
}

#ifdef  CCXX_NAMESPACES
}
#endif

/** EMACS **
 * Local variables:
 * mode: c++
 * c-basic-offset: 8
 * End:
 */


// syntax highlighted by Code2HTML, v. 0.9.1