/*
* Copyright (c) 2002-2007 Samit Basu
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
*/
#include <QtNetwork>
#include "Array.hpp"
#include "Malloc.hpp"
#include "HandleList.hpp"
#include "RPC.hpp"
#include <QList>
#include "Print.hpp"
template <class T>
void getArray(int N, T* c, QDataStream &in) {
for (int i=0;i<N;i++)
in >> c[i];
}
template <class T>
void getSparseArray(int cols, T** c, QDataStream &in) {
for (int i=0;i<cols;i++) {
T len;
in >> len;
c[i] = new T[(uint32)len];
getArray((int)len,c[i],in);
}
}
template <class T>
void putArray(int N, const T* c, QDataStream &out) {
for (int i=0;i<N;i++)
out << c[i];
}
template <class T>
void putSparseArray(int cols, const T** c, QDataStream &out) {
for (int i=0;i<cols;i++) {
out << 1+c[i][0];
for (int j=0;j<1+c[i][0];j++)
out << c[i][j];
}
}
void getArrayFromQDS(QDataStream &in, Array& dat) {
Class dclass;
bool sparseFlag;
Dimensions dims;
uint8 dimCount;
uint8 t;
in >> t; dclass = (Class) t;
in >> t; sparseFlag = (bool) t;
in >> dimCount;
for (int i=0;i<dimCount;i++) {
uint32 dimVal;
in >> dimVal;
dims.setDimensionLength(i,dimVal);
}
int elCount(dims.getElementCount());
if (elCount == 0) {
dat = Array(dclass,dims,NULL);
return;
}
switch(dclass) {
case FM_CELL_ARRAY: {
Array *dp = new Array[elCount];
for (int i=0;i<elCount;i++)
getArrayFromQDS(in,dp[i]);
dat = Array(dclass,dims,dp);
return;
}
case FM_STRUCT_ARRAY: {
rvstring fnames;
quint32 ncount;
in >> ncount;
int i;
for (i=0;i<ncount;i++) {
char *dp;
in >> dp;
fnames.push_back(dp);
delete dp;
}
Array *dp = new Array[elCount*ncount];
for (i=0;i<elCount*ncount;i++)
getArrayFromQDS(in,dp[i]);
dat = Array(dclass,dims,dp,false,fnames);
return;
}
case FM_LOGICAL: {
logical *dp = (logical*) Malloc(sizeof(logical)*elCount);
getArray(elCount, dp, in);
dat = Array(dclass,dims,dp);
return;
}
case FM_STRING:
case FM_UINT8: {
uint8 *dp = (uint8*) Malloc(sizeof(uint8)*elCount);
getArray(elCount, dp, in);
dat = Array(dclass,dims,dp);
return;
}
case FM_INT8: {
int8 *dp = (int8*) Malloc(sizeof(int8)*elCount);
getArray(elCount, dp, in);
dat = Array(dclass,dims,dp);
return;
}
case FM_UINT16: {
uint16 *dp = (uint16*) Malloc(sizeof(uint16)*elCount);
getArray(elCount, dp, in);
dat = Array(dclass,dims,dp);
return;
}
case FM_INT16: {
int16 *dp = (int16*) Malloc(sizeof(int16)*elCount);
getArray(elCount, dp, in);
dat = Array(dclass,dims,dp);
return;
}
case FM_UINT32: {
uint32 *dp = (uint32*) Malloc(sizeof(uint32)*elCount);
getArray(elCount, dp, in);
dat = Array(dclass,dims,dp);
return;
}
case FM_UINT64: {
uint64 *dp = (uint64*) Malloc(sizeof(uint64)*elCount);
getArray(elCount, dp, in);
dat = Array(dclass,dims,dp);
return;
}
case FM_INT64: {
uint64 *dp = (uint64*) Malloc(sizeof(uint64)*elCount);
getArray(elCount, dp, in);
dat = Array(dclass,dims,dp);
return;
}
case FM_INT32: {
if (!sparseFlag) {
int32 *dp = (int32*) Malloc(sizeof(int32)*elCount);
getArray(elCount, dp, in);
dat = Array(dclass,dims,dp);
} else {
int32 **dp = new int32*[dims.getColumns()];
getSparseArray(dims.getColumns(), dp, in);
dat = Array(dclass,dims,dp,true);
}
return;
}
case FM_FLOAT: {
if (!sparseFlag) {
float *dp = (float*) Malloc(sizeof(float)*elCount);
getArray(elCount, dp, in);
dat = Array(dclass,dims,dp);
} else {
float **dp = new float*[dims.getColumns()];
getSparseArray(dims.getColumns(), dp, in);
dat = Array(dclass,dims,dp,true);
}
return;
}
case FM_DOUBLE: {
if (!sparseFlag) {
double *dp = (double*) Malloc(sizeof(double)*elCount);
getArray(elCount, dp, in);
dat = Array(dclass,dims,dp);
} else {
double **dp = new double*[dims.getColumns()];
getSparseArray(dims.getColumns(), dp, in);
dat = Array(dclass,dims,dp,true);
}
return;
}
case FM_COMPLEX: {
if (!sparseFlag) {
float *dp = (float*) Malloc(sizeof(float)*elCount*2);
getArray(elCount*2, dp, in);
dat = Array(dclass,dims,dp);
} else {
float **dp = new float*[dims.getColumns()];
getSparseArray(dims.getColumns(), dp, in);
dat = Array(dclass,dims,dp,true);
}
return;
}
case FM_DCOMPLEX: {
if (!sparseFlag) {
double *dp = (double*) Malloc(sizeof(double)*elCount*2);
getArray(elCount*2, dp, in);
dat = Array(dclass,dims,dp);
} else {
double **dp = new double*[dims.getColumns()];
getSparseArray(dims.getColumns(), dp, in);
dat = Array(dclass,dims,dp,true);
}
return;
}
}
}
void putArrayToQDS(QDataStream &out, const Array& dat) {
out << (uint8) dat.dataClass();
out << (uint8) dat.sparse();
out << (uint8) dat.dimensions().getLength();
for (int i=0;i<dat.dimensions().getLength();i++)
out << (uint32) dat.dimensions().getDimensionLength(i);
int elCount(dat.getLength());
if (dat.isEmpty()) return;
switch(dat.dataClass()) {
case FM_CELL_ARRAY: {
const Array *dp=((const Array *) dat.getDataPointer());
for (int i=0;i<elCount;i++)
putArrayToQDS(out,dp[i]);
return;
}
case FM_STRUCT_ARRAY: {
rvstring fnames(dat.fieldNames());
int ncount(fnames.size());
out << (quint32) ncount;
int i;
for (i=0;i<ncount;i++)
out << fnames.at(i).c_str();
const Array *dp=((const Array *) dat.getDataPointer());
for (i=0;i<elCount*ncount;i++)
putArrayToQDS(out,dp[i]);
return;
}
case FM_LOGICAL:
putArray(elCount,(const logical *)dat.getDataPointer(),out);
return;
case FM_STRING:
case FM_UINT8:
putArray(elCount,(const uint8 *)dat.getDataPointer(),out);
return;
case FM_UINT16:
putArray(elCount,(const uint16 *)dat.getDataPointer(),out);
return;
case FM_UINT32:
putArray(elCount, (const uint32 *)dat.getDataPointer(),out);
return;
case FM_UINT64:
putArray(elCount, (const uint64 *)dat.getDataPointer(),out);
return;
case FM_INT8:
putArray(elCount, (const int8 *)dat.getDataPointer(),out);
return;
case FM_INT16:
putArray(elCount, (const int16 *)dat.getDataPointer(),out);
return;
case FM_INT32:
if (!dat.sparse())
putArray(elCount, (const int32 *)dat.getDataPointer(),out);
else
putSparseArray(dat.getDimensionLength(1), (const int32 **) dat.getSparseDataPointer(), out);
return;
case FM_INT64:
putArray(elCount, (const int64 *)dat.getDataPointer(), out);
return;
case FM_FLOAT:
if (!dat.sparse())
putArray(elCount, (const float *)dat.getDataPointer(), out);
else
putSparseArray(dat.getDimensionLength(1), (const float **) dat.getSparseDataPointer(), out);
return;
case FM_DOUBLE:
if (!dat.sparse())
putArray(elCount, (const double *)dat.getDataPointer(), out);
else
putSparseArray(dat.getDimensionLength(1), (const double **) dat.getSparseDataPointer(), out);
return;
case FM_COMPLEX:
if (!dat.sparse())
putArray(elCount*2, (const float *)dat.getDataPointer(), out);
else
putSparseArray(dat.getDimensionLength(1), (const float **) dat.getSparseDataPointer(), out);
return;
case FM_DCOMPLEX:
if (!dat.sparse())
putArray(elCount*2, (const double *)dat.getDataPointer(), out);
else
putSparseArray(dat.getDimensionLength(1), (const double **) dat.getSparseDataPointer(), out);
return;
}
}
// Here is the idea behind FreeMat's RPC service.
//
// 1. It is enabled and controlled via functions
// 2. It acts as an independant entity - driven by the main event loop
// 3. It supports simple Get/Put operations for arrays.
// 4. It is more of a peer-to-peer service than a centralized one
//
// Here is the mock-up:
//
// rpccontrol enable on queue 10 port 3254 % turn on our RPC service
// % Allow a queue depth of 10
// % set our RPC port to 3254
// recon = rpcreg('192.168.0.100:2950') % returns an integer ID for the remote process
// rpcping(recon) % query the state of the remote RPC service
// rpcput(recon,A) % throws an exception if the put fails
// cnt = rpcpeek(recon) % how many messages are available from recon?
// A = rpcget(recon) % retrieve next message from recon
//
// With these commands, a simple RPC server looks like this:
// id = rpcinit(5237); % Open an RPC server on port 5237
// [A,id] = rpcget(0) % Get the next message (from anyone)
// [var{:}] = feval(A.funcname,A.data); % Evaluate the function (whatever it is)
// A.data = var; % replace the arguments with the reply
// rpcput(id,A); % Send it back.
//
// A slot is a place to store a received array, or the fact than an error occured
// trying to receive/decode it
//
// The current design has flaws... In particular... Suppose we issue 2 gets to
// a remote client in sequence:
// rpcput(id,var1)
// rpcput(id,var2)
//
// Because of the lack of a queue structure, there is no guarantee that var1 will complete
// (and thus free up the server) before we attempt to put var2.
//
// The right way to handle this is to have a send and receive queue associated with each
// rpc server. Also, how will a server return a message? The current "push" model assumes
// that a uni-directional push is adequate to implement all desired features.
//
// One option is to have two global queues "out" and "in". We could also have a thread
// for each RPC server. For some reason, I don't like that idea.
//
// OK - so what is the problem with the current model? It will push data to as many different
// slots as possible concurrently. Actually - that shouls be fine - the example I listed
// above is wrong. If we attempt to push var1 and var2 to the same server, they will
// succeed and show up in that server's queue as successive entries - precisely because
// that server will spawn RPCClients to manage the connections
//
// Then in principle it is fine... If you want to call an RPC server, you have to start your
// own RPC service first. It would be nice if you could start multiple services on different
// ports. That can be done easily enough. The problem with the current system, then, is that
// the "get-s" are not working.
//
// OK - as originally envisioned - the put/get mechanism appears to work... But it leaves
// somethings to be desired. First of all, it means we have to have our own rpc server running
// (and get the remote server to register us) before the remote server can push an answer to us.
// That means publishing an address where the remote server can reach us. Something like this:
//
// a = rpcdial(remoteaddress,portnum)
// -- rpcinit(portnum)
// -- a = rpcreg(remoteaddress,portnum)
// cmd.type = 'register'
// cmd.address = <our ip address>
// cmd.portnum = portnum
// -- rpcput(a,cmd)
//
// To make an rpc call, we could then do
//
// vars = rpcfeval(a,'cos',pi)
// -- cmd.type = 'call'
// -- cmd.name = 'cos'
// -- cmd.args = {pi}
// reply = rpcget(a)
// if (~reply.success) error('')
//
// It could work. The missing piece would be detecting errors on the rpcput side,
// since these errors do not appear anywhere right now.
//
//
// It would be far easier (and simpler) to start with a synchronous RPC mechanism
// instead of an asynchronous one. This would be something like:
//
// a = rpcreg(remoteIP, remote port)
// G = rpcfeval(a,'cos',pi)
//
// The way this would work is to block the calling socket until the function completes.
//
// I could expose the socket interface at the FreeMat level with functions like:
//
// a = tcpserver(5890);
// while (1)
// try
// g = tcpaccept(a);
// cmd = tcpget(g);
// try
// cmd.type = 'reply'
// cmd.args = feval(cmd.name,cmd.args);
// catch
// cmd.type = 'error';
// cmd.error = lasterror;
// end
// tcpput(g,cmd);
// tcpclose(g);
// catch
// end
// end
HandleList<QTcpServer*> m_servers;
HandleList<QTcpSocket*> m_sockets;
//@Module TCPSERVER Start a TCP Server on a designated port
//@@Section IO
//@@Usage
//Sets up a TCP server on a specified port. The syntax for its
//use is:
//@[
// handle = tcpserver(portnum)
//@]
//where @|portnum| is the port number to set up the tcp server on.
//It returns a @|handle| to the tcpserver. To actually accept
//a connection on the server requires a call to @|tcpaccept|. To
//close the server down, you need to call @|tcpserverclose|. It is
//perfectly acceptable to have multiple @|tcpserver| open simultaneously,
//but they must be on different portnumbers.
//@@Example
//See @|rpcserver| for an example of how to use @|tcpserver|. To
//close the server down, you must call @|tcpserverclose|.
//The following example works on a single machine, only because of
//buffering in the TCP implementation. In practice, the
//server and send sockets would be on different machines
//@<
//server = tcpserver(6010); % Start up the server
//send = tcpconnect('127.0.0.1',6010); % Connect to the server just started
// % Will succeed because the server is running
//recv = tcpaccept(server); % Accept the connection we just tried to make
// % Will succeed because of the tcpconnect call
//msg = 'Hello'; % Create a message to send through the loop
//tcpsend(send,msg); % Push the message through the socket
//tcprecv(recv) % Out it comes through the other side
//tcpsend(recv,msg); % Sockets are bi-directional
//tcprecv(send)
//tcpclose(recv); tcpclose(send); % Close the tcp sockets
//tcpserverclose(server); % Close the server socket
//@>
ArrayVector TCPServerFunction(int nargout, const ArrayVector& arg) {
if (arg.size() == 0)
throw Exception("tcpserver requires one address - the port to set up the server on");
unsigned int port = ArrayToInt32(arg[0]);
QTcpServer *server = new QTcpServer;
if (!server->listen(QHostAddress::Any,port))
throw Exception("unable to create a tcp server to listen to the given address");
return ArrayVector() <<
Array::uint32Constructor(m_servers.assignHandle(server));
}
//@Module TCPACCEPT Accept a connection on a TCP server
//@@Section IO
//@@Usage
//Accepts a connection on the given @|tcpserver|, and returns a
//handle to the connected @|tcpsocket|. This function requires
//a timeout (in milliseconds), and will block until either a
//connection arrives or the timeout elapses. The syntax for the
//command is
//@[
// handle = tcpaccept(server_handle)
//@]
//where @|server_handle| is the handle returned by @|tcpserver|.
//The output of @|tcpaccept| can be used with the socket functions
//@|tcpsend| and @|tcprecv| to send data between FreeMat instances.
//Optionally, you can specify the timeout in milliseconds for the
//command to fail
//@[
// handle = tcpaccept(server_handle, timeout)
//@]
//The default timeout is set to 30 seconds. To
//close the socket returned by @|tcpaccept| you must call @|tcpclose|.
//The resulting handle is identical to one returned by @|tcpconnect|.
ArrayVector TCPAcceptFunction(int nargout, const ArrayVector& arg) {
if (arg.size() < 1)
throw Exception("tcpaccept requires one argument - the handle of the server to read, and an optional timeout to wait before failure (in milliseconds)");
unsigned int server_handle = ArrayToInt32(arg[0]);
unsigned int timeout = 30000;
if (arg.size() == 2)
timeout = ArrayToInt32(arg[1]);
QTcpServer *server = m_servers.lookupHandle(server_handle);
if (!server->waitForNewConnection(timeout))
throw Exception("Wait for connection in tcpaccept timed out");
QTcpSocket *sock = server->nextPendingConnection();
return ArrayVector() <<
Array::uint32Constructor(m_sockets.assignHandle(sock));
}
//@Module TCPCONNECT Connect to a remote TCP server
//@@Section IO
//@@Usage
//Attempts to open a tcp socket to a remote ip address and portnumber
//within a given timeout. The general syntax for its use is
//@[
// handle = tcpconnect(remote_address,remote_port,timeout)
//@]
//where @|timeout| is in milliseconds. The @|remote_address|
//must be a string containing either an IP address (e.g., @|'192.168.0.1'|),
//or a name (e.g., @|'foo.goo.com'|). The resulting socket can
//be closed using @|tcpclose|. If you do not specify a
//@|timeout|, then a default of 30 seconds is used.
//@@Example
//See @|rpceval| for an example of how to use @|tcpconnect|.
//The following example works on a single machine, only because of
//buffering in the TCP implementation. In practice, the
//server and send sockets would be on different machines
ArrayVector TCPConnectFunction(int nargout, const ArrayVector& arg) {
if (arg.size() < 2)
throw Exception("tcpconnect requires two arguments - the remote address of the server to connect to and the port number - an optional timeout can be specified also");
string host = ArrayToString(arg[0]);
unsigned int port = ArrayToInt32(arg[1]);
int timeout = 30000;
if (arg.size() == 3)
timeout = ArrayToInt32(arg[2]);
QTcpSocket *a_sock = new QTcpSocket;
a_sock->connectToHost(QString::fromStdString(host),port);
if (!a_sock->waitForConnected(timeout))
throw Exception(string("tcpconnect failed to connect to ") + host + " on port " + port);
return ArrayVector() <<
Array::uint32Constructor(m_sockets.assignHandle(a_sock));
}
//@Module TCPCLOSE Close a TCP socket
//@@Section IO
//@@Usage
//Closes a tcp socket that is returned either from @|tcpconnect|
//or from @|tcpaccept|. The general syntax for its use is either
//@[
// tcpclose(handle)
//@]
//which closes a specific @|handle| or
//@[
// tcpclose all
//@]
//to close all open sockets. Each close operation has a timeout
//associated with it. You can modify the timeout using the following
//forms of the command:
//@[
// tcpclose(handle,timeout)
//@]
//where @|timeout| is in milliseconds, and
//@[
// tcpclose all timeout
//@]
//or
//@[
// tcpclose('all',timeout)
//@]
//which are equivalent, and will close all sockets, each with the given timeout.
ArrayVector TCPCloseFunction(int nargout, const ArrayVector& arg) {
if (arg.size() == 0)
throw Exception("tcpclose requires at least one argument - the handle to close, or the string 'all' to close all tcp socket handles");
int timeout = 30000;
qDebug() << "Closing socket";
if (arg.size() >= 2)
timeout = ArrayToInt32(arg[1]);
if (arg[0].isString()) {
string txtval = arg[0].getContentsAsStringUpper();
if (txtval != "ALL")
throw Exception(string("Unrecognized argument to tcpclose ") + txtval);
// Close all sockets
for (int i=0;i<=m_sockets.maxHandle();i++) {
try {
QTcpSocket *sock = m_sockets.lookupHandle(i);
if (sock) {
sock->disconnectFromHost();
if (sock->state() != QAbstractSocket::UnconnectedState)
if (!sock->waitForDisconnected(timeout))
throw Exception(string("Failed to disconnect socket: ") + sock->errorString().toStdString());
delete sock;
}
m_sockets.deleteHandle(i);
} catch (Exception &e) {
}
}
return ArrayVector();
}
int handle = ArrayToInt32(arg[0]);
QTcpSocket *sock = m_sockets.lookupHandle(handle);
sock->disconnectFromHost();
if (sock->state() != QAbstractSocket::UnconnectedState)
if (!sock->waitForDisconnected(timeout))
throw Exception(string("Failed to disconnect socket: ") + sock->errorString().toStdString());
delete sock;
m_sockets.deleteHandle(handle);
return ArrayVector();
}
//@Module TCPSERVERCLOSE Close a TCP server socket
//@@Section IO
//@@Usage
//Closes a @|tcpserver| socket. The general syntax
//for its use is either
//@[
// tcpserverclose(handle)
//@]
//which closes a specific @|handle| or
//@[
// tcpserverclose all
//@]
//to close all open servers.
ArrayVector TCPServerCloseFunction(int nargout, const ArrayVector& arg) {
if (arg.size() == 0)
throw Exception("tcpserverclose requires at least one argument - the handle to close, or the string 'all' to close all tcp socket handles");
qDebug() << "Closing server";
if (arg[0].isString()) {
string txtval = arg[0].getContentsAsStringUpper();
if (txtval != "ALL")
throw Exception(string("Unrecognized argument to tcpserverclose ") + txtval);
// Close all sockets
for (int i=0;i<=m_servers.maxHandle();i++) {
try {
QTcpServer *sock = m_servers.lookupHandle(i);
if (sock)
sock->close();
m_servers.deleteHandle(i);
} catch (Exception &e) {
}
}
return ArrayVector();
}
int handle = ArrayToInt32(arg[0]);
QTcpServer *sock = m_servers.lookupHandle(handle);
m_servers.deleteHandle(handle);
sock->close();
return ArrayVector();
}
//@Module TCPSEND Send an array over a TCP socket
//@@Section IO
//@@Usage
//Sends an array over a TCP socket. The encoding of the
//array is done in a manner such that arrays can be
//transparently sent between different machine types
//(endianness, word size, etc.). The general syntax for its
//use is
//@[
// tcpsend(handle,array)
//@]
//where @|handle| is a connected socket returned from a
//successful @|tcpconnect| or @|tcpaccept| call. By
//default the @|tcpsend| operation has a 30 second timeout.
//You can specify the timeout using the following syntax for it
//@[
// tcpsend(handle,array,timeout)
//@]
//where @|timeout| is in milliseconds.
ArrayVector TCPSendFunction(int nargout, const ArrayVector& arg) {
if (arg.size() < 2)
throw Exception("tcpsend requires two arguments - the handle of the connection to use, and the array to send - an optional timeout can be specified also");
qDebug() << "Start send";
unsigned int handle = ArrayToInt32(arg[0]);
QTcpSocket *sock = m_sockets.lookupHandle(handle);
if (sock->state() != QAbstractSocket::ConnectedState)
throw Exception("tcpsend only works on connected sockets");
int timeout = 30000;
if (arg.size() == 3)
timeout = ArrayToInt32(arg[2]);
QByteArray block;
QDataStream out(&block, QIODevice::WriteOnly);
out.setVersion(QDataStream::Qt_4_2);
out << (quint64)0;
out << (quint32) 0xFEEDADAD;
putArrayToQDS(out,arg[1]);
out.device()->seek(0);
out << (quint64)(block.size() - sizeof(quint64));
qDebug() << "block size -> " << (quint64)(block.size() - sizeof(quint64));
sock->write(block);
// sock->flush();
qDebug() << "send: bytes to write: " << sock->bytesToWrite();
if (!sock->waitForBytesWritten(timeout))
throw Exception("timeout on tcpsend function:" + sock->errorString().toStdString());
qDebug() << "(after) send: bytes to write: " << sock->bytesToWrite();
qDebug() << "Done send";
return ArrayVector();
}
//@Module TCPRECV Receive an array over a TCP socket
//@@Section IO
//@@Usage
//Receives an array over from a TCP socket. The encoding of the
//array is done in a manner such that arrays can be
//transparently sent between different machine types
//(endianness, word size, etc.). There must be a matching
//@|tcpsend| call for it to work. The general syntax for its
//use is
//@[
// array = tcprecv(handle)
//@]
//where @|handle| is a connected socket returned from a
//successful @|tcpconnect| or @|tcpaccept| call. By
//default the @|tcprecv| operation has a 30 second timeout.
//You can specify the timeout using the following syntax for it
//@[
// array = tcprecv(handle,timeout)
//@]
//where @|timeout| is in milliseconds.
ArrayVector TCPRecvFunction(int nargout, const ArrayVector& arg) {
if (arg.size() < 1)
throw Exception("tcprecv requires one argument - the handle of the connection to use - an optional timeout can be specified also.");
qDebug() << "Start recv";
unsigned int handle = ArrayToInt32(arg[0]);
int timeout = 30000;
if (arg.size() == 2)
timeout = ArrayToInt32(arg[1]);
QTcpSocket *a_sock = m_sockets.lookupHandle(handle);
qDebug() << "receive socket state: " << a_sock->state() << " " << a_sock->isValid();
while (a_sock->bytesAvailable() < (int)sizeof(quint64)) {
qDebug() << "bytes available = " << a_sock->bytesAvailable();
if (!a_sock->waitForReadyRead(timeout)) {
qDebug() << "TIMEOUT: bytes available = " << a_sock->bytesAvailable();
qDebug() << "TIMEOUT: receive socket state: " << a_sock->state() << " " << a_sock->isValid();
throw Exception(string("tcprecv failed to get blocksize:") + a_sock->errorString().toStdString() + " with " + a_sock->bytesAvailable() + " bytes available");
}
// sleep(1);
}
QDataStream in(a_sock);
in.setVersion(QDataStream::Qt_4_2);
quint64 blockSize;
in >> blockSize;
qDebug() << "block size = " << blockSize;
while (a_sock->bytesAvailable() < blockSize) {
if (!a_sock->waitForReadyRead(timeout))
throw Exception(string("tcprecv failed to get data block prior to timeout"));
}
quint32 magic;
in >> magic;
if (magic != 0xFEEDADAD)
throw Exception(string("tcprecv failed to get proper magic number"));
Array ret;
getArrayFromQDS(in,ret);
qDebug() << "bytes left after get operation: " << a_sock->bytesAvailable();
qDebug() << "Done recv";
return ArrayVector() << ret;
}
//@Module TCPSTATE State of a TCP socket
//@@Section IO
//@@Usage
//Returns the state of a TCP socket given the handle (returned
//either by @|tcpaccept| or @|tcpconnect|. The general syntax
//for its use is
//@[
// state = tcpstate(handle)
//@]
//where @|state| is a string that is either:
//\begin{itemize}
// \item @|'unconnected'| if the socket is unconnected
// \item @|'hostlookup'| if the socket is performing a host name lookup
// \item @|'connecting'| if the socket has started establishing a connection
// \item @|'connected'| if the socket is connected
// \item @|'closing'| if the socket is about to close.
//\end{itemize}
ArrayVector TCPStateFunction(int nargout, const ArrayVector& arg) {
if (arg.size() < 1)
throw Exception("tcpstate requires one argument - the handle of the socket to examine");
unsigned int handle = ArrayToInt32(arg[0]);
QTcpSocket *a_sock = m_sockets.lookupHandle(handle);
switch (a_sock->state()) {
case QAbstractSocket::UnconnectedState:
return ArrayVector() << Array::stringConstructor("unconnected");
case QAbstractSocket::HostLookupState:
return ArrayVector() << Array::stringConstructor("hostlookup");
case QAbstractSocket::ConnectingState:
return ArrayVector() << Array::stringConstructor("connecting");
case QAbstractSocket::ConnectedState:
return ArrayVector() << Array::stringConstructor("connected");
case QAbstractSocket::BoundState:
return ArrayVector() << Array::stringConstructor("bound");
case QAbstractSocket::ClosingState:
return ArrayVector() << Array::stringConstructor("closing");
case QAbstractSocket::ListeningState:
return ArrayVector() << Array::stringConstructor("listening");
}
return ArrayVector() << Array::stringConstructor("unknown");
}
syntax highlighted by Code2HTML, v. 0.9.1