#!/usr/bin/env python # Parallel Python Software: http://www.parallelpython.com # Copyright (c) 2005-2008, Vitalii Vanovschi # All rights reserved. # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # * Redistributions of source code must retain the above copyright notice, # this list of conditions and the following disclaimer. # * Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # * Neither the name of the author nor the names of its contributors # may be used to endorse or promote products derived from this software # without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # THE POSSIBILITY OF SUCH DAMAGE. """ Parallel Python Software, Network Server http://www.parallelpython.com - updates, documentation, examples and support forums """ import logging, getopt, sys, socket, thread, random, string, sha, time, os import pptransport, ppauto from pp import Server copyright = "Copyright (c) 2005-2008 Vitalii Vanovschi. All rights reserved" version = "1.5" class _NetworkServer(Server): def __init__(self, ncpus="autodetect", interface="0.0.0.0", broadcast="255.255.255.255", port=None, secret=None, timeout=None, loglevel=logging.WARNING): Server.__init__(self, ncpus, loglevel=loglevel) self.host = interface self.bcast = broadcast if port is not None: self.port = port else: self.port = self.default_port if secret is not None: self.secret = secret else: self.secret = self.default_secret self.timeout = timeout self.ncon = 0 self.last_con_time = time.time() self.ncon_lock = thread.allocate_lock() logging.debug("Strarting network server interface=%s port=%i" % (self.host, self.port)) if self.timeout is not None: logging.debug("ppserver will exit in %i seconds if no "\ "connections with clients exist" % (self.timeout)) thread.start_new_thread(self.check_timeout, ()) def ncon_add(self, val): self.ncon_lock.acquire() self.ncon += val self.last_con_time = time.time() self.ncon_lock.release() def check_timeout(self): while True: if self.ncon == 0: idle_time = time.time() - self.last_con_time if idle_time < self.timeout: time.sleep(self.timeout - idle_time) else: logging.debug("exiting ppserver due to timeout (no client"\ "connections in last %i sec)", self.timeout) os._exit(0) else: time.sleep(self.timeout) def listen(self): try: ssocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # following allows ppserver to restart faster on the same port ssocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) ssocket.bind((self.host, self.port)) ssocket.listen(5) except socket.error: logging.error("Cannot create socket with port " + str(self.port) + " (port is already in use)") try: while 1: #accept connections from outside (csocket, address) = ssocket.accept() #now do something with the clientsocket #in this case, we'll pretend this is a threaded server thread.start_new_thread(self.crun, (csocket,)) except: logging.debug("Closing server socket") ssocket.close() def crun(self, csocket): socket = pptransport.CSocketTransport(csocket) #send PP version socket.send(version) #generate a random string srandom = "".join([random.choice(string.ascii_letters) for i in xrange(16)]) socket.send(srandom) answer = sha.new(srandom+self.secret).hexdigest() cleintanswer = socket.receive() if answer != cleintanswer: logging.warning("Authentification failed, client host=%s, port=%i" % csocket.getpeername()) socket.send("FAILED") csocket.close() return else: socket.send("OK") ctype = socket.receive() logging.debug("Control message received: " + ctype) self.ncon_add(1) try: if ctype=="STAT": #reset time at each new connection self.get_stats()["local"].time = 0.0 socket.send(str(self.get_ncpus())) while 1: socket.receive() socket.send(str(self.get_stats()["local"].time)) elif ctype=="EXEC": while 1: sfunc = socket.creceive() sargs = socket.receive() f = self.insert(sfunc, sargs) sresult = f(True) socket.send(sresult) except: #print sys.excepthook(*sys.exc_info()) logging.debug("Closing client socket") csocket.close() self.ncon_add(-1) def broadcast(self): discover = ppauto.Discover(self) thread.start_new_thread(discover.run, ((self.bcast, self.port),)) def print_usage(): print "Parallel Python Network Server (pp-"+version+")" print "Usage: ppserver.py [-hda] [-i interface] [-b broadcast] "\ "[-p port] [-w nworkers] [-s secret] [-t seconds]" print "Options: " print "-h : this help message" print "-d : debug" print "-a : enable auto-discovery service" print "-i interface : interface to listen" print "-b broadcast : broadcast address for auto-discovery service" print "-p port : port to listen" print "-w nworkers : number of workers to start" print "-s secret : secret for authentication" print "-t seconds : timeout to exit if no connections with "\ "clients exist" print print "Please visit http://www.parallelpython.com for extended up-to-date" print "documentation, examples and support forums" if __name__ == "__main__": try: opts, args = getopt.getopt(sys.argv[1:], "hdab:i:p:w:s:t:", ["help"]) except getopt.GetoptError: print_usage() sys.exit(1) args = {} autodiscovery = False for opt, arg in opts: if opt in ("-h", "--help"): print_usage() sys.exit() elif opt == "-d": args["loglevel"] = logging.DEBUG elif opt == "-i": args["interface"] = arg elif opt == "-s": args["secret"] = arg elif opt == "-p": args["port"] = int(arg) elif opt == "-w": args["ncpus"] = int(arg) elif opt == "-a": autodiscovery = True elif opt == "-b": args["broadcast"] = arg elif opt == "-t": args["timeout"] = int(arg) server = _NetworkServer(**args) if autodiscovery: server.broadcast() server.listen() #have to destroy it here explicitelly otherwise an exception #comes out in Python 2.4 del server # Parallel Python Software: http://www.parallelpython.com