#!/usr/bin/env python
# Parallel Python Software: http://www.parallelpython.com
# Copyright (c) 2005-2008, Vitalii Vanovschi
# All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the author nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
# THE POSSIBILITY OF SUCH DAMAGE.
"""
Parallel Python Software, Network Server
http://www.parallelpython.com - updates, documentation, examples and support
forums
"""
import logging, getopt, sys, socket, thread, random, string, sha, time, os
import pptransport, ppauto
from pp import Server
copyright = "Copyright (c) 2005-2008 Vitalii Vanovschi. All rights reserved"
version = "1.5"
class _NetworkServer(Server):
    """Parallel Python server that accepts jobs from network clients.

    Extends ``pp.Server`` with a TCP listener.  Each accepted client is
    handled in its own thread by ``crun``, which performs a
    challenge/response handshake based on a shared secret and then serves
    either statistics ("STAT") or job execution ("EXEC") requests.
    """

    def __init__(self, ncpus="autodetect", interface="0.0.0.0",
                 broadcast="255.255.255.255", port=None, secret=None,
                 timeout=None, loglevel=logging.WARNING):
        """Initialise the server and, optionally, the idle-timeout watchdog.

        ncpus     - passed through to Server.__init__
        interface - address the listening socket is bound to
        broadcast - address handed to the auto-discovery service
        port      - TCP port to listen on; None selects self.default_port
        secret    - shared secret for authentication; None selects
                    self.default_secret
        timeout   - seconds without client connections after which the
                    process exits; None disables the watchdog
        loglevel  - passed through to Server.__init__

        NOTE(review): default_port and default_secret are not defined in
        this class; presumably they are provided by pp.Server - confirm.
        """
        Server.__init__(self, ncpus, loglevel=loglevel)
        self.host = interface
        self.bcast = broadcast
        if port is not None:
            self.port = port
        else:
            self.port = self.default_port
        if secret is not None:
            self.secret = secret
        else:
            self.secret = self.default_secret
        self.timeout = timeout
        # number of currently served (authenticated) client connections
        self.ncon = 0
        # time of the most recent change of self.ncon
        self.last_con_time = time.time()
        self.ncon_lock = thread.allocate_lock()

        logging.debug("Strarting network server interface=%s port=%i"
                      % (self.host, self.port))
        if self.timeout is not None:
            logging.debug("ppserver will exit in %i seconds if no "
                          "connections with clients exist" % (self.timeout))
            thread.start_new_thread(self.check_timeout, ())

    def ncon_add(self, val):
        """Add val to the connection counter and refresh last_con_time."""
        self.ncon_lock.acquire()
        # try/finally guarantees the lock is released even if the update
        # raises, so other client threads can never be blocked forever
        try:
            self.ncon += val
            self.last_con_time = time.time()
        finally:
            self.ncon_lock.release()

    def check_timeout(self):
        """Watchdog loop: exit the process after self.timeout idle seconds.

        Started in a separate thread by __init__ when a timeout is set.
        self.ncon is read here without taking ncon_lock.
        """
        while True:
            if self.ncon == 0:
                idle_time = time.time() - self.last_con_time
                if idle_time < self.timeout:
                    # sleep only for the remainder of the idle window
                    time.sleep(self.timeout - idle_time)
                else:
                    logging.debug("exiting ppserver due to timeout (no client "
                                  "connections in last %i sec)", self.timeout)
                    # terminates the whole process from this thread
                    os._exit(0)
            else:
                time.sleep(self.timeout)

    def listen(self):
        """Bind the server socket and accept clients until interrupted.

        Every accepted connection is served by crun in a new thread.
        If the socket cannot be created, bound or put into listening mode
        the error is logged and the method returns without accepting.
        """
        ssocket = None
        try:
            ssocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            # following allows ppserver to restart faster on the same port
            ssocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            ssocket.bind((self.host, self.port))
            ssocket.listen(5)
        except socket.error:
            # report the real cause instead of assuming the port is busy
            logging.error("Cannot create socket with port " + str(self.port)
                          + " (" + str(sys.exc_info()[1]) + ")")
            if ssocket is not None:
                ssocket.close()
            return

        try:
            try:
                while 1:
                    # accept connections from outside and serve each one
                    # in its own thread
                    (csocket, address) = ssocket.accept()
                    thread.start_new_thread(self.crun, (csocket,))
            except:
                # Bare except kept from the original: any exception,
                # including KeyboardInterrupt, ends the accept loop and
                # lets listen() return normally.
                logging.debug("Closing server socket")
        finally:
            ssocket.close()

    def crun(self, csocket):
        """Serve a single client connection (runs in its own thread).

        Protocol as implemented here: send the PP version, send a random
        16-letter challenge, expect the SHA hex digest of
        challenge + secret, answer "OK" or "FAILED", then read a control
        message.  "STAT" reports the cpu count and then the local time
        statistic on every request; "EXEC" repeatedly receives a function
        and its arguments, runs them via self.insert and sends the result
        back.  The client socket is always closed before returning.
        """
        # Handshake phase.  Any failure here (e.g. the peer disconnects)
        # is logged and the socket is closed instead of being leaked.
        try:
            transport = pptransport.CSocketTransport(csocket)
            # send PP version
            transport.send(version)
            # generate a random string used as the authentication challenge
            # NOTE(review): `random` is not a cryptographic generator;
            # consider random.SystemRandom if the secret must resist
            # an active attacker.
            srandom = "".join([random.choice(string.ascii_letters)
                               for i in xrange(16)])
            transport.send(srandom)
            answer = sha.new(srandom + self.secret).hexdigest()
            clientanswer = transport.receive()
            authenticated = (answer == clientanswer)
            if authenticated:
                transport.send("OK")
                ctype = transport.receive()
                logging.debug("Control message received: " + ctype)
            else:
                logging.warning(
                    "Authentification failed, client host=%s, port=%i"
                    % csocket.getpeername())
                transport.send("FAILED")
        except Exception:
            logging.warning("Handshake with client failed: %s"
                            % (sys.exc_info()[1],))
            csocket.close()
            return

        if not authenticated:
            csocket.close()
            return

        # Session phase: the connection counts as active until it ends.
        self.ncon_add(1)
        try:
            try:
                if ctype == "STAT":
                    # reset time at each new connection
                    self.get_stats()["local"].time = 0.0
                    transport.send(str(self.get_ncpus()))
                    while 1:
                        transport.receive()
                        transport.send(str(self.get_stats()["local"].time))
                elif ctype == "EXEC":
                    while 1:
                        sfunc = transport.creceive()
                        sargs = transport.receive()
                        f = self.insert(sfunc, sargs)
                        sresult = f(True)
                        transport.send(sresult)
            except:
                # Bare except kept from the original: the loops above only
                # end through an exception (typically the client going
                # away), which is treated as a normal end of session.
                logging.debug("Closing client socket")
        finally:
            csocket.close()
            self.ncon_add(-1)

    def broadcast(self):
        """Start the auto-discovery service in a background thread.

        Creates a ppauto.Discover for this server and runs it with the
        (broadcast address, port) pair.
        """
        discover = ppauto.Discover(self)
        thread.start_new_thread(discover.run, ((self.bcast, self.port),))
def print_usage():
    """Write the ppserver command-line help text to standard output."""
    usage_lines = [
        "Parallel Python Network Server (pp-" + version + ")",
        "Usage: ppserver.py [-hda] [-i interface] [-b broadcast] "
        "[-p port] [-w nworkers] [-s secret] [-t seconds]",
        "Options: ",
        "-h : this help message",
        "-d : debug",
        "-a : enable auto-discovery service",
        "-i interface : interface to listen",
        "-b broadcast : broadcast address for auto-discovery service",
        "-p port : port to listen",
        "-w nworkers : number of workers to start",
        "-s secret : secret for authentication",
        "-t seconds : timeout to exit if no connections with "
        "clients exist",
        # blank separator line before the closing pointer to the web site
        "",
        "Please visit http://www.parallelpython.com for extended up-to-date",
        "documentation, examples and support forums",
    ]
    # one write of all lines, each terminated by a newline
    sys.stdout.write("\n".join(usage_lines) + "\n")
if __name__ == "__main__":
    # Command-line entry point: parse the options, build the server,
    # optionally start auto-discovery, then serve until interrupted.
    try:
        opts, positional = getopt.getopt(sys.argv[1:], "hdab:i:p:w:s:t:",
                                         ["help"])
    except getopt.GetoptError:
        print_usage()
        sys.exit(1)

    # option flag -> _NetworkServer keyword argument
    string_options = {"-i": "interface", "-s": "secret", "-b": "broadcast"}
    integer_options = {"-p": "port", "-w": "ncpus", "-t": "timeout"}

    # keyword arguments collected for the _NetworkServer constructor
    args = {}
    autodiscovery = False
    for opt, arg in opts:
        if opt in ("-h", "--help"):
            print_usage()
            sys.exit()
        elif opt == "-d":
            args["loglevel"] = logging.DEBUG
        elif opt == "-a":
            autodiscovery = True
        elif opt in string_options:
            args[string_options[opt]] = arg
        elif opt in integer_options:
            # a non-numeric value is a usage error, not a crash
            try:
                args[integer_options[opt]] = int(arg)
            except ValueError:
                sys.stderr.write("Invalid integer value for option %s: %r\n"
                                 % (opt, arg))
                print_usage()
                sys.exit(1)

    server = _NetworkServer(**args)
    if autodiscovery:
        server.broadcast()
    server.listen()
    # have to destroy it here explicitly otherwise an exception
    # comes out in Python 2.4
    del server
# Parallel Python Software: http://www.parallelpython.com
# syntax highlighted by Code2HTML, v. 0.9.1