/* * Copyright (c) 2002-2007 Samit Basu * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * */ #include #include "Array.hpp" #include "Malloc.hpp" #include "HandleList.hpp" #include "RPC.hpp" #include #include "Print.hpp" template void getArray(int N, T* c, QDataStream &in) { for (int i=0;i> c[i]; } template void getSparseArray(int cols, T** c, QDataStream &in) { for (int i=0;i> len; c[i] = new T[(uint32)len]; getArray((int)len,c[i],in); } } template void putArray(int N, const T* c, QDataStream &out) { for (int i=0;i void putSparseArray(int cols, const T** c, QDataStream &out) { for (int i=0;i> t; dclass = (Class) t; in >> t; sparseFlag = (bool) t; in >> dimCount; for (int i=0;i> dimVal; dims.setDimensionLength(i,dimVal); } int elCount(dims.getElementCount()); if (elCount == 0) { dat = Array(dclass,dims,NULL); return; } switch(dclass) { case FM_CELL_ARRAY: { Array *dp = new Array[elCount]; for (int i=0;i> ncount; int i; for (i=0;i> dp; fnames.push_back(dp); delete dp; } Array *dp = new Array[elCount*ncount]; for (i=0;i // cmd.portnum = portnum // -- rpcput(a,cmd) // // To make an rpc call, we could then do // // vars = rpcfeval(a,'cos',pi) // -- cmd.type = 'call' // -- cmd.name = 'cos' // -- cmd.args = {pi} // reply = rpcget(a) // if (~reply.success) error('') // // It could work. The missing piece would be detecting errors on the rpcput side, // since these errors do not appear anywhere right now. // // // It would be far easier (and simpler) to start with a synchronous RPC mechanism // instead of an asynchronous one. This would be something like: // // a = rpcreg(remoteIP, remote port) // G = rpcfeval(a,'cos',pi) // // The way this would work is to block the calling socket until the function completes. // // I could expose the socket interface at the FreeMat level with functions like: // // a = tcpserver(5890); // while (1) // try // g = tcpaccept(a); // cmd = tcpget(g); // try // cmd.type = 'reply' // cmd.args = feval(cmd.name,cmd.args); // catch // cmd.type = 'error'; // cmd.error = lasterror; // end // tcpput(g,cmd); // tcpclose(g); // catch // end // end HandleList m_servers; HandleList m_sockets; //@Module TCPSERVER Start a TCP Server on a designated port //@@Section IO //@@Usage //Sets up a TCP server on a specified port. The syntax for its //use is: //@[ // handle = tcpserver(portnum) //@] //where @|portnum| is the port number to set up the tcp server on. //It returns a @|handle| to the tcpserver. To actually accept //a connection on the server requires a call to @|tcpaccept|. To //close the server down, you need to call @|tcpserverclose|. It is //perfectly acceptable to have multiple @|tcpserver| open simultaneously, //but they must be on different portnumbers. //@@Example //See @|rpcserver| for an example of how to use @|tcpserver|. To //close the server down, you must call @|tcpserverclose|. //The following example works on a single machine, only because of //buffering in the TCP implementation. In practice, the //server and send sockets would be on different machines //@< //server = tcpserver(6010); % Start up the server //send = tcpconnect('127.0.0.1',6010); % Connect to the server just started // % Will succeed because the server is running //recv = tcpaccept(server); % Accept the connection we just tried to make // % Will succeed because of the tcpconnect call //msg = 'Hello'; % Create a message to send through the loop //tcpsend(send,msg); % Push the message through the socket //tcprecv(recv) % Out it comes through the other side //tcpsend(recv,msg); % Sockets are bi-directional //tcprecv(send) //tcpclose(recv); tcpclose(send); % Close the tcp sockets //tcpserverclose(server); % Close the server socket //@> ArrayVector TCPServerFunction(int nargout, const ArrayVector& arg) { if (arg.size() == 0) throw Exception("tcpserver requires one address - the port to set up the server on"); unsigned int port = ArrayToInt32(arg[0]); QTcpServer *server = new QTcpServer; if (!server->listen(QHostAddress::Any,port)) throw Exception("unable to create a tcp server to listen to the given address"); return ArrayVector() << Array::uint32Constructor(m_servers.assignHandle(server)); } //@Module TCPACCEPT Accept a connection on a TCP server //@@Section IO //@@Usage //Accepts a connection on the given @|tcpserver|, and returns a //handle to the connected @|tcpsocket|. This function requires //a timeout (in milliseconds), and will block until either a //connection arrives or the timeout elapses. The syntax for the //command is //@[ // handle = tcpaccept(server_handle) //@] //where @|server_handle| is the handle returned by @|tcpserver|. //The output of @|tcpaccept| can be used with the socket functions //@|tcpsend| and @|tcprecv| to send data between FreeMat instances. //Optionally, you can specify the timeout in milliseconds for the //command to fail //@[ // handle = tcpaccept(server_handle, timeout) //@] //The default timeout is set to 30 seconds. To //close the socket returned by @|tcpaccept| you must call @|tcpclose|. //The resulting handle is identical to one returned by @|tcpconnect|. ArrayVector TCPAcceptFunction(int nargout, const ArrayVector& arg) { if (arg.size() < 1) throw Exception("tcpaccept requires one argument - the handle of the server to read, and an optional timeout to wait before failure (in milliseconds)"); unsigned int server_handle = ArrayToInt32(arg[0]); unsigned int timeout = 30000; if (arg.size() == 2) timeout = ArrayToInt32(arg[1]); QTcpServer *server = m_servers.lookupHandle(server_handle); if (!server->waitForNewConnection(timeout)) throw Exception("Wait for connection in tcpaccept timed out"); QTcpSocket *sock = server->nextPendingConnection(); return ArrayVector() << Array::uint32Constructor(m_sockets.assignHandle(sock)); } //@Module TCPCONNECT Connect to a remote TCP server //@@Section IO //@@Usage //Attempts to open a tcp socket to a remote ip address and portnumber //within a given timeout. The general syntax for its use is //@[ // handle = tcpconnect(remote_address,remote_port,timeout) //@] //where @|timeout| is in milliseconds. The @|remote_address| //must be a string containing either an IP address (e.g., @|'192.168.0.1'|), //or a name (e.g., @|'foo.goo.com'|). The resulting socket can //be closed using @|tcpclose|. If you do not specify a //@|timeout|, then a default of 30 seconds is used. //@@Example //See @|rpceval| for an example of how to use @|tcpconnect|. //The following example works on a single machine, only because of //buffering in the TCP implementation. In practice, the //server and send sockets would be on different machines ArrayVector TCPConnectFunction(int nargout, const ArrayVector& arg) { if (arg.size() < 2) throw Exception("tcpconnect requires two arguments - the remote address of the server to connect to and the port number - an optional timeout can be specified also"); string host = ArrayToString(arg[0]); unsigned int port = ArrayToInt32(arg[1]); int timeout = 30000; if (arg.size() == 3) timeout = ArrayToInt32(arg[2]); QTcpSocket *a_sock = new QTcpSocket; a_sock->connectToHost(QString::fromStdString(host),port); if (!a_sock->waitForConnected(timeout)) throw Exception(string("tcpconnect failed to connect to ") + host + " on port " + port); return ArrayVector() << Array::uint32Constructor(m_sockets.assignHandle(a_sock)); } //@Module TCPCLOSE Close a TCP socket //@@Section IO //@@Usage //Closes a tcp socket that is returned either from @|tcpconnect| //or from @|tcpaccept|. The general syntax for its use is either //@[ // tcpclose(handle) //@] //which closes a specific @|handle| or //@[ // tcpclose all //@] //to close all open sockets. Each close operation has a timeout //associated with it. You can modify the timeout using the following //forms of the command: //@[ // tcpclose(handle,timeout) //@] //where @|timeout| is in milliseconds, and //@[ // tcpclose all timeout //@] //or //@[ // tcpclose('all',timeout) //@] //which are equivalent, and will close all sockets, each with the given timeout. ArrayVector TCPCloseFunction(int nargout, const ArrayVector& arg) { if (arg.size() == 0) throw Exception("tcpclose requires at least one argument - the handle to close, or the string 'all' to close all tcp socket handles"); int timeout = 30000; qDebug() << "Closing socket"; if (arg.size() >= 2) timeout = ArrayToInt32(arg[1]); if (arg[0].isString()) { string txtval = arg[0].getContentsAsStringUpper(); if (txtval != "ALL") throw Exception(string("Unrecognized argument to tcpclose ") + txtval); // Close all sockets for (int i=0;i<=m_sockets.maxHandle();i++) { try { QTcpSocket *sock = m_sockets.lookupHandle(i); if (sock) { sock->disconnectFromHost(); if (sock->state() != QAbstractSocket::UnconnectedState) if (!sock->waitForDisconnected(timeout)) throw Exception(string("Failed to disconnect socket: ") + sock->errorString().toStdString()); delete sock; } m_sockets.deleteHandle(i); } catch (Exception &e) { } } return ArrayVector(); } int handle = ArrayToInt32(arg[0]); QTcpSocket *sock = m_sockets.lookupHandle(handle); sock->disconnectFromHost(); if (sock->state() != QAbstractSocket::UnconnectedState) if (!sock->waitForDisconnected(timeout)) throw Exception(string("Failed to disconnect socket: ") + sock->errorString().toStdString()); delete sock; m_sockets.deleteHandle(handle); return ArrayVector(); } //@Module TCPSERVERCLOSE Close a TCP server socket //@@Section IO //@@Usage //Closes a @|tcpserver| socket. The general syntax //for its use is either //@[ // tcpserverclose(handle) //@] //which closes a specific @|handle| or //@[ // tcpserverclose all //@] //to close all open servers. ArrayVector TCPServerCloseFunction(int nargout, const ArrayVector& arg) { if (arg.size() == 0) throw Exception("tcpserverclose requires at least one argument - the handle to close, or the string 'all' to close all tcp socket handles"); qDebug() << "Closing server"; if (arg[0].isString()) { string txtval = arg[0].getContentsAsStringUpper(); if (txtval != "ALL") throw Exception(string("Unrecognized argument to tcpserverclose ") + txtval); // Close all sockets for (int i=0;i<=m_servers.maxHandle();i++) { try { QTcpServer *sock = m_servers.lookupHandle(i); if (sock) sock->close(); m_servers.deleteHandle(i); } catch (Exception &e) { } } return ArrayVector(); } int handle = ArrayToInt32(arg[0]); QTcpServer *sock = m_servers.lookupHandle(handle); m_servers.deleteHandle(handle); sock->close(); return ArrayVector(); } //@Module TCPSEND Send an array over a TCP socket //@@Section IO //@@Usage //Sends an array over a TCP socket. The encoding of the //array is done in a manner such that arrays can be //transparently sent between different machine types //(endianness, word size, etc.). The general syntax for its //use is //@[ // tcpsend(handle,array) //@] //where @|handle| is a connected socket returned from a //successful @|tcpconnect| or @|tcpaccept| call. By //default the @|tcpsend| operation has a 30 second timeout. //You can specify the timeout using the following syntax for it //@[ // tcpsend(handle,array,timeout) //@] //where @|timeout| is in milliseconds. ArrayVector TCPSendFunction(int nargout, const ArrayVector& arg) { if (arg.size() < 2) throw Exception("tcpsend requires two arguments - the handle of the connection to use, and the array to send - an optional timeout can be specified also"); qDebug() << "Start send"; unsigned int handle = ArrayToInt32(arg[0]); QTcpSocket *sock = m_sockets.lookupHandle(handle); if (sock->state() != QAbstractSocket::ConnectedState) throw Exception("tcpsend only works on connected sockets"); int timeout = 30000; if (arg.size() == 3) timeout = ArrayToInt32(arg[2]); QByteArray block; QDataStream out(&block, QIODevice::WriteOnly); out.setVersion(QDataStream::Qt_4_2); out << (quint64)0; out << (quint32) 0xFEEDADAD; putArrayToQDS(out,arg[1]); out.device()->seek(0); out << (quint64)(block.size() - sizeof(quint64)); qDebug() << "block size -> " << (quint64)(block.size() - sizeof(quint64)); sock->write(block); // sock->flush(); qDebug() << "send: bytes to write: " << sock->bytesToWrite(); if (!sock->waitForBytesWritten(timeout)) throw Exception("timeout on tcpsend function:" + sock->errorString().toStdString()); qDebug() << "(after) send: bytes to write: " << sock->bytesToWrite(); qDebug() << "Done send"; return ArrayVector(); } //@Module TCPRECV Receive an array over a TCP socket //@@Section IO //@@Usage //Receives an array over from a TCP socket. The encoding of the //array is done in a manner such that arrays can be //transparently sent between different machine types //(endianness, word size, etc.). There must be a matching //@|tcpsend| call for it to work. The general syntax for its //use is //@[ // array = tcprecv(handle) //@] //where @|handle| is a connected socket returned from a //successful @|tcpconnect| or @|tcpaccept| call. By //default the @|tcprecv| operation has a 30 second timeout. //You can specify the timeout using the following syntax for it //@[ // array = tcprecv(handle,timeout) //@] //where @|timeout| is in milliseconds. ArrayVector TCPRecvFunction(int nargout, const ArrayVector& arg) { if (arg.size() < 1) throw Exception("tcprecv requires one argument - the handle of the connection to use - an optional timeout can be specified also."); qDebug() << "Start recv"; unsigned int handle = ArrayToInt32(arg[0]); int timeout = 30000; if (arg.size() == 2) timeout = ArrayToInt32(arg[1]); QTcpSocket *a_sock = m_sockets.lookupHandle(handle); qDebug() << "receive socket state: " << a_sock->state() << " " << a_sock->isValid(); while (a_sock->bytesAvailable() < (int)sizeof(quint64)) { qDebug() << "bytes available = " << a_sock->bytesAvailable(); if (!a_sock->waitForReadyRead(timeout)) { qDebug() << "TIMEOUT: bytes available = " << a_sock->bytesAvailable(); qDebug() << "TIMEOUT: receive socket state: " << a_sock->state() << " " << a_sock->isValid(); throw Exception(string("tcprecv failed to get blocksize:") + a_sock->errorString().toStdString() + " with " + a_sock->bytesAvailable() + " bytes available"); } // sleep(1); } QDataStream in(a_sock); in.setVersion(QDataStream::Qt_4_2); quint64 blockSize; in >> blockSize; qDebug() << "block size = " << blockSize; while (a_sock->bytesAvailable() < blockSize) { if (!a_sock->waitForReadyRead(timeout)) throw Exception(string("tcprecv failed to get data block prior to timeout")); } quint32 magic; in >> magic; if (magic != 0xFEEDADAD) throw Exception(string("tcprecv failed to get proper magic number")); Array ret; getArrayFromQDS(in,ret); qDebug() << "bytes left after get operation: " << a_sock->bytesAvailable(); qDebug() << "Done recv"; return ArrayVector() << ret; } //@Module TCPSTATE State of a TCP socket //@@Section IO //@@Usage //Returns the state of a TCP socket given the handle (returned //either by @|tcpaccept| or @|tcpconnect|. The general syntax //for its use is //@[ // state = tcpstate(handle) //@] //where @|state| is a string that is either: //\begin{itemize} // \item @|'unconnected'| if the socket is unconnected // \item @|'hostlookup'| if the socket is performing a host name lookup // \item @|'connecting'| if the socket has started establishing a connection // \item @|'connected'| if the socket is connected // \item @|'closing'| if the socket is about to close. //\end{itemize} ArrayVector TCPStateFunction(int nargout, const ArrayVector& arg) { if (arg.size() < 1) throw Exception("tcpstate requires one argument - the handle of the socket to examine"); unsigned int handle = ArrayToInt32(arg[0]); QTcpSocket *a_sock = m_sockets.lookupHandle(handle); switch (a_sock->state()) { case QAbstractSocket::UnconnectedState: return ArrayVector() << Array::stringConstructor("unconnected"); case QAbstractSocket::HostLookupState: return ArrayVector() << Array::stringConstructor("hostlookup"); case QAbstractSocket::ConnectingState: return ArrayVector() << Array::stringConstructor("connecting"); case QAbstractSocket::ConnectedState: return ArrayVector() << Array::stringConstructor("connected"); case QAbstractSocket::BoundState: return ArrayVector() << Array::stringConstructor("bound"); case QAbstractSocket::ClosingState: return ArrayVector() << Array::stringConstructor("closing"); case QAbstractSocket::ListeningState: return ArrayVector() << Array::stringConstructor("listening"); } return ArrayVector() << Array::stringConstructor("unknown"); }