/*
* Copyright (C) 2001, Shilad Sen, Sourcelight Technologies, Inc.
* See xmlrpc.h or the README for more copyright information.
*/
#include <assert.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <sys/types.h>
#include "xmlrpc.h"
#include "rpcInternal.h"
#ifdef MSWINDOWS
#include <winsock2.h>
#else
#include <netdb.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#endif
#define READ_SIZE 4096
#define STATE_CONNECT 0
#define STATE_CONNECTING 1
#define STATE_WRITE 2
#define STATE_READ_HEADER 3
#define STATE_READ_BODY 4
#define STATE_READ_CHUNK 5
#define RETURN_ERR 0
#define RETURN_AGAIN 1
#define RETURN_DONE 2
#define RETURN_READ_MORE 3 /* only for chunked encoding */
static rpcClient *rpcClientNewFromDisp(
char *host,
int port,
char *url,
rpcDisp *disp
);
static bool connecting(rpcClient *cp);
static bool writeRequest(rpcClient *cp, PyObject **toWritep);
static bool readResponse(
rpcClient *cp,
PyObject **bodyp,
long blen
);
static bool readHeader(
rpcClient *cp,
PyObject **headp,
PyObject **bodyp,
long *blen,
bool *chunked
);
static bool nbRead(int fd, PyObject **buffpp, bool *eof);
static bool executed(rpcClient *cp, PyObject *resp, PyObject *arg);
static PyObject *pyRpcClientExecute(PyObject *self, PyObject *args);
static PyObject *pyRpcClientGetAttr(rpcClient *cp, char *name);
static PyObject *pyRpcClientWork(PyObject *self, PyObject *args);
static bool clientConnect(rpcClient *cp);
static bool cleanAndRetFalse(PyObject *listp);
static bool pyClientCallback(
rpcClient *cp,
PyObject *resp,
PyObject *extArgs
);
static bool execDispatch(
rpcDisp *dp,
rpcSource *sp,
int actions,
PyObject *args
);
static bool addAuthentication(
PyObject *addInfo,
char *name,
char *pass
);
static bool readChunks(
rpcClient *client,
PyObject **bodyp,
PyObject **chunkp
);
static int processChunk(
rpcClient *client,
PyObject **bodyp,
PyObject **chunkp
);
rpcClient *
rpcClientNew(char *host, int port, char *url)
{
rpcDisp *dp;
rpcClient *cp;
dp = rpcDispNew();
if (dp == NULL)
return NULL;
cp = rpcClientNewFromDisp(host, port, url, dp);
Py_DECREF(dp);
return cp;
}
rpcClient *
rpcClientNewFromServer(char *host, int port, char *url, rpcServer *servp)
{
return rpcClientNewFromDisp(host, port, url, servp->disp);
}
static rpcClient *
rpcClientNewFromDisp(char *host, int port, char *url, rpcDisp *disp)
{
rpcClient *cp;
rpcSource *sp;
int slen;
cp = PyObject_NEW(rpcClient, &rpcClientType);
if (cp == NULL)
return NULL;
cp->host = alloc(strlen(host) + 1);
if (cp->host == NULL)
return NULL;
strcpy(cp->host, host);
cp->url = alloc(strlen(url) + 1);
if (cp->url == NULL)
return NULL;
strcpy(cp->url, url);
cp->port = port;
cp->disp = disp;
cp->execing = false;
Py_INCREF(disp);
sp = rpcSourceNew(-1);
if (sp == NULL)
return NULL;
sp->doClose = true;
slen = strlen(host) + strlen(":123456") + 1; /* max desc length */
sp->desc = alloc(slen);
if (sp->desc == NULL)
return false;
if (port == 80)
snprintf(sp->desc, slen, "%s", host);
else
snprintf(sp->desc, slen, "%s:%i", host, port);
sp->desc[slen - 1] = EOS;
cp->src = sp;
return cp;
}
void
rpcClientClose(rpcClient *cp)
{
if (cp->src->fd >= 0)
close(cp->src->fd);
cp->src->fd = -1; /* make sure we don't use the fd again */
}
void
rpcClientDealloc(rpcClient *cp)
{
if (cp->host)
free(cp->host);
if (cp->url)
free(cp->url);
rpcClientClose(cp);
cp->host = NULL;
cp->url = NULL;
Py_DECREF(cp->src);
Py_DECREF(cp->disp);
PyMem_DEL(cp);
}
static bool
execDispatch(rpcDisp *dp, rpcSource *sp, int actions, PyObject *params)
{
bool (*cfunc) (rpcClient *, PyObject *, PyObject *),
res;
PyObject *head,
*body,
*chunk,
*args,
*nargs,
*strReq,
*pyfunc, /* python string that holds func ptr */
*cleanup, /* list of items to be DECREF'd */
*funcArgs; /* args to the callback function */
rpcClient *cp;
int state,
nacts,
nstate,
r;
long blen;
bool chunked;
nargs = NULL; /* to appease the compiler */
r = -1;
cleanup = PyList_New(0);
if (cleanup == NULL)
return false;
unless (PyArg_ParseTuple(params, "OiSOO:execDispatch",
&cp, &state, &pyfunc, &funcArgs, &args))
return false;
assert(cp->ob_type == &rpcClientType);
assert(cp->execing == true);
switch (state) {
case STATE_CONNECT: /* args is request, but not touched */
if (cp->src->fd < 0 and not clientConnect(cp)) {
cp->execing = false;
return cleanAndRetFalse(cleanup);
}
nstate = STATE_CONNECTING;
nacts = ACT_OUTPUT;
nargs = args;
break;
case STATE_CONNECTING:
r = connecting(cp);
if (r == RETURN_ERR) {
cp->execing = false;
return cleanAndRetFalse(cleanup);
} else if (r == RETURN_AGAIN) {
nstate = STATE_CONNECTING;
nacts = ACT_OUTPUT;
nargs = args;
break;
}
assert(r == RETURN_DONE);
/* windows sucks. you can't trust SOL_ERROR being set to 0 *
* meaning it is connected but select on write is ok */
nstate = STATE_WRITE;
nacts = ACT_OUTPUT;
nargs = args;
break;
case STATE_WRITE: /* args is toWriteStr */
r = writeRequest(cp, &args);
if (r == RETURN_ERR) {
cp->execing = false;
return cleanAndRetFalse(cleanup);
} else if (r == RETURN_AGAIN) {
nstate = STATE_WRITE;
nacts = ACT_OUTPUT;
nargs = args;
if (PyList_Append(cleanup, args)) {
cp->execing = false;
return false;
}
break;
}
assert (r == RETURN_DONE);
args = PyString_FromString("");
if ((args == NULL)
or (PyList_Append(cleanup, args))) {
cp->execing = false;
return false;
}
case STATE_READ_HEADER: /* args is buff */
head = args;
body = NULL;
r = readHeader(cp, &head, &body, &blen, &chunked);
if (r == RETURN_ERR) {
cp->execing = false;
return cleanAndRetFalse(cleanup);
} else if (r == RETURN_AGAIN) {
nstate = STATE_READ_HEADER;
nacts = ACT_INPUT;
nargs = head;
if (PyList_Append(cleanup, head)) {
cp->execing = false;
return false;
}
break;
}
assert (r == RETURN_DONE);
/* If we are reading a chunked response, the body becomes *
* the first chunk to be parsed and the new body is empty */
if (chunked)
args = Py_BuildValue("(S,s,i,S,i)",
head, "", blen, body, 1);
else
args = Py_BuildValue("(S,S,i,s,i)",
head, body, blen, "", 0);
if ((args == NULL)
or (PyList_Append(cleanup, args))) {
cp->execing = false;
return cleanAndRetFalse(cleanup);
}
Py_DECREF(head);
Py_DECREF(body);
case STATE_READ_BODY: /* args is (head, body, blen, chunk, chunked) */
unless (PyArg_ParseTuple(args, "SSlSi:execDispatchReadBody",
&head, &body, &blen,
&chunk, &chunked)) {
cp->execing = false;
return cleanAndRetFalse(cleanup);
}
if (chunked)
r = readChunks(cp, &body, &chunk);
else
r = readResponse(cp, &body, blen);
if (r == RETURN_ERR) {
cp->execing = false;
return cleanAndRetFalse(cleanup);
} else if (r == RETURN_AGAIN) {
nstate = STATE_READ_BODY;
nacts = ACT_INPUT;
nargs = Py_BuildValue("(O,O,i,O,i)", head, body, blen, chunk, chunked);
Py_DECREF(body);
if (chunked) {
Py_DECREF(chunk);
}
if (nargs == NULL) {
cp->execing = false;
return cleanAndRetFalse(cleanup);
}
if (PyList_Append(cleanup, nargs)) {
cp->execing = false;
return false;
}
break;
}
if (chunked) {
Py_DECREF(chunk); /* we no longer need this */
}
cp->execing = false;
assert (r == RETURN_DONE);
Py_INCREF(head); /* hack so concat doesn't fail */
PyString_Concat(&head, body);
Py_DECREF(body);
if (head == NULL)
return false;
if (rpcLogLevel >= 9) {
strReq = PyObject_Repr(head);
if (strReq == NULL)
return false;
rpcLogSrc(9, cp->src, "server response is %s",
PyString_AS_STRING(strReq));
Py_DECREF(strReq);
}
memcpy(&cfunc, PyString_AS_STRING(pyfunc), sizeof(cfunc));
res = cfunc(cp, head, funcArgs);
(void)cleanAndRetFalse(cleanup);
unless (doKeepAlive(head, TYPE_RESP))
rpcClientClose(cp);
Py_DECREF(head);
return res;
default:
PyErr_SetString(rpcError, "unknown state to execDispatch");
return cleanAndRetFalse(cleanup);
}
sp->actImp = nacts;
sp->func = execDispatch;
sp->params = Py_BuildValue("(O,i,O,O,O)",
cp, nstate, pyfunc, funcArgs, nargs);
(void)cleanAndRetFalse(cleanup);
if (sp->params == NULL)
return false;
unless (rpcDispAddSource(dp, sp))
return false;
return true;
}
/*
* Doubly DECREF a list and return false
*/
bool cleanAndRetFalse(PyObject *listp)
{
int i;
PyObject *item;
assert(PyList_Check(listp));
for (i = 0; i < PyList_GET_SIZE(listp); ++i) {
item = PyList_GET_ITEM(listp, i);
Py_DECREF(item);
}
Py_DECREF(listp);
return false;
}
bool
clientConnect(rpcClient *cp)
{
int fd;
rpcSource *sp;
struct sockaddr_in addr;
struct hostent *hp;
#ifdef MSWINDOWS
ulong flag = 1;
#endif /* MSWINDOWS */
#ifdef MSWINDOWS
fd = socket(PF_INET, SOCK_STREAM, 0);
if ((fd == INVALID_SOCKET)
or (ioctlsocket((SOCKET)fd, FIONBIO, &flag) != 0)) {
(rpcClient *)PyErr_SetFromErrno(rpcError);
return false;
}
#else
fd = socket(AF_INET, SOCK_STREAM, 0);
unless ((fd >= 0)
and (fcntl(fd, F_SETFL, O_NONBLOCK) == 0)) {
PyErr_SetFromErrno(rpcError);
return false;
}
#endif /* MSWINDOWS */
cp->src->fd = fd;
sp = cp->src;
hp = gethostbyname(cp->host);
if (hp == NULL) {
PyErr_SetFromErrno(rpcError);
return false;
}
addr.sin_family = hp->h_addrtype;
memcpy(&addr.sin_addr, hp->h_addr, hp->h_length);
addr.sin_port = htons((ushort) cp->port);
unless ((connect(sp->fd, (struct sockaddr *)&addr, sizeof(addr)) == 0)
or (isBlocked(get_errno()))) {
PyErr_SetFromErrno(rpcError);
return false;
}
return true;
}
static bool
connecting(rpcClient *cp)
{
int fd,
val,
len;
fd = cp->src->fd;
len = sizeof(val);
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (char *)&val, &len) < 0) {
PyErr_SetFromErrno(rpcError);
return false;
}
if (val == 0) {
rpcLogSrc(1, cp->src, "client connection succeeded");
return RETURN_DONE;
} else if (isBlocked(val))
return RETURN_AGAIN;
/* val != 0 and not blocked: an error occured */
set_errno(val);
PyErr_SetFromErrno(rpcError);
return RETURN_ERR;
}
bool
rpcClientNbExecute(
rpcClient *cp,
char *method,
PyObject *params,
bool (*func) (rpcClient *, PyObject *, PyObject *),
PyObject *funcArgs,
char *name,
char *pass
)
{
PyObject *pyHost,
*req,
*addInfo,
*strReq;
rpcSource *sp;
if (cp->execing) {
PyErr_SetString(rpcError, "client already executing");
return false;
}
sp = cp->src;
if (rpcLogLevel >= 5) {
strReq = PyObject_Str(params);
if (strReq == NULL)
return false;
rpcLogSrc(5, sp, "client queueing command ('%s', %s)",
method, PyString_AS_STRING(strReq));
Py_DECREF(strReq);
} else if (rpcLogLevel >= 3)
rpcLogSrc(3, sp, "client queueing command '%s'", method);
addInfo = PyDict_New();
if (addInfo == NULL)
return false;
unless (addAuthentication(addInfo, name, pass))
return false;
pyHost = PyString_FromString(cp->src->desc);
if ((pyHost == NULL)
or (PyDict_SetItemString(addInfo, "Host", pyHost)))
return false;
req = buildRequest(cp->url, method, params, addInfo);
Py_DECREF(pyHost);
Py_DECREF(addInfo);
if (req == NULL)
return false;
if (rpcLogLevel >= 9) {
strReq = PyObject_Repr(req);
if (strReq == NULL)
return false;
rpcLogSrc(9, sp, "client request is %s",
PyString_AS_STRING(strReq));
Py_DECREF(strReq);
}
if (sp->fd < 0)
sp->params = Py_BuildValue("(O,i,s#,O,O)", cp, STATE_CONNECT,
&func, sizeof(func), funcArgs, req);
else
sp->params = Py_BuildValue("(O,i,s#,O,O)", cp, STATE_WRITE,
&func, sizeof(func), funcArgs, req);
Py_DECREF(req);
if (sp->params == NULL)
return false;
sp->actImp = ACT_IMMEDIATE;
sp->func = execDispatch;
unless (rpcDispAddSource(cp->disp, sp))
return false;
cp->execing = true;
return true;
}
static bool
addAuthentication(PyObject *addInfo, char *name, char *pass)
{
char *decPair,
*encPair;
PyObject *pyEncPair,
*pyDecPair;
PyObject *pyBasic;
if (name == NULL and pass == NULL)
return true;
decPair = NULL; /* to appease the compiler */
if (name != NULL and pass != NULL) {
decPair = alloc(strlen(name) + 1 + strlen(pass) + 1);
if (decPair == NULL)
return false;
sprintf(decPair, "%s:%s", name, pass);
} else if (name != NULL and pass == NULL) {
decPair = alloc(strlen(name) + 1 + 1);
if (decPair == NULL)
return false;
sprintf(decPair, "%s:", name);
} else if (name == NULL and pass != NULL) {
decPair = alloc(1 + strlen(pass) + 1);
if (decPair == NULL)
return false;
sprintf(decPair, ":%s", pass);
}
pyDecPair = PyString_FromString(decPair);
if (pyDecPair == NULL)
return false;
free(decPair);
encPair = rpcBase64Encode(pyDecPair);
if (encPair == NULL)
return false;
Py_DECREF(pyDecPair);
pyBasic = PyString_FromString("Basic ");
if (pyBasic == NULL)
return false;
pyEncPair = PyString_FromString(encPair);
free(encPair);
if (pyEncPair == NULL)
return false;
PyString_ConcatAndDel(&pyBasic, pyEncPair);
if (PyDict_SetItemString(addInfo, "Authorization", pyBasic))
return false;
Py_DECREF(pyBasic);
return true;
}
PyObject *
rpcClientExecute(
rpcClient *cp,
char *method,
PyObject *params,
double timeout,
char *name,
char *pass
)
{
bool timedOut;
rpcDisp *tmp;
PyObject *result,
*response,
*tuple;
tmp = cp->disp;
cp->disp = rpcDispNew();
if (cp->disp == NULL) {
cp->disp = tmp;
return NULL;
}
unless ((rpcClientNbExecute(cp, method, params, executed,
Py_None, name, pass))
and (rpcDispWork(cp->disp, timeout, &timedOut))) {
Py_DECREF(cp->disp);
cp->disp = tmp;
cp->execing = false;
return NULL;
}
Py_DECREF((PyObject *)cp->disp);
cp->disp = tmp;
if (timedOut) {
cp->execing = false;
set_errno(ETIMEDOUT);
PyErr_SetFromErrno(rpcError);
return NULL;
}
response = cp->src->params;
cp->src->params = NULL;
tuple = parseResponse(response);
Py_DECREF(response);
if (tuple == NULL)
return NULL;
assert(PyTuple_Check(tuple));
assert(PyTuple_GET_SIZE(tuple) == 2);
result = PyTuple_GET_ITEM(tuple, 0);
Py_INCREF(result);
Py_DECREF(tuple);
return result;
}
static bool
executed(rpcClient *cp, PyObject *resp, PyObject *args)
{
cp->src->params = resp;
Py_INCREF(resp);
return true;
}
/*
* write some of a request
*/
static int
writeRequest(rpcClient *cp, PyObject **toWritep)
{
PyObject *toWrite;
int nb,
slen;
toWrite = *toWritep;
slen = PyString_GET_SIZE(toWrite);
nb = write(cp->src->fd, PyString_AS_STRING(toWrite), slen);
rpcLogSrc(7, cp->src, "client wrote %d of %d bytes", nb, slen);
if (nb < 0 and isBlocked(get_errno()))
nb = 0;
if (nb < 0) {
PyErr_SetFromErrno(rpcError);
return RETURN_ERR;
} else if (nb == slen) {
rpcLogSrc(7, cp->src, "client finished writing request");
return RETURN_DONE;
} else {
assert(slen > nb);
toWrite = PyString_FromStringAndSize(
PyString_AS_STRING(toWrite) + nb, slen - nb);
if (toWrite == NULL)
return RETURN_ERR;
*toWritep = toWrite;
return RETURN_AGAIN;
}
}
/*
* Parse a header string to extract relevant information from it.
*
* The unparsed header comes in through **headp. The header is
* parsed, and separated into the body (*body), while several imporant
* data points are extracted from it. blen is the length of the body
* if Content-length is specified, otherwise it is -1. Chunked is
* whether or not the encoding is chunked.
*
* Note: if this function returns RETURN_AGAIN, the caller takes ownership
* of the headp pointer ONLY and must DECREF it. If RETURN_DONE is
* returned, the caller takes ownership of BOTH the header AND body and
* must DECREF both of them. If RETURN_ERR is returned, no ownership is
* transferred.
*/
static bool
readHeader(
rpcClient *client,
PyObject **headp,
PyObject **bodyp,
long *blen,
bool *chunked
)
{
PyObject *buff;
bool eof;
char *hp, /* start of header */
*bp, /* start of body */
*cp, /* current position */
*ep, /* end of string read */
*lp, /* start of content-length value */
*te; /* start of transfer-encoding */
*chunked = false;
buff = *headp;
unless (nbRead(client->src->fd, &buff, &eof))
return RETURN_ERR;
bp = NULL;
lp = NULL;
te = NULL;
hp = PyString_AS_STRING(buff);
ep = hp + PyString_GET_SIZE(buff);
rpcLogSrc(9, client->src, "client read %d bytes of header and body",
(ep - hp));
for (cp = hp; (bp == NULL) and (cp < ep); ++cp) {
if ((ep - cp > 16)
and (strncasecmp(cp, "Content-length: ", 16) == 0))
lp = cp + 16;
if ((ep - cp > 19)
and (strncasecmp(cp, "Transfer-Encoding: ", 19) == 0))
te = cp + 19;
if ((ep - cp > 4)
and (strncmp(cp, "\r\n\r\n", 4) == 0))
bp = cp + 4;
if ((ep - cp > 2)
and (strncmp(cp, "\n\n", 2) == 0))
bp = cp + 2;
}
if (bp == NULL) {
if (eof) {
Py_DECREF(buff);
PyErr_SetString(rpcError, "got EOS while reading");
return false;
}
*headp = buff;
return RETURN_AGAIN;
}
if ((te != NULL)
and (strncasecmp(te, "chunked\r\n", 9) == 0)) {
*chunked = true;
*blen = -1;
} else if (lp == NULL) {
fprintf(rpcLogger, "No Content-length parameter found\n");
fprintf(rpcLogger, "reading to EOF...\n");
*blen = -1;
} else unless (decodeActLong(&lp, ep, blen)) {
Py_DECREF(buff);
PyErr_SetString(rpcError, "invalid Content-length");
return RETURN_ERR;
}
rpcLogSrc(9, client->src, "client finished reading header");
rpcLogSrc(9, client->src,
"client bodylen should be %ld %s chunked mode",
*blen, *chunked ? "in" : "not in");
*headp = PyString_FromStringAndSize(hp, bp-hp);
*bodyp = PyString_FromStringAndSize(bp, ep-bp);
if (*headp == NULL || *bodyp == NULL)
return RETURN_ERR;
Py_DECREF(buff);
return (RETURN_DONE);
}
/*
* IMPORTANT NOTE:
* If this method returns anything but RETURN_ERR, it is the responsibility
* of the caller takes ownership of the new body value and must DECREF it
* appropriately.
*/
static bool
readResponse(rpcClient *cp, PyObject **bodyp, long blen)
{
PyObject *body;
bool eof;
long slen;
body = *bodyp;
unless (nbRead(cp->src->fd, &body, &eof))
return false;
slen = PyString_GET_SIZE(body);
rpcLogSrc(9, cp->src, "client read %ld of %d bytes of lbody",
slen, blen);
if (blen < 0) { /* we need to read to EOF */
*bodyp = body;
if (eof)
return RETURN_DONE;
else
return RETURN_AGAIN;
}
if (slen >= blen) {
*bodyp = body;
return RETURN_DONE;
} else if (eof) {
Py_DECREF(body);
PyErr_SetString(rpcError, "unexpected EOF while reading");
return RETURN_ERR;
} else {
*bodyp = body;
return RETURN_AGAIN;
}
}
/*
*
* IMPORTANT NOTE:
* If this method returns anything but RETURN_ERR, it is the responsibility
* of the caller takes ownership of the new body and chunk values and must
* DECREF them appropriately.
*
* We go through a bit of a song and dance to make sure that the reference
* counting works out just right. Note that XDECREF is used because we want
* to check if the value is NULL first.
*/
static bool
readChunks(rpcClient *client, PyObject **bodyp, PyObject **chunkp)
{
PyObject *obody,
*ochunk;
bool eof;
int r;
unless (nbRead(client->src->fd, chunkp, &eof))
return RETURN_ERR;
obody = NULL;
ochunk = *chunkp;
while (true) {
r = processChunk(client, bodyp, chunkp);
Py_XDECREF(obody);
Py_XDECREF(ochunk);
if (r != RETURN_AGAIN)
break;
obody = *bodyp;
ochunk = *chunkp;
}
if (r == RETURN_READ_MORE) {
if (eof) {
Py_XDECREF(obody);
Py_XDECREF(ochunk);
PyErr_SetString(rpcError, "unexpected EOF while reading");
return RETURN_ERR;
}
return RETURN_AGAIN;
}
return r;
}
/*
* Read a single "chunked" response.
*
* A chunk consists of a hexidecimal length followed by that much data
* followed by a CR LF i.e.:
*
* "18
* abcdefghijklmnopqrstuvwxyz
* "
* (where the LF's are really CR-LF's)
*
* The last chunk is marked by a length of 0, and this can be followed by
* a "trailer" which ends with a single emtpy CR-LF line
*
* This method returns the usual callback values, but may also return
* the value RETURN_READ_MORE which means we should return to the select()
* loop and read more information.
*
* IMPORTANT NOTE:
* If this method returns anything but RETURN_ERR, it is the responsibility
* of the caller takes ownership of the new body and chunk values and must
* DECREF them appropriately.
*/
static int
processChunk(rpcClient *client, PyObject **bodyp, PyObject **chunkp)
{
long clen; /* chunk length */
char *sp, /* pointer to the start of the chunk */
*bp, /* pointer to the body of the chunk */
*ep; /* pointer to the end of the chunk */
PyObject *nbody, /* new body */
*nchunk, /* new chunk */
*append; /* string to append */
bp = PyString_AS_STRING(*chunkp);
sp = bp;
ep = bp + PyString_GET_SIZE(*chunkp);
rpcLogSrc(9, client->src, "client processing chunk %s", sp);
while (true) {
if (bp+1 >= ep) {
Py_INCREF(*bodyp);
Py_INCREF(*chunkp);
return RETURN_READ_MORE;
}
if (strncmp(bp, "\r\n", 2) == 0) {
bp += 2;
break;
}
bp++;
}
unless (decodeActLongHex(&sp, bp, &clen)) {
PyErr_SetString(rpcError, "invalid size in chunk");
return RETURN_ERR;
}
rpcLogSrc(7, client->src, "chunk length is %ld", clen);
if ((ep-bp) < (clen+strlen("\r\n"))) {
Py_INCREF(*bodyp);
Py_INCREF(*chunkp);
return RETURN_READ_MORE;
}
if (clen == 0) {
rpcLogSrc(7, client->src, "client reading footer", clen);
while (sp < ep) {
if (ep-sp >= 4 and strncmp(sp, "\r\n\r\n", 4) == 0) {
Py_INCREF(*bodyp);
Py_INCREF(*chunkp);
return RETURN_DONE;
}
sp++;
}
Py_INCREF(*bodyp);
Py_INCREF(*chunkp);
return RETURN_READ_MORE;
}
unless (strncmp(bp+clen, "\r\n", 2) == 0) {
PyErr_SetString(rpcError, "chunk did not end in CR LF");
return RETURN_ERR;
}
rpcLogSrc(7, client->src, "client finished reading chunk", clen);
append = PyString_FromStringAndSize(bp, clen);
if (append == NULL)
return RETURN_ERR;
nbody = *bodyp;
Py_INCREF(nbody);
PyString_Concat(&nbody, append);
Py_DECREF(append);
sp = bp + clen + strlen("\r\n"); /* new chunk start */
nchunk = PyString_FromStringAndSize(sp, ep-sp);
if (nchunk == NULL)
return RETURN_ERR;
*bodyp = nbody;
*chunkp = nchunk;
return RETURN_AGAIN;
}
/*
* returns number of bytes read,
*
* NOTE: that buffpp ownership is given to the caller, and it is it's
* responsiblity to DECREF it.
*/
static bool
nbRead(int fd, PyObject **buffpp, bool *eof)
{
PyObject *buffp;
ulong bytesAv,
olen,
slen;
char *cp;
int res;
*eof = false;
buffp = *buffpp;
assert(PyString_Check(buffp));
olen = PyString_GET_SIZE(buffp);
slen = olen;
bytesAv = slen + READ_SIZE;
cp = alloc(bytesAv);
if (cp == NULL)
return false;
memcpy(cp, PyString_AS_STRING(buffp), slen);
while (true) {
if (slen + READ_SIZE > bytesAv) {
bytesAv = max(bytesAv * 2, slen + READ_SIZE);
cp = ralloc(cp, bytesAv);
if (cp == NULL)
return false;
}
res = read(fd, cp + slen, READ_SIZE);
if (res > 0)
slen += res;
else if (res == 0) {
*eof = true;
break;
} else if (res < 0) {
if (isBlocked(get_errno()))
break;
else { /* bad error */
free(cp);
PyErr_SetFromErrno(rpcError);
return false;
}
}
}
buffp = PyString_FromStringAndSize(cp, slen);
if (buffp == NULL)
return false;
*buffpp = buffp;
free(cp);
return true;
}
/*
* Module procedure: execute a command on a rpc Server
*/
static PyObject *
pyRpcClientExecute(PyObject *self, PyObject *args)
{
char *method;
PyObject *params,
*res;
double timeout;
PyObject *pyName,
*pyPass;
char *name,
*pass;
unless (PyArg_ParseTuple(args, "sOdOO", &method, ¶ms,
&timeout, &pyName, &pyPass))
return NULL;
unless (PySequence_Check(params)) {
PyErr_SetString(rpcError, "execute params must be a sequence");
return NULL;
}
if (PyObject_Compare(pyName, Py_None) == 0)
name = NULL;
else if (PyString_Check(pyName))
name = PyString_AS_STRING(pyName);
else
return setPyErr("name must be a string or None");
if (PyObject_Compare(pyPass, Py_None) == 0)
pass = NULL;
else if (PyString_Check(pyPass))
pass = PyString_AS_STRING(pyPass);
else
return setPyErr("pass must be a string or None");
res = rpcClientExecute((rpcClient *)self, method, params, timeout, name, pass);
return res;
}
/*
* queue up a function for later execution
*/
static PyObject *
pyRpcNbClientExecute(PyObject *self, PyObject *args)
{
rpcClient *cp;
char *method;
PyObject *params,
*extArgs,
*pyfunc;
PyObject *pyName,
*pyPass;
char *name,
*pass;
bool res;
cp = (rpcClient *)self;
unless (PyArg_ParseTuple(args, "sOOOOO", &method, ¶ms,
&pyfunc, &extArgs, &pyName, &pyPass))
return NULL;
unless (PySequence_Check(params)) {
PyErr_SetString(rpcError, "execute params must be a sequence");
return NULL;
}
if (PyObject_Compare(pyName, Py_None) == 0)
name = NULL;
else if (PyString_Check(pyName))
name = PyString_AS_STRING(pyName);
else
return setPyErr("name must be a string or None");
if (PyObject_Compare(pyPass, Py_None) == 0)
pass = NULL;
else if (PyString_Check(pyPass))
pass = PyString_AS_STRING(pyPass);
else
return setPyErr("pass must be a string or None");
extArgs = Py_BuildValue("(O,O)", pyfunc, extArgs);
if (params == NULL)
return NULL;
res = rpcClientNbExecute(cp, method, params, pyClientCallback,
extArgs, name, pass);
Py_DECREF(extArgs);
unless (res)
return NULL;
Py_INCREF(Py_None);
return Py_None;
}
static bool
pyClientCallback(rpcClient *cp, PyObject *resp, PyObject *args)
{
PyObject *params,
*pyfunc,
*extArgs,
*res;
unless (PyArg_ParseTuple(args, "OO:pyClientCallback",
&pyfunc, &extArgs)) {
return false;
}
assert(PyCallable_Check(pyfunc));
params = Py_BuildValue("(O,O,O)", cp, resp, extArgs);
if (params == NULL)
return false;
res = PyObject_CallObject(pyfunc, params);
Py_DECREF(params);
unless (res)
return false;
return true;
}
/*
* Work on a socket for a while
*/
static PyObject *
pyRpcClientWork(PyObject *self, PyObject *args)
{
bool timedOut;
double timeout;
rpcClient *cp;
cp = (rpcClient *)self;
unless ((PyArg_ParseTuple(args, "d", &timeout))
and (rpcDispWork(cp->disp, timeout, &timedOut)))
return NULL;
Py_INCREF(Py_None);
return Py_None;
}
/*
* Set a handler for errors on the client
*/
static PyObject *
pyRpcClientSetOnErr(PyObject *self, PyObject *args)
{
PyObject *func;
rpcClient *cp;
cp = (rpcClient *)self;
unless (PyArg_ParseTuple(args, "O", &func))
return NULL;
unless (PyCallable_Check(func)) {
PyErr_SetString(rpcError, "error handler must be callable");
return NULL;
}
if (PyObject_Compare(func, Py_None))
rpcSourceSetOnErr(cp->src, ONERR_TYPE_PY, func);
else
rpcSourceSetOnErr(cp->src, ONERR_TYPE_DEF, NULL);
Py_INCREF(Py_None);
return Py_None;
}
/*
* Set a handler for errors on the client
*/
static PyObject *
pyRpcClientActiveFds(PyObject *self, PyObject *args)
{
rpcClient *cp;
cp = (rpcClient *)self;
unless (PyArg_ParseTuple(args, ""))
return NULL;
return rpcDispActiveFds(cp->disp);
}
/*
* Close the client fd
*/
static PyObject *
pyRpcClientClose(PyObject *self, PyObject *args)
{
rpcClient *cp;
cp = (rpcClient *)self;
unless (PyArg_ParseTuple(args, ""))
return NULL;
rpcClientClose(cp);
Py_INCREF(Py_None);
return Py_None;
}
/*
* member functions for client object
*/
static PyMethodDef pyRpcClientMethods[] = {
{ "activeFds", (PyCFunction)pyRpcClientActiveFds, 1, 0 },
{ "close", (PyCFunction)pyRpcClientClose, 1, 0 },
{ "execute", (PyCFunction)pyRpcClientExecute, 1, 0 },
{ "nbexecute", (PyCFunction)pyRpcNbClientExecute, 1, 0 },
{ "setOnErr", (PyCFunction)pyRpcClientSetOnErr, 1, 0 },
{ "work", (PyCFunction)pyRpcClientWork, 1, 0 },
{ NULL, NULL},
};
/*
* return an attribute for a client object
*/
static PyObject *
pyRpcClientGetAttr(rpcClient *cp, char *name)
{
return Py_FindMethod(pyRpcClientMethods, (PyObject *)cp, name);
}
/*
* map characterstics of a client object
*/
PyTypeObject rpcClientType = {
PyObject_HEAD_INIT(0)
0,
"rpcClient",
sizeof(rpcClient),
0,
(destructor)rpcClientDealloc, /* tp_dealloc */
0, /* tp_print */
(getattrfunc)pyRpcClientGetAttr, /* tp_getattr */
0, /* tp_setattr */
0, /* tp_compare */
0, /* tp_repr */
0, /* tp_as_number */
0, /* tp_as_sequence */
0, /* tp_as_mapping */
0, /* tp_hash */
0, /* tp_call */
0, /* tp_str */
0, /* tp_getattro */
0, /* tp_setattro */
0, /* tp_as_buffer */
0, /* tp_xxx4 */
0, /* tp_doc */
};
syntax highlighted by Code2HTML, v. 0.9.1