///////////////////////////////////////////////////////////////////////////////

// MQ4CPP - Message queuing for C++

// Copyright (C) 2004-2007  Riccardo Pompeo (Italy)

//

// This library is free software; you can redistribute it and/or

// modify it under the terms of the GNU Lesser General Public

// License as published by the Free Software Foundation; either

// version 2.1 of the License, or (at your option) any later version.

//

// This library is distributed in the hope that it will be useful,

// but WITHOUT ANY WARRANTY; without even the implied warranty of

// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU

// Lesser General Public License for more details.

//

// You should have received a copy of the GNU Lesser General Public

// License along with this library; if not, write to the Free Software

// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

//

// WARNING: PacketCompression works using a cache mechanism to avoid to send 

// dictionary information and reducing the bandwidth needed. This mechanism works 

// only in a peer-to-peer transmition. If you plan to use multicast packets or 

// to have a single server with multiple client, the cache mechanism doesn't work

// correctly because PacketCompression isn't able to determine the packet source.  

// In this case you should disable caching calling PacketCompression(false)

// WARNING: compression is a cpu-consuming process. Use only if you have a low 

// bandwidth connectivity with your peer.

//


#define SILENT

#include "Trace.h"

#include "Logger.h"

#include "Compression.h"

#include "MergeSort.h"

#include "Timer.h"

#include <cmath>

#define SYMBOLS 256


PacketCompression::PacketCompression(bool theUseCacheMechanism)
{
	TRACE("PacketCompression::PacketCompression - start")
	itsUseCacheMechanism=theUseCacheMechanism;
	stream_length=0;
	shift_count=0;
	accumulator=0;
	bit_count=0;
	percent=0;
	itsSendPeerReset=true;
	reset();
	TRACE("PacketCompression::PacketCompression - end")	
}

PacketCompression::~PacketCompression()
{
	TRACE("PacketCompression::~PacketCompression - start")	

	TRACE("PacketCompression::~PacketCompression - end")	
}

void PacketCompression::reset()
{
	TRACE("PacketCompression::reset - start")	

	itsDeflateCacheIndex=0;

	for(unsigned cnt=0; cnt < 8; cnt++)
	{
		itsDeflateCacheSchema[cnt]=0;
		itsInflateCacheSchema[cnt]=0;
		itsDeflateCacheCheckBit[cnt]=0;
		itsInflateCacheCheckBit[cnt]=0;
	}	

	for(unsigned cnt=0; cnt < 8; cnt++)
		for(unsigned cnt1=0; cnt1 < 128; cnt1++)
		{
			itsDeflateCacheDictionary[cnt][cnt1]=0;
			itsInflateCacheDictionary[cnt][cnt1]=0;
		}	

	TRACE("PacketCompression::reset - end")	
}

void PacketCompression::evaluateDictionary(vector< pair<int,unsigned char> >& occurrence,unsigned* evaluator)
{
	TRACE("PacketCompression::evaluateDictionary - start")	

	evaluator[0]=(1+0)*8;
	evaluator[1]=(1+2)*8;
	evaluator[2]=(1+4)*8;
	evaluator[3]=(1+8)*8;
	evaluator[4]=(1+16)*8;
	evaluator[5]=(1+32)*8;
	evaluator[6]=(1+64)*8;
	evaluator[7]=(1+128)*8;

	for(unsigned cnt=0;cnt < SYMBOLS; cnt++)
	{
		evaluator[0]+=occurrence[cnt].first * 8;	

		if(cnt < 2)
			evaluator[1]+=occurrence[cnt].first * 2;
		else			
			evaluator[1]+=occurrence[cnt].first * 9;

		if(cnt < 4)
			evaluator[2]+=occurrence[cnt].first * 3;
		else			
			evaluator[2]+=occurrence[cnt].first * 9;

		if(cnt < 8)
			evaluator[3]+=occurrence[cnt].first * 4;
		else			
			evaluator[3]+=occurrence[cnt].first * 9;

		if(cnt < 16)
			evaluator[4]+=occurrence[cnt].first * 5;
		else			
			evaluator[4]+=occurrence[cnt].first * 9;
		
		if(cnt < 32)
			evaluator[5]+=occurrence[cnt].first * 6;
		else			
			evaluator[5]+=occurrence[cnt].first * 9;
		
		if(cnt < 64)
			evaluator[6]+=occurrence[cnt].first * 7;
		else			
			evaluator[6]+=occurrence[cnt].first * 9;

		if(cnt < 128)
			evaluator[7]+=occurrence[cnt].first * 8;
		else			
			evaluator[7]+=occurrence[cnt].first * 9;
	}

	TRACE("PacketCompression::evaluateDictionary - end")	
}

void PacketCompression::putBits(string& stream,unsigned int nb_bits,unsigned int data) 
{
	TRACE("PacketCompression::putBits - start")	

	if (nb_bits==0)
		return;

	/* merge the input data and the accumulator */
	accumulator |= (data << shift_count);
	shift_count += nb_bits;
	bit_count += nb_bits;

	/* sends the bit accumulator, byte per byte */
	while (shift_count >= 8) 
	{
		stream += (unsigned char)(accumulator & 0xff);
		TRACE("Buffer size=" << stream.size())
		accumulator >>= 8;
		shift_count -= 8;
	}

	/* remove some eventual garbage */
	accumulator &= (1U << shift_count) - 1;

	TRACE("PacketCompression::putBits - end")	
}

void PacketCompression::flush(string& stream)
{
	TRACE("PacketCompression::flush - start")	
	if(shift_count > 0)
		stream += (char)(accumulator & 0xff);
	TRACE("PacketCompression::flush - end")	
}

unsigned char PacketCompression::computeCheckBit(unsigned char theSchema,unsigned char* theArray)
{
	TRACE("PacketCompression::computeCheckBit - start")	
	unsigned char ret=0;
	unsigned dictlen=(unsigned)pow(2.0,(int)theSchema);

	for(unsigned cnt=0; cnt < dictlen; cnt++)
	{
		unsigned char sym=theArray[cnt];
		unsigned char bit1=(sym & 0x02) >> 1;
		unsigned char bit3=(sym & 0x08) >> 3;
		unsigned char bit5=(sym & 0x20) >> 5;
		unsigned char bit7=(sym & 0x80) >> 7;
		ret^=((bit1 ^ bit3) ^ bit5) ^ bit7;	// b1 xor b2 xor b3 xor b4	

	}	

	ret <<= 7; // move from bit 0 to 7

	ret &= 0x80; // mask 7 bit only


	TRACE("PacketCompression::computeCheckBit - end")
	return ret;	
}

string PacketCompression::deflate(string& theBuffer)
{
	TRACE("PacketCompression::deflate - start")	
	DUMP("Original buffer",(char*)theBuffer.data(), theBuffer.size());
	//TIME_POINT

	
	string ret;

	vector< pair<int,unsigned char> > occurrence;
	occurrence.resize(SYMBOLS);

	TRACE("Init buffer")	
	for(unsigned cnt=0;cnt < SYMBOLS; cnt++)
	{
		occurrence[cnt].first=0;	
		occurrence[cnt].second=cnt;	
	}

	TRACE("Compute symbols occurrences")	
	for(unsigned cnt=0;cnt < theBuffer.size(); cnt++)
		occurrence[(unsigned char)theBuffer[cnt]].first++;	

	TRACE("Quick sort symbols occurrences")	
	MergeSort< vector< pair<int,unsigned char> > >(occurrence.begin(), occurrence.end() -1);

	TRACE("Evaluate dictionary dimension in bits")	
	unsigned evaluator[8];
	evaluateDictionary(occurrence,evaluator);

	TRACE("Find best dictionary dimension")	
	unsigned char schema=0;
	unsigned min=evaluator[0];
	bool useCache=false;

	for(unsigned char indx=1; indx < 8; indx++)
	{
		if(evaluator[indx] < min)
		{
			schema=indx;	
			min=evaluator[indx];
		}
	}

	TRACE("Schema=" << (int)schema)
	percent=(1.0F-(float)evaluator[schema]/(float)evaluator[0])*100.0F;
	TRACE("Compression=" << percent << "%")
	TRACE("Original buffer size=" << theBuffer.size())
	TRACE("Buffer size without compression=" << evaluator[0])
	TRACE("Buffer size with compression=" << min)

	if(schema==0)
	{
		TRACE("No compression")
		ret='0';
		ret+=theBuffer;
	}
	else
	{
		TRACE("Start compression")	

		struct _SymbolTranslation
		{
			bool compressed;
			char symbol;
		} translation[SYMBOLS];

		unsigned dictlen=(unsigned)pow(2.0,(int)schema);
		TRACE("Dictionary length=" << dictlen)
		unsigned buffer_size=evaluator[schema]/8 + ((evaluator[schema]%8==0)? 0:1);
		TRACE("Buffer size=" << buffer_size)
		ret.reserve(buffer_size);		
		
		bool cacheTest=false;
		unsigned char cacheIndex=0;

		if(itsUseCacheMechanism)
		{
			TRACE("Test current cache")
			cacheTest=true;

			for(unsigned cnt=0; cnt < 8; cnt++)
			{			
				if(schema==itsDeflateCacheSchema[cnt])
				{
					cacheTest=true;
	
					for(unsigned cnt1=0; cnt1 < dictlen; cnt1++)
					{
						if(occurrence[cnt1].second!=itsDeflateCacheDictionary[cnt][cnt1])
						{
							cacheTest=false;
							break;
						}
					}
					
					if(cacheTest==true)
					{
						TRACE("Found a valid dictionary at index=" << cnt)
						cacheIndex=cnt;
						break;	
					}
				}
				else
					cacheTest=false;			
			}
		}

		TRACE("Reset translation array")
		for(unsigned cnt=0; cnt < SYMBOLS; cnt++)
		{
			translation[cnt].compressed=false;
			translation[cnt].symbol=0;
		}		

		TRACE("Fill translation array with dictionary infos")
		for(unsigned cnt=0; cnt < dictlen; cnt++)
		{
			unsigned char sym=occurrence[cnt].second;
			translation[sym].compressed=true;
			translation[sym].symbol=cnt;
		}		

		TRACE("Set compression schema")
		stream_length=0;
		shift_count=0;
		accumulator=0;
		bit_count=(1+dictlen)*8;
		
		// !  7  !  6  !  5  !  4  !  3  !  2  !  1  !  0  !

		// +-----+-----+-----+-----+-----+-----+-----+-----+

		// |Check|   Cache index   |Cache|      Schema     |

		// +-----+-----+-----+-----+-----+-----+-----+-----+	

		
		if(cacheTest==false)
		{
			ret=(unsigned char)(schema | ((itsSendPeerReset) ? 0x80 : 0x00) | ((itsDeflateCacheIndex & 0x07) << 4));
			itsSendPeerReset=false;
			TRACE("Add dictionary")
			for(unsigned cnt=0; cnt < dictlen; cnt++)
				ret+=occurrence[cnt].second;	
		}
		else
		{
			ret=(unsigned char)(schema | 0x08 | ((cacheIndex & 0x07) << 4) | itsDeflateCacheCheckBit[cacheIndex]);
			TRACE("Request peer to use cached dictionary")
		}

		TRACE("Create compressed buffer")
		
		for(unsigned cnt=0;cnt < theBuffer.size(); cnt++)
		{
			unsigned char sym=theBuffer[cnt];
			if(translation[sym].compressed==false)
			{
				putBits(ret,1,0); 
				putBits(ret,8,sym); 
			}
			else
			{
				putBits(ret,1,1); 
				putBits(ret,schema,translation[sym].symbol); 
			}
		}
		
		flush(ret);
		TRACE("Total bits in the buffer=" << bit_count)

		if(cacheTest==false)
		{
			TRACE("Save cache")
			itsDeflateCacheSchema[itsDeflateCacheIndex]=schema;

			for(unsigned cnt=0; cnt < dictlen; cnt++)
				itsDeflateCacheDictionary[itsDeflateCacheIndex][cnt]=occurrence[cnt].second; // Save symbol

			
			itsDeflateCacheCheckBit[itsDeflateCacheIndex]=computeCheckBit(schema,&itsDeflateCacheDictionary[itsDeflateCacheIndex][0]);
			itsDeflateCacheIndex= ++itsDeflateCacheIndex % 8;
		}
		else
		{
			percent=(1.0F-(float)ret.size()/(float)theBuffer.size())*100.0F;
		}
	}

	//DISPLAY_ELAPSED

	TRACE("Final buffer length=" << ret.size())
	DUMP("Compressed buffer",(char*)ret.data(), ret.size());
	TRACE("PacketCompression::deflate - end")	
	return ret;
}

unsigned int PacketCompression::getBits(string& stream,unsigned int nb_bits,bool& eof) 
{
	TRACE("PacketCompression::getBits - start")	

	unsigned int result;
	eof=false;

	if (nb_bits==0) 
	    return 0;

	while (shift_count < nb_bits) 
	{
		if(stream_length>=stream.size())
		{
			eof=true;
			TRACE("PacketCompression::getBits - Reached EOF")
			return 0;			
		}
		
		TRACE("Fetched value=" << (int)stream[stream_length] << " '" << (char)stream[stream_length] << "'")
		accumulator |= (unsigned char)stream[stream_length++] << shift_count;
		shift_count += 8;
	}

	result = accumulator & ((1U << nb_bits) - 1);
	accumulator >>= nb_bits;
	shift_count -= nb_bits;
	bit_count += nb_bits;

	TRACE("PacketCompression::getBits - end")	
	return result;
}

string PacketCompression::inflate(string& theBuffer)
{
	TRACE("PacketCompression::inflate - start")
	//TIME_POINT

		
	string ret;
	ret.reserve(theBuffer.size()*2);
	stream_length=0;
	shift_count=0;
	accumulator=0;

	if(theBuffer[0]=='0')
	{
		DUMP("No compressed buffer",(char*)theBuffer.data(), theBuffer.size());
		ret=theBuffer.substr(1,theBuffer.size()-1);		
	}
	else
	{
		DUMP("Compressed buffer",(char*)theBuffer.data(), theBuffer.size());

		// !  7  !  6  !  5  !  4  !  3  !  2  !  1  !  0  !

		// +-----+-----+-----+-----+-----+-----+-----+-----+

		// |Check|   Cache index   |Cache|      Schema     |

		// +-----+-----+-----+-----+-----+-----+-----+-----+


		unsigned schema=theBuffer[0] & 0x07;
		TRACE("Schema=" << schema)	
		unsigned cacheIndex=(theBuffer[0] >> 4) & 0x07;
		TRACE("Cache index=" << cacheIndex)	
		unsigned checkBit=theBuffer[0] & 0x80;
		TRACE("Check bit=" << ((checkBit) ? "Y" : "N"))	
		bool useCache = ((theBuffer[0] & 0x08)? true : false);
		
		if(useCache==false && checkBit!=0)
		{
			WARNING("Cache reset requested from peer")
			reset();	
		}
		
		bool testCache = (schema==itsInflateCacheSchema[cacheIndex]) && (checkBit==itsInflateCacheCheckBit[cacheIndex]);	
		
		if(useCache==true && testCache==false)
		{
			itsSendPeerReset=true;
			reset();
			WARNING("Invalid buffer during inflating. Forcing peer cache to reset.");
			TRACE("PacketCompression::inflate - end with error")	
			return "";	
		}

		useCache= useCache && testCache;
		TRACE("Cache=" << ((useCache) ? "Y" : "N"))	
	
		unsigned dictlen, bufsize;
		
		if(useCache==false)
		{
			dictlen=(unsigned)pow(2.0,(int)schema);
			bufsize=theBuffer.size()-dictlen-1;
			stream_length=1+dictlen;
		}
		else
		{
			dictlen=(unsigned)pow(2.0,(int)itsInflateCacheSchema[cacheIndex]);
			bufsize=theBuffer.size()-1;
			stream_length=1;
		}
		
		bit_count=stream_length*8;
		bool eof=false;
	
		while(!eof)
		{
			unsigned char compressed=(unsigned char)getBits(theBuffer,1,eof);
			if(eof)
				break;
	
			TRACE("Compressed flag=" << (int)compressed)
	
			if(compressed==1)
			{
				unsigned char sym=(unsigned char)getBits(theBuffer,schema,eof);
				if(eof)
					break;
	
				unsigned char dec;
				
				if(useCache==false)
					dec=theBuffer[sym+1];
				else
					dec=itsInflateCacheDictionary[cacheIndex][sym];
				
				TRACE("Symbol=" << (int)sym << " Value=" << (int)dec << " '" << dec << "'")
				ret+=dec; 			
			}
			else
			{
				unsigned char sym=(unsigned char)getBits(theBuffer,8,eof);
				if(eof)
					break;
	
				TRACE("Value=" << (int)sym << " '" << sym << "'") 			
				ret+=sym;			
			}
		}
		
		TRACE("Total bits in the buffer=" << bit_count)
		
		if(useCache==false)
		{
			TRACE("Save cache")
			itsInflateCacheSchema[cacheIndex]=schema;

			for(unsigned cnt=0; cnt < dictlen; cnt++)
				itsInflateCacheDictionary[cacheIndex][cnt]=theBuffer[cnt+1];

			itsInflateCacheCheckBit[cacheIndex]=computeCheckBit(schema,&itsInflateCacheDictionary[cacheIndex][0]);
		}		
	}

	//DISPLAY_ELAPSED			

	TRACE("PacketCompression::inflate - end")	
	return ret;
}


syntax highlighted by Code2HTML, v. 0.9.1