// Copyright (C) 2002 Dominic Letourneau

#include "Vector.h"
#include "BufferedNode.h"
#include "ObjectRef.h"
#include "ObjectParser.h"
#include <sstream>
#include "Stream.h"
#include "SocketStream.h"

using namespace std;

namespace FD {

// Forward declaration so the class name is known to DECLARE_NODE before the
// class body (defined further down in this file) is seen.
// NOTE(review): DECLARE_NODE is a project macro defined outside this file;
// presumably it registers the node type with the framework -- confirm.
class BroadcastLoad;
DECLARE_NODE(BroadcastLoad)
/*Node
 *
 * @name BroadcastLoad
 * @category IO
 * @description Load objects from the packets received on a broadcast socket (registered types)
 *
 * @input_name SOCKET
 * @input_description The stream we are loading from
 * @input_type Stream
 *
 * @output_name OUTPUT
 * @output_description Vector of the objects loaded from the received packets
 *
END*/


class BroadcastLoad : public BufferedNode {

protected:
   
  /**The ID of the 'output' output*/
  int outputID;

  /**The ID of the 'stream' input*/
  int socketInputID;
 
public:

  BroadcastLoad(string nodeName, ParameterSet params) 
     : BufferedNode(nodeName, params) {
    
    outputID = addOutput("OUTPUT");
    socketInputID = addInput("SOCKET");
    inOrder = true;
  }
  


  void calculate(int output_id, int count, Buffer &out) {

    

    istream & my_stream = (istream&) object_cast<IStream>(getInput(socketInputID,count));
    
    socket_iostream *my_socket_iostream = dynamic_cast<socket_iostream*>(&my_stream);
    
    if (!my_socket_iostream) {
      throw new GeneralException("Invalid socket",__FILE__,__LINE__);
    }
    
    //getting socket
    socket_streambuf &my_streambuf = (socket_streambuf&) (*my_socket_iostream);


    Vector<ObjectRef> *output_vect = new Vector<ObjectRef>;

    int object_count = 0;

    while(1) {

      //32 kb of data
      unsigned char packet[1024 * 32];
      
      memset(&packet[0],0,1024 * 32);
      
      int size;
      
      size = my_streambuf.recv_packet(&packet[0], 1024 * 32);
      
      if (size > 0) {
	//cerr<<"BROADCAST LOAD SIZE : "<<size<<endl;

	string data((char*) &packet[0], size);

	
	istringstream m_stream(data);

	ObjectRef my_object;      
	
	try {
	  m_stream >> my_object;      
	  output_vect->push_back(my_object);
	  object_count++;
	}
	catch(BaseException *e) {
	  cerr<<"BroadcastLoad (exception)"<<endl;
	  e->print(cerr);
	  delete e;
	}
      }
      else {
	break;
      }
    }  

   
    //cerr<<"BroadcastLoad read : "<<object_count<<" objects!"<<endl;
    
    out[count] = ObjectRef(output_vect);
    
   }

};
}//namespace FD


// syntax highlighted by Code2HTML, v. 0.9.1