/*
* Copyright (C) 2001, Shilad Sen, Sourcelight Technologies, Inc.
* See xmlrpc.h or the README for more copyright information.
*/
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include "xmlrpc.h"
#include "rpcInternal.h"
#ifdef MSWINDOWS
#include <winsock2.h>
#else
#define CLOSE_ON_EXEC FD_CLOEXEC
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#endif
#define READ_SIZE 4096
static bool serveAccept(
rpcDisp *dp,
rpcSource *sp,
int actions,
PyObject *params
);
static bool serverReadHeader(
rpcDisp *dp,
rpcSource *sp,
int actions,
PyObject *params
);
static bool readRequest(
rpcDisp *dp,
rpcSource *sp,
int actions,
PyObject *params
);
static bool writeResponse(
rpcDisp *dp,
rpcSource *sp,
int actions,
PyObject *params
);
static bool doResponse(
rpcServer *servp,
rpcSource *srcp,
PyObject *result,
bool keepAlive
);
static PyObject *dispatch(
rpcServer *servp,
rpcSource *srcp,
PyObject *request,
bool *keepAlive
);
static bool grabError(
int *faultCode,
char **faultString,
PyObject *exc,
PyObject *v,
PyObject *tb
);
static PyObject *pyRpcServerActiveFds(PyObject *self, PyObject *args);
static PyObject *pyRpcServerAddMethods(PyObject *self, PyObject *args);
static PyObject *pyRpcServerAddSource(PyObject *self, PyObject *args);
static PyObject *pyRpcServerBindAndListen(PyObject *self, PyObject *args);
static PyObject *pyRpcServerClose(PyObject *self, PyObject *args);
static PyObject *pyRpcServerDelSource(PyObject *self, PyObject *args);
static PyObject *pyRpcServerExit(PyObject *self, PyObject *args);
static PyObject *pyRpcServerGetAttr(rpcServer *sp, char *name);
static PyObject *pyRpcServerQueueResponse(PyObject *self, PyObject *args);
static PyObject *pyRpcServerQueueFault(PyObject *self, PyObject *args);
static PyObject *pyRpcServerSetAuth(PyObject *self, PyObject *args);
static PyObject *pyRpcServerSetFdAndListen(PyObject *self, PyObject *args);
static PyObject *pyRpcServerSetOnErr(PyObject *self, PyObject *args);
static PyObject *pyRpcServerWork(PyObject *self, PyObject *args);
static bool nbRead(int fd, PyObject **buffpp, bool *eof);
static bool authenticate(rpcServer *servp, PyObject *addInfo);
rpcServer *
rpcServerNew(void)
{
rpcServer *sp;
sp = PyObject_NEW(rpcServer, &rpcServerType);
if (sp == NULL)
return NULL;
sp->disp = rpcDispNew();
if (sp->disp == NULL)
return NULL;
sp->src = rpcSourceNew(-1);
if (sp->src == NULL)
return NULL;
sp->src->doClose = true;
sp->comtab = PyDict_New();
if (sp->comtab == NULL)
return NULL;
sp->authFunc = NULL;
return sp;
}
void
rpcServerClose(rpcServer *sp)
{
if (sp->src->fd >= 0)
close(sp->src->fd);
sp->src->fd = -1;
rpcDispClear(sp->disp);
}
void
rpcServerDealloc(rpcServer *sp)
{
Py_DECREF(sp->src);
rpcDispDealloc(sp->disp);
}
bool
rpcServerAddCMethod(
rpcServer *servp,
char *method,
PyObject *(*cfunc)(
rpcServer *,
rpcSource *,
char *,
char *,
PyObject *,
PyObject *
)
)
{
PyObject *pyfunc;
pyfunc = PyString_FromStringAndSize((char*)&cfunc, sizeof(cfunc));
if (pyfunc == NULL)
return false;
if (PyDict_SetItemString(servp->comtab, method, pyfunc))
return false;
return true;
}
bool
rpcServerAddPyMethods(rpcServer *sp, PyObject *toAdd)
{
PyObject *items,
*elem,
*method,
*func;
int i;
unless (PyDict_Check(toAdd)) {
PyErr_SetString(rpcError, "addMethods requires dictionary");
return false;
}
items = PyDict_Items(toAdd);
if (items == NULL)
return false;
for (i = 0; i < PyList_GET_SIZE(items); ++i) {
elem = PyList_GET_ITEM(items, i);
assert(PyTuple_Check(elem));
assert(PyTuple_GET_SIZE(elem) == 2);
method = PyTuple_GET_ITEM(elem, 0);
func = PyTuple_GET_ITEM(elem, 1);
unless (PyString_Check(method)) {
PyErr_SetString(rpcError, "method names must be strings");
return false;
}
unless (PyCallable_Check(func)) {
PyErr_SetString(rpcError, "method must be callable");
return false;
}
if (PyDict_SetItem(sp->comtab, method, func))
return false;
}
return true;
}
bool
rpcServerBindAndListen(rpcServer *servp, int port, int queue)
{
int fd,
sflag;
struct sockaddr_in saddr;
#ifdef MSWINDOWS
ulong flag = 1;
#endif /* MSWINDOWS */
#ifdef MSWINDOWS
fd = socket(PF_INET, SOCK_STREAM, 0);
if ((fd == INVALID_SOCKET)
or (ioctlsocket((SOCKET)fd, FIONBIO, &flag) == SOCKET_ERROR)) {
PyErr_SetFromErrno(rpcError);
return false;
}
#else
fd = socket(AF_INET, SOCK_STREAM, 0);
if ((fd < 0)
or (fcntl(fd, F_SETFL, O_NONBLOCK) != 0)
or (fcntl(fd, F_SETFD, CLOSE_ON_EXEC) != 0)) {
PyErr_SetFromErrno(rpcError);
return false;
}
#endif /* MSWINDOWS */
servp->src->fd = fd;
sflag = 1;
fd = servp->src->fd;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR,
(void *)&sflag, sizeof(sflag)) != 0) {
rpcServerClose(servp);
PyErr_SetFromErrno(rpcError);
return false;
}
saddr.sin_family = AF_INET;
saddr.sin_addr.s_addr = htonl(INADDR_ANY);
saddr.sin_port = htons((ushort) port);
if ((bind(fd, (struct sockaddr *)&saddr, sizeof(saddr)) < 0)
or (listen(fd, queue) < 0)) {
PyErr_SetFromErrno(rpcError);
rpcServerClose(servp);
return false;
}
rpcLogSrc(3, servp->src, "server listening on port %d", port);
servp->src->actImp = ACT_INPUT;
servp->src->func = serveAccept;
servp->src->params = (PyObject *)servp;
unless (rpcDispAddSource(servp->disp, servp->src)) {
rpcServerClose(servp);
return false;
}
return true;
}
bool
rpcServerSetFdAndListen(rpcServer *servp, int fd, int queue)
{
#ifdef MSWINDOWS
ulong flag = 1;
unless ((ioctlsocket((SOCKET)fd, FIONBIO, &flag) == 0)
and (listen(fd, queue) >= 0)) {
PyErr_SetFromErrno(rpcError);
return false;
}
#else
unless ((fcntl(fd, F_SETFL, O_NONBLOCK) == 0)
and (fcntl(fd, F_SETFD, CLOSE_ON_EXEC) == 0)
and (listen(fd, queue) >= 0)) {
PyErr_SetFromErrno(rpcError);
return false;
}
#endif /* MSWINDOWS */
rpcLogSrc(3, servp->src, "server listening on fd %d", fd);
/* Py_INCREF(servp); why was this here? */
servp->src->fd = fd;
servp->src->actImp = ACT_INPUT;
servp->src->func = serveAccept;
servp->src->params = (PyObject *)servp;
unless (rpcDispAddSource(servp->disp, servp->src))
return false;
return true;
}
void
rpcServerSetAuth(rpcServer *sp, PyObject *authFunc)
{
if (sp->authFunc) {
Py_DECREF(sp->authFunc);
}
sp->authFunc = authFunc;
if (authFunc)
Py_INCREF(authFunc);
}
static bool
serveAccept(rpcDisp *dp, rpcSource *sp, int actions, PyObject *servp)
{
int len,
res;
uint in;
rpcSource *client;
struct sockaddr_in addr;
#ifdef MSWINDOWS
ulong flag = 1;
#endif /* MSWINDOWS */
len = sizeof(addr);
res = accept(sp->fd, (struct sockaddr *)&addr, &len);
if (res >= 0) {
#ifdef MSWINDOWS
unless (ioctlsocket((SOCKET)res, FIONBIO, &flag) == 0) {
PyErr_SetFromErrno(rpcError);
return false;
}
#else
unless ((fcntl(res, F_SETFL, O_NONBLOCK) == 0)
and (fcntl(res, F_SETFD, CLOSE_ON_EXEC) == 0)) {
PyErr_SetFromErrno(rpcError);
return false;
}
#endif /* MSWINDOWS */
client = rpcSourceNew(res);
if (client == NULL)
return false;
client->doClose = true;
client->desc = alloc(strlen("255.255.255.255:123456") + 1);
if (client->desc == NULL)
return false;
in = ntohl(addr.sin_addr.s_addr);
sprintf(client->desc, "%u.%u.%u.%u:%u",
0xFF & (in>>24), 0xFF & (in>>16),
0xFF & (in>>8), 0xFF & in, ntohs(addr.sin_port));
rpcLogSrc(3, sp, "server got connection from %s", client->desc);
client->actImp = ACT_INPUT;
client->func = serverReadHeader;
client->params = Py_BuildValue("(s,O)", "", servp);
if (client->params == NULL)
return false;
rpcSourceSetOnErr(client, sp->onErrType, sp->onErr);
unless (rpcDispAddSource(dp, client))
return false;
Py_DECREF(client);
} else unless (isBlocked(get_errno())) {
PyErr_SetFromErrno(rpcError);
return false;
} else
fprintf(rpcLogger, "blocked on accept\n");
sp->actImp = ACT_INPUT;
sp->func = serveAccept;
sp->params = servp;
Py_INCREF(sp->params);
unless (rpcDispAddSource(dp, sp))
return false;
return true;
}
static bool
serverReadHeader(rpcDisp *dp, rpcSource *sp, int actions, PyObject *params)
{
PyObject *buff,
*args,
*servp;
bool eof,
res;
long blen;
char *hp, /* start of header */
*bp, /* start of body */
*cp, /* current position */
*ep, /* end of string read */
*lp; /* start of content-length value */
unless (PyArg_ParseTuple(params, "SO:serverReadHeader", &buff, &servp))
return false;
eof = false;
unless (nbRead(sp->fd, &buff, &eof))
return false;
bp = NULL;
lp = NULL;
hp = PyString_AS_STRING(buff);
ep = hp + PyString_GET_SIZE(buff);
rpcLogSrc(7, sp, "server read %d bytes of header",
PyString_GET_SIZE(buff));
for (cp = hp; (bp == NULL) and (cp < ep); ++cp) {
if ((ep - cp > 16)
and (strncasecmp(cp, "Content-length: ", 16) == 0))
lp = cp + 16;
if ((ep - cp > 4)
and (strncmp(cp, "\r\n\r\n", 4) == 0))
bp = cp + 4;
if ((ep - cp > 2)
and (strncmp(cp, "\n\n", 2) == 0))
bp = cp + 2;
}
if (bp == NULL) {
if (eof) {
if (PyString_GET_SIZE(buff) == 0) {
close(sp->fd);
sp->fd = -1;
Py_DECREF(buff);
rpcLogSrc(3, sp, "received EOF");
return true;
}
Py_DECREF(buff);
PyErr_SetString(rpcError, "got EOS while reading");
return false;
}
sp->actImp = ACT_INPUT;
sp->func = serverReadHeader;
sp->params = Py_BuildValue("(O,O)", buff, servp);
Py_DECREF(buff);
if ((sp->params == NULL)
or (not rpcDispAddSource(dp, sp)))
return false;
return true;
}
if (lp == NULL) {
Py_DECREF(buff);
PyErr_SetString(rpcError,
"no Content-length parameter found in header");
return false;
}
unless (decodeActLong(&lp, ep, &blen)) {
Py_DECREF(buff);
PyErr_SetString(rpcError, "invalid Content-length");
return false;
}
rpcLogSrc(7, sp, "server finished reading header");
rpcLogSrc(9, sp, "server content length should be %d", blen);
args = Py_BuildValue("(s#,s#,l,O)", hp, bp-hp, bp, ep-bp, blen, servp);
if (args == NULL)
return false;
res = readRequest(dp, sp, actions, args);
Py_DECREF(args);
Py_DECREF(buff);
return res;
}
static bool
readRequest(rpcDisp *dp, rpcSource *srcp, int actions, PyObject *params)
{
PyObject *head,
*body,
*servp,
*result;
bool eof,
res,
keepAlive;
long blen,
slen;
unless (PyArg_ParseTuple(params, "SSlO:readRequest",
&head, &body, &blen, &servp))
return false;
unless (nbRead(srcp->fd, &body, &eof))
return false;
slen = PyString_GET_SIZE(body);
rpcLogSrc(9, srcp, "server read %d of %d body bytes", slen, blen);
if (slen > blen) {
Py_DECREF(body);
PyErr_SetString(rpcError, "readRequest read too many bytes");
return false;
} else if (blen == slen) {
rpcLogSrc(9, srcp, "server finished reading body");
Py_INCREF(head); /* hack so concat doesn't fail */
PyString_ConcatAndDel(&head, body);
if (head == NULL)
return false;
result = dispatch((rpcServer *)servp, srcp, head, &keepAlive);
res = doResponse((rpcServer *)servp, srcp, result, keepAlive);
Py_DECREF(head);
return res;
} else {
if (eof) {
Py_DECREF(body);
PyErr_SetString(rpcError, "got EOS while reading body");
return false;
}
srcp->actImp = ACT_INPUT;
srcp->func = readRequest;
srcp->params = Py_BuildValue("(S,S,l,O)",
head, body, blen, servp);
Py_DECREF(body);
if (srcp->params == NULL)
return false;
unless (rpcDispAddSource(dp, srcp))
return false;
return true;
}
}
static bool
doResponse(rpcServer *servp, rpcSource *srcp, PyObject *result, bool keepAlive)
{
PyObject *addInfo,
*response,
*strRes,
*params,
*exc,
*v,
*tb;
int faultCode;
char *faultString;
bool res;
addInfo = PyDict_New();
if (addInfo == NULL)
return false;
response = NULL;
if (result == NULL) {
PyErr_Fetch(&exc, &v, &tb);
PyErr_NormalizeException(&exc, &v, &tb);
if (exc == NULL) {
return (false);
} else if (PyErr_GivenExceptionMatches(v, rpcPostpone)) {
rpcLogSrc(7, srcp, "received postpone request");
PyErr_Restore(exc, v, tb);
PyErr_Clear();
Py_DECREF(addInfo);
return (true);
} else if (exc and grabError(&faultCode, &faultString, exc, v, tb)) {
response = buildFault(faultCode, faultString, addInfo);
free(faultString);
} else {
response = buildFault(-1, "Unknown error", addInfo);
}
PyErr_Restore(exc, v, tb);
assert(PyErr_Occurred());
PyErr_Print();
PyErr_Clear();
} else {
response = buildResponse(result, addInfo);
Py_DECREF(result);
}
/* one more attempt at failure recovery */
if (response == NULL)
response = buildFault(-1, "Unknown error", addInfo);
Py_DECREF(addInfo);
if (response == NULL)
return false;
if (rpcLogLevel >= 8) {
strRes = PyObject_Repr(response);
if (strRes == NULL)
return false;
rpcLogSrc(8, srcp, "server responding with %s",
PyString_AS_STRING(strRes));
Py_DECREF(strRes);
}
params = Py_BuildValue("(O,i,O)", response, (int)keepAlive, servp);
Py_DECREF(response);
if (params == NULL)
return false;
res = writeResponse(servp->disp, srcp, ACT_OUTPUT, params);
Py_DECREF(params);
return res;
}
static PyObject *
dispatch(rpcServer *servp, rpcSource *srcp, PyObject *request, bool *keepAlive)
{
PyObject *args,
*method,
*params,
*addInfo,
*tuple,
*pyfunc,
*(*cfunc)(
rpcServer *,
rpcSource *,
char *,
char *,
PyObject *
),
*pyuri,
*result,
*strReq,
*strRes;
char buff[256],
*uri;
if (rpcLogLevel >= 8) {
strReq = PyObject_Repr(request);
if (strReq == NULL)
return NULL;
rpcLogSrc(8, srcp, "server got request %s",
PyString_AS_STRING(strReq));
Py_DECREF(strReq);
}
tuple = parseRequest(request);
if (tuple == NULL)
return NULL;
assert(PyTuple_Check(tuple));
assert(PyTuple_GET_SIZE(tuple) == 3);
method = PyTuple_GET_ITEM(tuple, 0);
params = PyTuple_GET_ITEM(tuple, 1);
addInfo = PyTuple_GET_ITEM(tuple, 2);
assert(PyDict_Check(addInfo));
unless (authenticate(servp, addInfo)) {
Py_DECREF(tuple);
return NULL;
}
*keepAlive = doKeepAliveFromDict(addInfo);
pyuri = PyDict_GetItemString(addInfo, "URI");
assert(pyuri != NULL);
assert(PyString_Check(pyuri));
uri = PyString_AS_STRING(pyuri);
if (rpcLogLevel >= 5) {
strReq = PyObject_Repr(params);
if (strReq == NULL)
return false;
rpcLogSrc(5, srcp, "server got request ('%s', %s)",
PyString_AS_STRING(method), PyString_AS_STRING(strReq));
Py_DECREF(strReq);
} else if (rpcLogLevel >= 3)
rpcLogSrc(3, srcp, "server got request '%s'",
PyString_AS_STRING(method));
assert(PyString_Check(method));
unless (PyMapping_HasKey(servp->comtab, method)) {
snprintf(buff, 255, "unknown command: \'%s\'",
PyString_AS_STRING(method));
Py_DECREF(tuple);
PyErr_SetString(rpcError, buff);
return NULL;
}
pyfunc = PyDict_GetItem(servp->comtab, method);
if (PyCallable_Check(pyfunc)) {
args = Py_BuildValue("(O,O,s,O,O)",
servp, srcp, uri, method, params);
Py_DECREF(tuple);
if (args == NULL)
return NULL;
result = PyObject_CallObject(pyfunc, args);
Py_DECREF(args);
} else if (PyString_Check(pyfunc)) {
assert(PyString_GET_SIZE(pyfunc) == sizeof(cfunc));
memcpy(&cfunc, PyString_AS_STRING(pyfunc), sizeof(cfunc));
result = cfunc(
servp,
srcp,
uri,
PyString_AS_STRING(method),
params
);
Py_DECREF(tuple);
} else {
setPyErr("illegal type for server callback");
return NULL;
}
if (result == NULL)
return NULL;
if (rpcLogLevel >= 5) {
strRes = PyObject_Str(result);
if (strRes == NULL)
return false;
rpcLogSrc(5, srcp, "server responding %s",
PyString_AS_STRING(strRes));
Py_DECREF(strRes);
}
return result;
}
static bool
grabError(
int *faultCode,
char **faultString,
PyObject *exc,
PyObject *v,
PyObject *tb
)
{
PyObject *err1,
*err2,
*delim;
if (PyErr_GivenExceptionMatches(v, rpcFault))
return rpcFault_Extract(v, faultCode, faultString);
err1 = PyObject_Str(exc);
err2 = PyObject_Str(v);
delim = PyString_FromString(": ");
if (err1 == NULL || err2 == NULL || delim == NULL)
return (false);
PyString_Concat(&err1, delim);
if (err1 == NULL)
return (false);
PyString_Concat(&err1, err2);
if (err1 == NULL)
return (false);
*faultString = alloc(PyString_GET_SIZE(err1) + 1);
if (*faultString == NULL)
return (false);
strcpy(*faultString, PyString_AS_STRING(err1));
*faultCode = -1;
Py_DECREF(delim);
Py_DECREF(err1);
Py_DECREF(err2);
return (true);
}
static bool
authenticate(rpcServer *servp, PyObject *addInfo)
{
PyObject *auth,
*encPair,
*decPair,
*name,
*pass,
*pyuri,
*res;
char *bp,
*cp,
*ep,
error[256];
if (servp->authFunc == NULL)
return true;
pyuri = PyDict_GetItemString(addInfo, "URI");
assert(pyuri != NULL);
assert(PyString_Check(pyuri));
auth = PyDict_GetItemString(addInfo, "Authorization");
if (auth == NULL) {
name = Py_None;
pass = Py_None;
Py_INCREF(Py_None);
Py_INCREF(Py_None);
} else {
assert(PyString_Check(auth));
if (strncasecmp("Basic ", PyString_AS_STRING(auth), 6)) {
setPyErr("unsupported authentication method");
return false;
}
encPair = PyString_FromString(PyString_AS_STRING(auth) + 6);
if (encPair == NULL)
return false;
decPair = rpcBase64Decode(encPair);
Py_DECREF(encPair);
if (decPair == NULL)
return false;
bp = PyString_AS_STRING(decPair);
ep = bp + PyString_GET_SIZE(decPair);
cp = strchr((const char *)bp, ':');
if (cp == NULL) {
setPyErr("illegal authentication string");
fprintf(rpcLogger, "illegal authentication is '%s'\n", bp);
return false;
}
name = PyString_FromStringAndSize(bp, cp-bp);
pass = PyString_FromStringAndSize(cp+1, ep-(cp+1));
if (name == NULL || pass == NULL)
return false;
Py_DECREF(decPair);
}
res = PyObject_CallFunction(servp->authFunc, "(O,O,O)",
pyuri, name, pass);
Py_DECREF(name);
Py_DECREF(pass);
if (res == NULL)
return false;
unless ((PyTuple_Check(res))
and (PyTuple_GET_SIZE(res) == 2)
and (PyInt_Check(PyTuple_GET_ITEM(res, 0)))
and (PyString_Check(PyTuple_GET_ITEM(res, 1)))) {
fprintf(rpcLogger, "authentication function returned ");
PyObject_Print(res, rpcLogger, 0);
Py_DECREF(res);
fprintf(rpcLogger, "; defaulting to (0, 'unknown')\n");
setPyErr("authentication failed for domain 'unknown'");
return false;
}
if (PyInt_AsLong(PyTuple_GET_ITEM(res, 0))) {
Py_DECREF(res);
return true;
}
memset(error, 0, sizeof(error));
snprintf(error, sizeof(error)-1, "authentication failed for domain '%s'",
PyString_AS_STRING(PyTuple_GET_ITEM(res, 1)));
setPyErr(error);
Py_DECREF(res);
return false;
}
static bool
writeResponse(rpcDisp *dp, rpcSource *srcp, int actions, PyObject *params)
{
rpcServer *servp;
PyObject *toWrite;
int keepAlive,
nb,
slen;
unless (PyArg_ParseTuple(params, "SiO:writeResponse",
&toWrite, &keepAlive, &servp))
return false;
slen = PyString_GET_SIZE(toWrite);
nb = write(srcp->fd, PyString_AS_STRING(toWrite), slen);
rpcLogSrc(9, srcp, "server wrote %d of %d bytes", nb, slen);
if (nb < 0 and isBlocked(get_errno()))
nb = 0;
if (nb < 0) {
PyErr_SetFromErrno(rpcError);
return false;
} else if (nb == slen) {
rpcLogSrc(9, srcp, "server finished writing response");
srcp->actImp = ACT_INPUT;
srcp->func = serverReadHeader;
srcp->params = Py_BuildValue("(s,O)", "", servp);
if (srcp->params == NULL)
return false;
if (keepAlive) {
unless (rpcDispAddSource(dp, srcp))
return false;
} else {
close(srcp->fd);
srcp->fd = -1;
}
return true;
} else {
assert(slen > nb);
toWrite = PyString_FromStringAndSize(
PyString_AS_STRING(toWrite) + nb, slen - nb);
if (toWrite == NULL)
return false;
srcp->actImp = ACT_OUTPUT;
srcp->func = writeResponse;
srcp->params = Py_BuildValue("(O,i,O)",
toWrite, keepAlive, servp);
Py_DECREF(toWrite);
if (srcp->params == NULL)
return false;
unless (rpcDispAddSource(dp, srcp))
return false;
return true;
}
}
/*
* Register some commands with an rpc server
*/
static PyObject *
pyRpcServerAddMethods(PyObject *self, PyObject *args)
{
rpcServer *sp;
PyObject *toAdd;
sp = (rpcServer *)self;
unless ((PyArg_ParseTuple(args, "O", &toAdd)
and (rpcServerAddPyMethods(sp, toAdd))))
return NULL;
Py_INCREF(Py_None);
return Py_None;
}
/*
* Bind an rpc server to a port, and start it listening
*/
static PyObject *
pyRpcServerBindAndListen(PyObject *self, PyObject *args)
{
rpcServer *sp;
int port,
queue;
sp = (rpcServer *)self;
unless ((PyArg_ParseTuple(args, "ii", &port, &queue)
and (rpcServerBindAndListen(sp, port, queue))))
return NULL;
Py_INCREF(Py_None);
return Py_None;
}
/*
* Bind an rpc server to a port, and start it listening
*/
static PyObject *
pyRpcServerSetFdAndListen(PyObject *self, PyObject *args)
{
rpcServer *sp;
int fd,
queue;
sp = (rpcServer *)self;
unless ((PyArg_ParseTuple(args, "ii", &fd, &queue)
and (rpcServerSetFdAndListen(sp, fd, queue))))
return NULL;
Py_INCREF(Py_None);
return Py_None;
}
/*
* Bind an rpc server to a port, and start it listening
*/
static PyObject *
pyRpcServerWork(PyObject *self, PyObject *args)
{
bool timedOut;
double timeout;
rpcServer *sp;
sp = (rpcServer *)self;
unless ((PyArg_ParseTuple(args, "d", &timeout))
and (rpcDispWork(sp->disp, timeout, &timedOut)))
return NULL;
Py_INCREF(Py_None);
return Py_None;
}
/*
* Bind an rpc server to a port, and start it listening
*/
static PyObject *
pyRpcServerActiveFds(PyObject *self, PyObject *args)
{
rpcServer *sp;
sp = (rpcServer *)self;
unless (PyArg_ParseTuple(args, ""))
return NULL;
return rpcDispActiveFds(sp->disp);
}
/*
* Bind an rpc server to a port, and start it listening
*/
static PyObject *
pyRpcServerClose(PyObject *self, PyObject *args)
{
rpcServer *sp;
sp = (rpcServer *)self;
unless (PyArg_ParseTuple(args, ""))
return NULL;
rpcServerClose(sp);
Py_INCREF(Py_None);
return Py_None;
}
/*
* Set a handler for errors on the client
*/
static PyObject *
pyRpcServerSetOnErr(PyObject *self, PyObject *args)
{
PyObject *func;
rpcServer *servp;
servp = (rpcServer *)self;
unless (PyArg_ParseTuple(args, "O", &func))
return NULL;
unless (PyCallable_Check(func)) {
PyErr_SetString(rpcError, "error handler must be callable");
return NULL;
}
if (PyObject_Compare(func, Py_None))
rpcSourceSetOnErr(servp->src, ONERR_TYPE_PY, func);
else
rpcSourceSetOnErr(servp->src, ONERR_TYPE_DEF, NULL);
Py_INCREF(Py_None);
return Py_None;
}
/*
* returns number of bytes read,
*/
static bool
nbRead(int fd, PyObject **buffpp, bool *eof)
{
PyObject *buffp;
long bytesAv,
olen,
slen;
char *cp;
int res;
*eof = false;
buffp = *buffpp;
assert(PyString_Check(buffp));
olen = PyString_GET_SIZE(buffp);
slen = olen;
bytesAv = slen + READ_SIZE;
cp = alloc(bytesAv);
if (cp == NULL)
return false;
memcpy(cp, PyString_AS_STRING(buffp), slen);
while (true) {
if (slen + READ_SIZE > bytesAv) {
bytesAv = max(bytesAv * 2, slen + READ_SIZE);
cp = ralloc(cp, bytesAv);
if (cp == NULL)
return false;
}
res = read(fd, cp + slen, READ_SIZE);
if (res > 0)
slen += res;
else if (res == 0) {
*eof = true;
break;
} else if (res < 0) {
if (isBlocked(get_errno()))
break;
else { /* bad error */
PyErr_SetFromErrno(rpcError);
return false;
}
}
}
buffp = PyString_FromStringAndSize(cp, slen);
if (buffp == NULL)
return false;
*buffpp = buffp;
free(cp);
return true;
}
/*
* Tell an rpc server to exit the "work routine" asap
*/
static PyObject *
pyRpcServerExit(PyObject *self, PyObject *args)
{
rpcServer *servp;
servp = (rpcServer *)self;
servp->disp->etime = 0.0;
Py_INCREF(Py_None);
return Py_None;
}
/*
* Tell an rpc server to exit the "work routine" asap
*/
static PyObject *
pyRpcServerSetAuth(PyObject *self, PyObject *args)
{
rpcServer *servp;
PyObject *authFunc;
servp = (rpcServer *)self;
unless (PyArg_ParseTuple(args, "O", &authFunc))
return NULL;
rpcServerSetAuth(servp, authFunc);
Py_INCREF(Py_None);
return Py_None;
}
/*
* Tell an rpc server to exit the "work routine" asap
*/
static PyObject *
pyRpcServerAddSource(PyObject *self, PyObject *args)
{
rpcServer *servp;
rpcSource *srcp;
servp = (rpcServer *)self;
unless (PyArg_ParseTuple(args, "O!", &rpcSourceType, &srcp))
return NULL;
if (srcp->func == NULL)
return setPyErr("callback function was NULL");
if (srcp->actImp == 0)
return setPyErr("no callback actions to observe");
if (srcp->params == NULL)
return setPyErr("callback params was NULL");
unless (PyTuple_Check(srcp->params))
return setPyErr("callback params was not a tuple");
unless (PyTuple_GET_SIZE(srcp->params) == 2)
return setPyErr("callback params was not a 2 length tuple");
unless (PyCallable_Check(PyTuple_GET_ITEM(srcp->params, 0)))
return setPyErr("callback params 1 was not callable");
unless (rpcDispAddSource(servp->disp, srcp))
return NULL;
Py_INCREF(Py_None);
return Py_None;
}
/*
* Tell an rpc server to exit the "work routine" asap
*/
static PyObject *
pyRpcServerDelSource(PyObject *self, PyObject *args)
{
rpcServer *servp;
rpcSource *srcp;
servp = (rpcServer *)self;
unless (PyArg_ParseTuple(args, "O!", &rpcSourceType, &srcp))
return NULL;
unless (rpcDispDelSource(servp->disp, srcp))
return NULL;
Py_INCREF(Py_None);
return Py_None;
}
static PyObject *
pyRpcServerQueueResponse(PyObject *self, PyObject *args)
{
rpcServer *servp;
rpcSource *srcp;
PyObject *result;
servp = (rpcServer *)self;
unless (PyArg_ParseTuple(args, "O!O", &rpcSourceType, &srcp, &result))
return (NULL);
if (doResponse(servp, srcp, result, true)) {
Py_INCREF(Py_None);
return (Py_None);
} else
return (NULL);
}
static PyObject *
pyRpcServerQueueFault(PyObject *self, PyObject *args)
{
rpcServer *servp;
rpcSource *srcp;
PyObject *faultCode,
*faultString;
servp = (rpcServer *)self;
unless (PyArg_ParseTuple(args, "O!OS", &rpcSourceType, &srcp,
&faultCode, &faultString))
return (NULL);
unless (PyInt_Check(faultCode)) {
PyErr_SetString(rpcError, "errorCode must be an integer");
return NULL;
}
rpcFaultRaise(faultCode, faultString);
Py_INCREF(Py_None);
return (Py_None);
}
/*
* member functions for server object
*/
static PyMethodDef pyRpcServerMethods[] = {
{ "activeFds", (PyCFunction)pyRpcServerActiveFds, 1, 0 },
{ "addMethods", (PyCFunction)pyRpcServerAddMethods, 1, 0 },
{ "bindAndListen", (PyCFunction)pyRpcServerBindAndListen, 1, 0 },
{ "close", (PyCFunction)pyRpcServerClose, 1, 0 },
{ "setFdAndListen", (PyCFunction)pyRpcServerSetFdAndListen, 1, 0 },
{ "work", (PyCFunction)pyRpcServerWork, 1, 0 },
{ "exit", (PyCFunction)pyRpcServerExit, 1, 0 },
{ "addSource", (PyCFunction)pyRpcServerAddSource, 1, 0 },
{ "delSource", (PyCFunction)pyRpcServerDelSource, 1, 0 },
{ "setAuth", (PyCFunction)pyRpcServerSetAuth, 1, 0 },
{ "setOnErr", (PyCFunction)pyRpcServerSetOnErr, 1, 0 },
{ "queueFault", (PyCFunction)pyRpcServerQueueFault, 1, 0 },
{ "queueResponse", (PyCFunction)pyRpcServerQueueResponse, 1, 0 },
{ NULL, NULL},
};
/*
* return an attribute for a server object
*/
static PyObject *
pyRpcServerGetAttr(rpcServer *sp, char *name)
{
return Py_FindMethod(pyRpcServerMethods, (PyObject *)sp, name);
}
/*
* map characterstics of an edb object
*/
PyTypeObject rpcServerType = {
PyObject_HEAD_INIT(0)
0,
"rpcServer",
sizeof(rpcServer),
0,
(destructor)rpcServerDealloc, /* tp_dealloc */
0, /* tp_print */
(getattrfunc)pyRpcServerGetAttr, /* tp_getattr */
0, /* tp_setattr */
0, /* tp_compare */
0, /* tp_repr */
0, /* tp_as_number */
0, /* tp_as_sequence */
0, /* tp_as_mapping */
0, /* tp_hash */
0, /* tp_call */
0, /* tp_str */
0, /* tp_getattro */
0, /* tp_setattro */
0, /* tp_as_buffer */
0, /* tp_xxx4 */
0, /* tp_doc */
};
syntax highlighted by Code2HTML, v. 0.9.1