/* * Copyright (C) 2001, Shilad Sen, Sourcelight Technologies, Inc. * See xmlrpc.h or the README for more copyright information. */ #include #include #include #include #include "xmlrpc.h" #include "rpcInternal.h" #ifdef MSWINDOWS #include #else #define CLOSE_ON_EXEC FD_CLOEXEC #include #include #include #endif #define READ_SIZE 4096 static bool serveAccept( rpcDisp *dp, rpcSource *sp, int actions, PyObject *params ); static bool serverReadHeader( rpcDisp *dp, rpcSource *sp, int actions, PyObject *params ); static bool readRequest( rpcDisp *dp, rpcSource *sp, int actions, PyObject *params ); static bool writeResponse( rpcDisp *dp, rpcSource *sp, int actions, PyObject *params ); static bool doResponse( rpcServer *servp, rpcSource *srcp, PyObject *result, bool keepAlive ); static PyObject *dispatch( rpcServer *servp, rpcSource *srcp, PyObject *request, bool *keepAlive ); static bool grabError( int *faultCode, char **faultString, PyObject *exc, PyObject *v, PyObject *tb ); static PyObject *pyRpcServerActiveFds(PyObject *self, PyObject *args); static PyObject *pyRpcServerAddMethods(PyObject *self, PyObject *args); static PyObject *pyRpcServerAddSource(PyObject *self, PyObject *args); static PyObject *pyRpcServerBindAndListen(PyObject *self, PyObject *args); static PyObject *pyRpcServerClose(PyObject *self, PyObject *args); static PyObject *pyRpcServerDelSource(PyObject *self, PyObject *args); static PyObject *pyRpcServerExit(PyObject *self, PyObject *args); static PyObject *pyRpcServerGetAttr(rpcServer *sp, char *name); static PyObject *pyRpcServerQueueResponse(PyObject *self, PyObject *args); static PyObject *pyRpcServerQueueFault(PyObject *self, PyObject *args); static PyObject *pyRpcServerSetAuth(PyObject *self, PyObject *args); static PyObject *pyRpcServerSetFdAndListen(PyObject *self, PyObject *args); static PyObject *pyRpcServerSetOnErr(PyObject *self, PyObject *args); static PyObject *pyRpcServerWork(PyObject *self, PyObject *args); static bool nbRead(int fd, PyObject **buffpp, bool *eof); static bool authenticate(rpcServer *servp, PyObject *addInfo); rpcServer * rpcServerNew(void) { rpcServer *sp; sp = PyObject_NEW(rpcServer, &rpcServerType); if (sp == NULL) return NULL; sp->disp = rpcDispNew(); if (sp->disp == NULL) return NULL; sp->src = rpcSourceNew(-1); if (sp->src == NULL) return NULL; sp->src->doClose = true; sp->comtab = PyDict_New(); if (sp->comtab == NULL) return NULL; sp->authFunc = NULL; return sp; } void rpcServerClose(rpcServer *sp) { if (sp->src->fd >= 0) close(sp->src->fd); sp->src->fd = -1; rpcDispClear(sp->disp); } void rpcServerDealloc(rpcServer *sp) { Py_DECREF(sp->src); rpcDispDealloc(sp->disp); } bool rpcServerAddCMethod( rpcServer *servp, char *method, PyObject *(*cfunc)( rpcServer *, rpcSource *, char *, char *, PyObject *, PyObject * ) ) { PyObject *pyfunc; pyfunc = PyString_FromStringAndSize((char*)&cfunc, sizeof(cfunc)); if (pyfunc == NULL) return false; if (PyDict_SetItemString(servp->comtab, method, pyfunc)) return false; return true; } bool rpcServerAddPyMethods(rpcServer *sp, PyObject *toAdd) { PyObject *items, *elem, *method, *func; int i; unless (PyDict_Check(toAdd)) { PyErr_SetString(rpcError, "addMethods requires dictionary"); return false; } items = PyDict_Items(toAdd); if (items == NULL) return false; for (i = 0; i < PyList_GET_SIZE(items); ++i) { elem = PyList_GET_ITEM(items, i); assert(PyTuple_Check(elem)); assert(PyTuple_GET_SIZE(elem) == 2); method = PyTuple_GET_ITEM(elem, 0); func = PyTuple_GET_ITEM(elem, 1); unless (PyString_Check(method)) { PyErr_SetString(rpcError, "method names must be strings"); return false; } unless (PyCallable_Check(func)) { PyErr_SetString(rpcError, "method must be callable"); return false; } if (PyDict_SetItem(sp->comtab, method, func)) return false; } return true; } bool rpcServerBindAndListen(rpcServer *servp, int port, int queue) { int fd, sflag; struct sockaddr_in saddr; #ifdef MSWINDOWS ulong flag = 1; #endif /* MSWINDOWS */ #ifdef MSWINDOWS fd = socket(PF_INET, SOCK_STREAM, 0); if ((fd == INVALID_SOCKET) or (ioctlsocket((SOCKET)fd, FIONBIO, &flag) == SOCKET_ERROR)) { PyErr_SetFromErrno(rpcError); return false; } #else fd = socket(AF_INET, SOCK_STREAM, 0); if ((fd < 0) or (fcntl(fd, F_SETFL, O_NONBLOCK) != 0) or (fcntl(fd, F_SETFD, CLOSE_ON_EXEC) != 0)) { PyErr_SetFromErrno(rpcError); return false; } #endif /* MSWINDOWS */ servp->src->fd = fd; sflag = 1; fd = servp->src->fd; if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&sflag, sizeof(sflag)) != 0) { rpcServerClose(servp); PyErr_SetFromErrno(rpcError); return false; } saddr.sin_family = AF_INET; saddr.sin_addr.s_addr = htonl(INADDR_ANY); saddr.sin_port = htons((ushort) port); if ((bind(fd, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) or (listen(fd, queue) < 0)) { PyErr_SetFromErrno(rpcError); rpcServerClose(servp); return false; } rpcLogSrc(3, servp->src, "server listening on port %d", port); servp->src->actImp = ACT_INPUT; servp->src->func = serveAccept; servp->src->params = (PyObject *)servp; unless (rpcDispAddSource(servp->disp, servp->src)) { rpcServerClose(servp); return false; } return true; } bool rpcServerSetFdAndListen(rpcServer *servp, int fd, int queue) { #ifdef MSWINDOWS ulong flag = 1; unless ((ioctlsocket((SOCKET)fd, FIONBIO, &flag) == 0) and (listen(fd, queue) >= 0)) { PyErr_SetFromErrno(rpcError); return false; } #else unless ((fcntl(fd, F_SETFL, O_NONBLOCK) == 0) and (fcntl(fd, F_SETFD, CLOSE_ON_EXEC) == 0) and (listen(fd, queue) >= 0)) { PyErr_SetFromErrno(rpcError); return false; } #endif /* MSWINDOWS */ rpcLogSrc(3, servp->src, "server listening on fd %d", fd); /* Py_INCREF(servp); why was this here? */ servp->src->fd = fd; servp->src->actImp = ACT_INPUT; servp->src->func = serveAccept; servp->src->params = (PyObject *)servp; unless (rpcDispAddSource(servp->disp, servp->src)) return false; return true; } void rpcServerSetAuth(rpcServer *sp, PyObject *authFunc) { if (sp->authFunc) { Py_DECREF(sp->authFunc); } sp->authFunc = authFunc; if (authFunc) Py_INCREF(authFunc); } static bool serveAccept(rpcDisp *dp, rpcSource *sp, int actions, PyObject *servp) { int len, res; uint in; rpcSource *client; struct sockaddr_in addr; #ifdef MSWINDOWS ulong flag = 1; #endif /* MSWINDOWS */ len = sizeof(addr); res = accept(sp->fd, (struct sockaddr *)&addr, &len); if (res >= 0) { #ifdef MSWINDOWS unless (ioctlsocket((SOCKET)res, FIONBIO, &flag) == 0) { PyErr_SetFromErrno(rpcError); return false; } #else unless ((fcntl(res, F_SETFL, O_NONBLOCK) == 0) and (fcntl(res, F_SETFD, CLOSE_ON_EXEC) == 0)) { PyErr_SetFromErrno(rpcError); return false; } #endif /* MSWINDOWS */ client = rpcSourceNew(res); if (client == NULL) return false; client->doClose = true; client->desc = alloc(strlen("255.255.255.255:123456") + 1); if (client->desc == NULL) return false; in = ntohl(addr.sin_addr.s_addr); sprintf(client->desc, "%u.%u.%u.%u:%u", 0xFF & (in>>24), 0xFF & (in>>16), 0xFF & (in>>8), 0xFF & in, ntohs(addr.sin_port)); rpcLogSrc(3, sp, "server got connection from %s", client->desc); client->actImp = ACT_INPUT; client->func = serverReadHeader; client->params = Py_BuildValue("(s,O)", "", servp); if (client->params == NULL) return false; rpcSourceSetOnErr(client, sp->onErrType, sp->onErr); unless (rpcDispAddSource(dp, client)) return false; Py_DECREF(client); } else unless (isBlocked(get_errno())) { PyErr_SetFromErrno(rpcError); return false; } else fprintf(rpcLogger, "blocked on accept\n"); sp->actImp = ACT_INPUT; sp->func = serveAccept; sp->params = servp; Py_INCREF(sp->params); unless (rpcDispAddSource(dp, sp)) return false; return true; } static bool serverReadHeader(rpcDisp *dp, rpcSource *sp, int actions, PyObject *params) { PyObject *buff, *args, *servp; bool eof, res; long blen; char *hp, /* start of header */ *bp, /* start of body */ *cp, /* current position */ *ep, /* end of string read */ *lp; /* start of content-length value */ unless (PyArg_ParseTuple(params, "SO:serverReadHeader", &buff, &servp)) return false; eof = false; unless (nbRead(sp->fd, &buff, &eof)) return false; bp = NULL; lp = NULL; hp = PyString_AS_STRING(buff); ep = hp + PyString_GET_SIZE(buff); rpcLogSrc(7, sp, "server read %d bytes of header", PyString_GET_SIZE(buff)); for (cp = hp; (bp == NULL) and (cp < ep); ++cp) { if ((ep - cp > 16) and (strncasecmp(cp, "Content-length: ", 16) == 0)) lp = cp + 16; if ((ep - cp > 4) and (strncmp(cp, "\r\n\r\n", 4) == 0)) bp = cp + 4; if ((ep - cp > 2) and (strncmp(cp, "\n\n", 2) == 0)) bp = cp + 2; } if (bp == NULL) { if (eof) { if (PyString_GET_SIZE(buff) == 0) { close(sp->fd); sp->fd = -1; Py_DECREF(buff); rpcLogSrc(3, sp, "received EOF"); return true; } Py_DECREF(buff); PyErr_SetString(rpcError, "got EOS while reading"); return false; } sp->actImp = ACT_INPUT; sp->func = serverReadHeader; sp->params = Py_BuildValue("(O,O)", buff, servp); Py_DECREF(buff); if ((sp->params == NULL) or (not rpcDispAddSource(dp, sp))) return false; return true; } if (lp == NULL) { Py_DECREF(buff); PyErr_SetString(rpcError, "no Content-length parameter found in header"); return false; } unless (decodeActLong(&lp, ep, &blen)) { Py_DECREF(buff); PyErr_SetString(rpcError, "invalid Content-length"); return false; } rpcLogSrc(7, sp, "server finished reading header"); rpcLogSrc(9, sp, "server content length should be %d", blen); args = Py_BuildValue("(s#,s#,l,O)", hp, bp-hp, bp, ep-bp, blen, servp); if (args == NULL) return false; res = readRequest(dp, sp, actions, args); Py_DECREF(args); Py_DECREF(buff); return res; } static bool readRequest(rpcDisp *dp, rpcSource *srcp, int actions, PyObject *params) { PyObject *head, *body, *servp, *result; bool eof, res, keepAlive; long blen, slen; unless (PyArg_ParseTuple(params, "SSlO:readRequest", &head, &body, &blen, &servp)) return false; unless (nbRead(srcp->fd, &body, &eof)) return false; slen = PyString_GET_SIZE(body); rpcLogSrc(9, srcp, "server read %d of %d body bytes", slen, blen); if (slen > blen) { Py_DECREF(body); PyErr_SetString(rpcError, "readRequest read too many bytes"); return false; } else if (blen == slen) { rpcLogSrc(9, srcp, "server finished reading body"); Py_INCREF(head); /* hack so concat doesn't fail */ PyString_ConcatAndDel(&head, body); if (head == NULL) return false; result = dispatch((rpcServer *)servp, srcp, head, &keepAlive); res = doResponse((rpcServer *)servp, srcp, result, keepAlive); Py_DECREF(head); return res; } else { if (eof) { Py_DECREF(body); PyErr_SetString(rpcError, "got EOS while reading body"); return false; } srcp->actImp = ACT_INPUT; srcp->func = readRequest; srcp->params = Py_BuildValue("(S,S,l,O)", head, body, blen, servp); Py_DECREF(body); if (srcp->params == NULL) return false; unless (rpcDispAddSource(dp, srcp)) return false; return true; } } static bool doResponse(rpcServer *servp, rpcSource *srcp, PyObject *result, bool keepAlive) { PyObject *addInfo, *response, *strRes, *params, *exc, *v, *tb; int faultCode; char *faultString; bool res; addInfo = PyDict_New(); if (addInfo == NULL) return false; response = NULL; if (result == NULL) { PyErr_Fetch(&exc, &v, &tb); PyErr_NormalizeException(&exc, &v, &tb); if (exc == NULL) { return (false); } else if (PyErr_GivenExceptionMatches(v, rpcPostpone)) { rpcLogSrc(7, srcp, "received postpone request"); PyErr_Restore(exc, v, tb); PyErr_Clear(); Py_DECREF(addInfo); return (true); } else if (exc and grabError(&faultCode, &faultString, exc, v, tb)) { response = buildFault(faultCode, faultString, addInfo); free(faultString); } else { response = buildFault(-1, "Unknown error", addInfo); } PyErr_Restore(exc, v, tb); assert(PyErr_Occurred()); PyErr_Print(); PyErr_Clear(); } else { response = buildResponse(result, addInfo); Py_DECREF(result); } /* one more attempt at failure recovery */ if (response == NULL) response = buildFault(-1, "Unknown error", addInfo); Py_DECREF(addInfo); if (response == NULL) return false; if (rpcLogLevel >= 8) { strRes = PyObject_Repr(response); if (strRes == NULL) return false; rpcLogSrc(8, srcp, "server responding with %s", PyString_AS_STRING(strRes)); Py_DECREF(strRes); } params = Py_BuildValue("(O,i,O)", response, (int)keepAlive, servp); Py_DECREF(response); if (params == NULL) return false; res = writeResponse(servp->disp, srcp, ACT_OUTPUT, params); Py_DECREF(params); return res; } static PyObject * dispatch(rpcServer *servp, rpcSource *srcp, PyObject *request, bool *keepAlive) { PyObject *args, *method, *params, *addInfo, *tuple, *pyfunc, *(*cfunc)( rpcServer *, rpcSource *, char *, char *, PyObject * ), *pyuri, *result, *strReq, *strRes; char buff[256], *uri; if (rpcLogLevel >= 8) { strReq = PyObject_Repr(request); if (strReq == NULL) return NULL; rpcLogSrc(8, srcp, "server got request %s", PyString_AS_STRING(strReq)); Py_DECREF(strReq); } tuple = parseRequest(request); if (tuple == NULL) return NULL; assert(PyTuple_Check(tuple)); assert(PyTuple_GET_SIZE(tuple) == 3); method = PyTuple_GET_ITEM(tuple, 0); params = PyTuple_GET_ITEM(tuple, 1); addInfo = PyTuple_GET_ITEM(tuple, 2); assert(PyDict_Check(addInfo)); unless (authenticate(servp, addInfo)) { Py_DECREF(tuple); return NULL; } *keepAlive = doKeepAliveFromDict(addInfo); pyuri = PyDict_GetItemString(addInfo, "URI"); assert(pyuri != NULL); assert(PyString_Check(pyuri)); uri = PyString_AS_STRING(pyuri); if (rpcLogLevel >= 5) { strReq = PyObject_Repr(params); if (strReq == NULL) return false; rpcLogSrc(5, srcp, "server got request ('%s', %s)", PyString_AS_STRING(method), PyString_AS_STRING(strReq)); Py_DECREF(strReq); } else if (rpcLogLevel >= 3) rpcLogSrc(3, srcp, "server got request '%s'", PyString_AS_STRING(method)); assert(PyString_Check(method)); unless (PyMapping_HasKey(servp->comtab, method)) { snprintf(buff, 255, "unknown command: \'%s\'", PyString_AS_STRING(method)); Py_DECREF(tuple); PyErr_SetString(rpcError, buff); return NULL; } pyfunc = PyDict_GetItem(servp->comtab, method); if (PyCallable_Check(pyfunc)) { args = Py_BuildValue("(O,O,s,O,O)", servp, srcp, uri, method, params); Py_DECREF(tuple); if (args == NULL) return NULL; result = PyObject_CallObject(pyfunc, args); Py_DECREF(args); } else if (PyString_Check(pyfunc)) { assert(PyString_GET_SIZE(pyfunc) == sizeof(cfunc)); memcpy(&cfunc, PyString_AS_STRING(pyfunc), sizeof(cfunc)); result = cfunc( servp, srcp, uri, PyString_AS_STRING(method), params ); Py_DECREF(tuple); } else { setPyErr("illegal type for server callback"); return NULL; } if (result == NULL) return NULL; if (rpcLogLevel >= 5) { strRes = PyObject_Str(result); if (strRes == NULL) return false; rpcLogSrc(5, srcp, "server responding %s", PyString_AS_STRING(strRes)); Py_DECREF(strRes); } return result; } static bool grabError( int *faultCode, char **faultString, PyObject *exc, PyObject *v, PyObject *tb ) { PyObject *err1, *err2, *delim; if (PyErr_GivenExceptionMatches(v, rpcFault)) return rpcFault_Extract(v, faultCode, faultString); err1 = PyObject_Str(exc); err2 = PyObject_Str(v); delim = PyString_FromString(": "); if (err1 == NULL || err2 == NULL || delim == NULL) return (false); PyString_Concat(&err1, delim); if (err1 == NULL) return (false); PyString_Concat(&err1, err2); if (err1 == NULL) return (false); *faultString = alloc(PyString_GET_SIZE(err1) + 1); if (*faultString == NULL) return (false); strcpy(*faultString, PyString_AS_STRING(err1)); *faultCode = -1; Py_DECREF(delim); Py_DECREF(err1); Py_DECREF(err2); return (true); } static bool authenticate(rpcServer *servp, PyObject *addInfo) { PyObject *auth, *encPair, *decPair, *name, *pass, *pyuri, *res; char *bp, *cp, *ep, error[256]; if (servp->authFunc == NULL) return true; pyuri = PyDict_GetItemString(addInfo, "URI"); assert(pyuri != NULL); assert(PyString_Check(pyuri)); auth = PyDict_GetItemString(addInfo, "Authorization"); if (auth == NULL) { name = Py_None; pass = Py_None; Py_INCREF(Py_None); Py_INCREF(Py_None); } else { assert(PyString_Check(auth)); if (strncasecmp("Basic ", PyString_AS_STRING(auth), 6)) { setPyErr("unsupported authentication method"); return false; } encPair = PyString_FromString(PyString_AS_STRING(auth) + 6); if (encPair == NULL) return false; decPair = rpcBase64Decode(encPair); Py_DECREF(encPair); if (decPair == NULL) return false; bp = PyString_AS_STRING(decPair); ep = bp + PyString_GET_SIZE(decPair); cp = strchr((const char *)bp, ':'); if (cp == NULL) { setPyErr("illegal authentication string"); fprintf(rpcLogger, "illegal authentication is '%s'\n", bp); return false; } name = PyString_FromStringAndSize(bp, cp-bp); pass = PyString_FromStringAndSize(cp+1, ep-(cp+1)); if (name == NULL || pass == NULL) return false; Py_DECREF(decPair); } res = PyObject_CallFunction(servp->authFunc, "(O,O,O)", pyuri, name, pass); Py_DECREF(name); Py_DECREF(pass); if (res == NULL) return false; unless ((PyTuple_Check(res)) and (PyTuple_GET_SIZE(res) == 2) and (PyInt_Check(PyTuple_GET_ITEM(res, 0))) and (PyString_Check(PyTuple_GET_ITEM(res, 1)))) { fprintf(rpcLogger, "authentication function returned "); PyObject_Print(res, rpcLogger, 0); Py_DECREF(res); fprintf(rpcLogger, "; defaulting to (0, 'unknown')\n"); setPyErr("authentication failed for domain 'unknown'"); return false; } if (PyInt_AsLong(PyTuple_GET_ITEM(res, 0))) { Py_DECREF(res); return true; } memset(error, 0, sizeof(error)); snprintf(error, sizeof(error)-1, "authentication failed for domain '%s'", PyString_AS_STRING(PyTuple_GET_ITEM(res, 1))); setPyErr(error); Py_DECREF(res); return false; } static bool writeResponse(rpcDisp *dp, rpcSource *srcp, int actions, PyObject *params) { rpcServer *servp; PyObject *toWrite; int keepAlive, nb, slen; unless (PyArg_ParseTuple(params, "SiO:writeResponse", &toWrite, &keepAlive, &servp)) return false; slen = PyString_GET_SIZE(toWrite); nb = write(srcp->fd, PyString_AS_STRING(toWrite), slen); rpcLogSrc(9, srcp, "server wrote %d of %d bytes", nb, slen); if (nb < 0 and isBlocked(get_errno())) nb = 0; if (nb < 0) { PyErr_SetFromErrno(rpcError); return false; } else if (nb == slen) { rpcLogSrc(9, srcp, "server finished writing response"); srcp->actImp = ACT_INPUT; srcp->func = serverReadHeader; srcp->params = Py_BuildValue("(s,O)", "", servp); if (srcp->params == NULL) return false; if (keepAlive) { unless (rpcDispAddSource(dp, srcp)) return false; } else { close(srcp->fd); srcp->fd = -1; } return true; } else { assert(slen > nb); toWrite = PyString_FromStringAndSize( PyString_AS_STRING(toWrite) + nb, slen - nb); if (toWrite == NULL) return false; srcp->actImp = ACT_OUTPUT; srcp->func = writeResponse; srcp->params = Py_BuildValue("(O,i,O)", toWrite, keepAlive, servp); Py_DECREF(toWrite); if (srcp->params == NULL) return false; unless (rpcDispAddSource(dp, srcp)) return false; return true; } } /* * Register some commands with an rpc server */ static PyObject * pyRpcServerAddMethods(PyObject *self, PyObject *args) { rpcServer *sp; PyObject *toAdd; sp = (rpcServer *)self; unless ((PyArg_ParseTuple(args, "O", &toAdd) and (rpcServerAddPyMethods(sp, toAdd)))) return NULL; Py_INCREF(Py_None); return Py_None; } /* * Bind an rpc server to a port, and start it listening */ static PyObject * pyRpcServerBindAndListen(PyObject *self, PyObject *args) { rpcServer *sp; int port, queue; sp = (rpcServer *)self; unless ((PyArg_ParseTuple(args, "ii", &port, &queue) and (rpcServerBindAndListen(sp, port, queue)))) return NULL; Py_INCREF(Py_None); return Py_None; } /* * Bind an rpc server to a port, and start it listening */ static PyObject * pyRpcServerSetFdAndListen(PyObject *self, PyObject *args) { rpcServer *sp; int fd, queue; sp = (rpcServer *)self; unless ((PyArg_ParseTuple(args, "ii", &fd, &queue) and (rpcServerSetFdAndListen(sp, fd, queue)))) return NULL; Py_INCREF(Py_None); return Py_None; } /* * Bind an rpc server to a port, and start it listening */ static PyObject * pyRpcServerWork(PyObject *self, PyObject *args) { bool timedOut; double timeout; rpcServer *sp; sp = (rpcServer *)self; unless ((PyArg_ParseTuple(args, "d", &timeout)) and (rpcDispWork(sp->disp, timeout, &timedOut))) return NULL; Py_INCREF(Py_None); return Py_None; } /* * Bind an rpc server to a port, and start it listening */ static PyObject * pyRpcServerActiveFds(PyObject *self, PyObject *args) { rpcServer *sp; sp = (rpcServer *)self; unless (PyArg_ParseTuple(args, "")) return NULL; return rpcDispActiveFds(sp->disp); } /* * Bind an rpc server to a port, and start it listening */ static PyObject * pyRpcServerClose(PyObject *self, PyObject *args) { rpcServer *sp; sp = (rpcServer *)self; unless (PyArg_ParseTuple(args, "")) return NULL; rpcServerClose(sp); Py_INCREF(Py_None); return Py_None; } /* * Set a handler for errors on the client */ static PyObject * pyRpcServerSetOnErr(PyObject *self, PyObject *args) { PyObject *func; rpcServer *servp; servp = (rpcServer *)self; unless (PyArg_ParseTuple(args, "O", &func)) return NULL; unless (PyCallable_Check(func)) { PyErr_SetString(rpcError, "error handler must be callable"); return NULL; } if (PyObject_Compare(func, Py_None)) rpcSourceSetOnErr(servp->src, ONERR_TYPE_PY, func); else rpcSourceSetOnErr(servp->src, ONERR_TYPE_DEF, NULL); Py_INCREF(Py_None); return Py_None; } /* * returns number of bytes read, */ static bool nbRead(int fd, PyObject **buffpp, bool *eof) { PyObject *buffp; long bytesAv, olen, slen; char *cp; int res; *eof = false; buffp = *buffpp; assert(PyString_Check(buffp)); olen = PyString_GET_SIZE(buffp); slen = olen; bytesAv = slen + READ_SIZE; cp = alloc(bytesAv); if (cp == NULL) return false; memcpy(cp, PyString_AS_STRING(buffp), slen); while (true) { if (slen + READ_SIZE > bytesAv) { bytesAv = max(bytesAv * 2, slen + READ_SIZE); cp = ralloc(cp, bytesAv); if (cp == NULL) return false; } res = read(fd, cp + slen, READ_SIZE); if (res > 0) slen += res; else if (res == 0) { *eof = true; break; } else if (res < 0) { if (isBlocked(get_errno())) break; else { /* bad error */ PyErr_SetFromErrno(rpcError); return false; } } } buffp = PyString_FromStringAndSize(cp, slen); if (buffp == NULL) return false; *buffpp = buffp; free(cp); return true; } /* * Tell an rpc server to exit the "work routine" asap */ static PyObject * pyRpcServerExit(PyObject *self, PyObject *args) { rpcServer *servp; servp = (rpcServer *)self; servp->disp->etime = 0.0; Py_INCREF(Py_None); return Py_None; } /* * Tell an rpc server to exit the "work routine" asap */ static PyObject * pyRpcServerSetAuth(PyObject *self, PyObject *args) { rpcServer *servp; PyObject *authFunc; servp = (rpcServer *)self; unless (PyArg_ParseTuple(args, "O", &authFunc)) return NULL; rpcServerSetAuth(servp, authFunc); Py_INCREF(Py_None); return Py_None; } /* * Tell an rpc server to exit the "work routine" asap */ static PyObject * pyRpcServerAddSource(PyObject *self, PyObject *args) { rpcServer *servp; rpcSource *srcp; servp = (rpcServer *)self; unless (PyArg_ParseTuple(args, "O!", &rpcSourceType, &srcp)) return NULL; if (srcp->func == NULL) return setPyErr("callback function was NULL"); if (srcp->actImp == 0) return setPyErr("no callback actions to observe"); if (srcp->params == NULL) return setPyErr("callback params was NULL"); unless (PyTuple_Check(srcp->params)) return setPyErr("callback params was not a tuple"); unless (PyTuple_GET_SIZE(srcp->params) == 2) return setPyErr("callback params was not a 2 length tuple"); unless (PyCallable_Check(PyTuple_GET_ITEM(srcp->params, 0))) return setPyErr("callback params 1 was not callable"); unless (rpcDispAddSource(servp->disp, srcp)) return NULL; Py_INCREF(Py_None); return Py_None; } /* * Tell an rpc server to exit the "work routine" asap */ static PyObject * pyRpcServerDelSource(PyObject *self, PyObject *args) { rpcServer *servp; rpcSource *srcp; servp = (rpcServer *)self; unless (PyArg_ParseTuple(args, "O!", &rpcSourceType, &srcp)) return NULL; unless (rpcDispDelSource(servp->disp, srcp)) return NULL; Py_INCREF(Py_None); return Py_None; } static PyObject * pyRpcServerQueueResponse(PyObject *self, PyObject *args) { rpcServer *servp; rpcSource *srcp; PyObject *result; servp = (rpcServer *)self; unless (PyArg_ParseTuple(args, "O!O", &rpcSourceType, &srcp, &result)) return (NULL); if (doResponse(servp, srcp, result, true)) { Py_INCREF(Py_None); return (Py_None); } else return (NULL); } static PyObject * pyRpcServerQueueFault(PyObject *self, PyObject *args) { rpcServer *servp; rpcSource *srcp; PyObject *faultCode, *faultString; servp = (rpcServer *)self; unless (PyArg_ParseTuple(args, "O!OS", &rpcSourceType, &srcp, &faultCode, &faultString)) return (NULL); unless (PyInt_Check(faultCode)) { PyErr_SetString(rpcError, "errorCode must be an integer"); return NULL; } rpcFaultRaise(faultCode, faultString); Py_INCREF(Py_None); return (Py_None); } /* * member functions for server object */ static PyMethodDef pyRpcServerMethods[] = { { "activeFds", (PyCFunction)pyRpcServerActiveFds, 1, 0 }, { "addMethods", (PyCFunction)pyRpcServerAddMethods, 1, 0 }, { "bindAndListen", (PyCFunction)pyRpcServerBindAndListen, 1, 0 }, { "close", (PyCFunction)pyRpcServerClose, 1, 0 }, { "setFdAndListen", (PyCFunction)pyRpcServerSetFdAndListen, 1, 0 }, { "work", (PyCFunction)pyRpcServerWork, 1, 0 }, { "exit", (PyCFunction)pyRpcServerExit, 1, 0 }, { "addSource", (PyCFunction)pyRpcServerAddSource, 1, 0 }, { "delSource", (PyCFunction)pyRpcServerDelSource, 1, 0 }, { "setAuth", (PyCFunction)pyRpcServerSetAuth, 1, 0 }, { "setOnErr", (PyCFunction)pyRpcServerSetOnErr, 1, 0 }, { "queueFault", (PyCFunction)pyRpcServerQueueFault, 1, 0 }, { "queueResponse", (PyCFunction)pyRpcServerQueueResponse, 1, 0 }, { NULL, NULL}, }; /* * return an attribute for a server object */ static PyObject * pyRpcServerGetAttr(rpcServer *sp, char *name) { return Py_FindMethod(pyRpcServerMethods, (PyObject *)sp, name); } /* * map characterstics of an edb object */ PyTypeObject rpcServerType = { PyObject_HEAD_INIT(0) 0, "rpcServer", sizeof(rpcServer), 0, (destructor)rpcServerDealloc, /* tp_dealloc */ 0, /* tp_print */ (getattrfunc)pyRpcServerGetAttr, /* tp_getattr */ 0, /* tp_setattr */ 0, /* tp_compare */ 0, /* tp_repr */ 0, /* tp_as_number */ 0, /* tp_as_sequence */ 0, /* tp_as_mapping */ 0, /* tp_hash */ 0, /* tp_call */ 0, /* tp_str */ 0, /* tp_getattro */ 0, /* tp_setattro */ 0, /* tp_as_buffer */ 0, /* tp_xxx4 */ 0, /* tp_doc */ };