# Parallel Python Software: http://www.parallelpython.com # Copyright (c) 2005-2008, Vitalii Vanovschi # All rights reserved. # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # * Redistributions of source code must retain the above copyright notice, # this list of conditions and the following disclaimer. # * Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # * Neither the name of the author nor the names of its contributors # may be used to endorse or promote products derived from this software # without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # THE POSSIBILITY OF SUCH DAMAGE. """ Parallel Python Software, Execution Server http://www.parallelpython.com - updates, documentation, examples and support forums """ import os, thread, popen2, logging, inspect, sys, sha, types, time, atexit import cPickle as pickle import pptransport, ppauto copyright = "Copyright (c) 2005-2008 Vitalii Vanovschi. All rights reserved" version = "1.5" # reconnect persistent rworkers in 5 sec _RECONNECT_WAIT_TIME = 5 class _Task(object): """Class describing single task (job) """ def __init__(self, server, tid, callback=None, callbackargs=(), group='default'): """Initializes the task""" self.lock = thread.allocate_lock() self.lock.acquire() self.tid = tid self.callback = callback self.callbackargs = callbackargs self.group = group self.finished = False self.unpickled = False # for internal use only def finalize(self, sresult): """Finalizes the task""" self.sresult = sresult if self.callback: self.__unpickle() self.lock.release() self.finished = True def __call__(self, raw_result=False): """Retrieves result of the task""" self.wait() if not self.unpickled and not raw_result: self.__unpickle() if raw_result: return self.sresult else: return self.result def wait(self): """Waits for the task""" if not self.finished: self.lock.acquire() self.lock.release() def __unpickle(self): """Unpickles the result of the task""" self.result, sout = pickle.loads(self.sresult) self.unpickled = True if len(sout) > 0: print sout, if self.callback: args = self.callbackargs + (self.result,) self.callback(*args) class _Worker(object): """Local worker class """ def __init__(self): """Initializes local worker""" command = "\"" + sys.executable + "\" \"" \ + os.path.dirname(os.path.abspath(__file__))\ + os.sep + "ppworker.py\"" if sys.platform.startswith("win"): # workargound for windows command = "\"" + command + "\"" else: # do not show "Borken pipe message" at exit on unix/linux command += " 2>/dev/null" self.t = pptransport.CPipeTransport(*popen2.popen2(command)) self.pid = int(self.t.receive()) self.is_free = True class _RWorker(pptransport.CSocketTransport): """Remote worker class """ def __init__(self, host, port, secret, message=None, persistent=True): """Initializes remote worker""" self.persistent = persistent self.host = host self.port = port self.secret = secret self.address = (host, port) self.id = host + ":" + str(port) logging.debug("Creating Rworker id=%s persistent=%s" % (self.id, persistent)); self.connect(message) self.is_free = True def __del__(self): """Closes connection with remote server""" self.close() def connect(self, message=None): """Connects to a remote server""" while True: try: pptransport.SocketTransport.__init__(self) self._connect(self.host, self.port) if not self.authenticate(self.secret): logging.error("Authentication failed for host=%s, port=%s" % (self.host, self.port)) return False if message: self.send(message) self.is_free = True return True except: if not self.persistent: logging.debug("Deleting from queue Rworker %s" % (self.id,)) return False # print sys.excepthook(*sys.exc_info()) logging.debug("Failed to reconnect with " \ "(host=%s, port=%i), will try again in %i s" % (self.host, self.port, _RECONNECT_WAIT_TIME)) time.sleep(_RECONNECT_WAIT_TIME) class _Statistics(object): """Class to hold execution statisitcs for a single node """ def __init__(self, ncpus, rworker=None): """Initializes statistics for a node""" self.ncpus = ncpus self.time = 0.0 self.njobs = 0 self.rworker = rworker class Server(object): """Parallel Python SMP execution server class """ default_port = 60000 default_secret = "epo20pdosl;dksldkmm" def __init__(self, ncpus="autodetect", ppservers=(), secret=None, loglevel=logging.WARNING, logstream=sys.stderr): """ Creates server instance ncpus - the number of worker processes to start on the local computer, if parameter is omitted it will be set to the number of processors in the system ppservers - list of active parallel python execution servers to connect with secret - passphrase for network connections, if omitted a default passphrase will be used. It's highly recommended to use a custom passphrase for all network connections. loglevel - logging level logstream - log stream destination With ncpus = 1 all tasks are executed consequently For the best performance either use the default "autodetect" value or set ncpus to the total number of processors in the system """ self.__initLog(loglevel, logstream) logging.debug("Creating server instance (pp-" + version+")") self.__tid = 0 self.__active_tasks = 0 self.__active_tasks_lock = thread.allocate_lock() self.__queue = [] self.__queue_lock = thread.allocate_lock() self.__workers = [] self.__rworkers = [] self.__rworkers_reserved = [] self.__rworkers_reserved4 = [] self.__sourcesHM = {} self.__sfuncHM = {} self.__waittasks = [] self.__waittasks_lock = thread.allocate_lock() self.__exiting = False self.__accurate_stats = True self.autopp_list = {} self.__active_rworkers_list_lock = thread.allocate_lock() # add local directory to PYTHONPATH localdir = os.getcwd() if os.environ.has_key("PYTHONPATH"): if os.environ["PYTHONPATH"]: os.environ["PYTHONPATH"] = localdir + ":" \ + os.environ["PYTHONPATH"]; atexit.register(self.__atexit) self.__stats = {"local":_Statistics(0)} self.set_ncpus(ncpus) self.ppservers = [] self.auto_ppservers = [] for ppserver in ppservers: ppserver = ppserver.split(":") host = ppserver[0] if len(ppserver)>1: port = int(ppserver[1]) else: port = Server.default_port if host.find("*") == -1: self.ppservers.append((host,port)) else: if host == "*": host = "*.*.*.*" host = host.replace("*","255") self.auto_ppservers.append((host,port)) self.__stats_lock = thread.allocate_lock() if secret is not None: if not isinstance(secret, types.StringType): raise TypeError("secret must be of a string type") self.secret = str(secret) else: self.secret = Server.default_secret self.__connect() self.__creation_time = time.time() logging.info("pp local server started with %d workers" % (self.__ncpus,)) def submit(self, func, args=(), depfuncs=(), modules=(), callback=None, callbackargs=(), group='default',globals=None): """Submits function to the execution queue func - function to be executed args - tuple with arguments of the 'func' depfuncs - tuple with functions which might be called from 'func' modules - tuple with module names to import callback - callback function which will be called with argument list equal to callbackargs+(result,) as soon as calculation is done callbackargs - additional arguments for callback function group - job group, is used when wait(group) is called to wait for jobs in a given group to finish globals - dictionary from which all modules, functions and classes will be imported, for instance: globals=globals() """ tid = self.__gentid() if globals: modules = tuple(self.__find_modules("", globals)) try: #python 2.3 do not have sets modules = tuple(set(modules)) except NameError: pass self.__logger.debug("Task %i will autoimport next modules: %s" % (tid, str(modules))) for name, object in globals.items(): if isinstance(object, types.FunctionType) \ or isinstance(object, types.ClassType): depfuncs += (object, ) task = _Task(self, tid, callback, callbackargs, group) self.__waittasks_lock.acquire() self.__waittasks.append(task) self.__waittasks_lock.release() # if the function is a method of a class add self to the arguments list if isinstance(func, types.MethodType) and func.im_self: args = (func.im_self,) + args # if there is an instance of a user deined class in the arguments add # whole calss to dependancies for arg in args: if isinstance(arg, types.InstanceType): depfuncs += (arg.__class__, ) # if there is an instance of a function in the arguments add this # function to dependancies for arg in args: if isinstance(arg, types.FunctionType): depfuncs += (arg, ) sfunc = self.__dumpsfunc((func,)+depfuncs, modules) sargs = pickle.dumps(args) self.__queue_lock.acquire() self.__queue.append((task, sfunc, sargs)) self.__queue_lock.release() self.__logger.debug("Task %i submited, function='%s'" % (tid, func.func_name)) self.__scheduler() return task def wait(self, group=None): """Waits for all jobs in a given group to finish. If group is omitted waits for all jobs to finish """ while True: self.__waittasks_lock.acquire() for task in self.__waittasks: if not group or task.group == group: self.__waittasks_lock.release() task.wait() break else: self.__waittasks_lock.release() break def get_ncpus(self): """Returns the number of local worker processes (ppworkers)""" return self.__ncpus def set_ncpus(self, ncpus="autodetect"): """Sets the number of local worker processes (ppworkers) ncpus - the number of worker processes, if parammeter is omitted it will be set to the number of processors in the system""" if ncpus == "autodetect": ncpus = self.__detect_ncpus() if not isinstance(ncpus, int): raise TypeError("ncpus must have 'int' type") if ncpus < 0: raise ValueError("ncpus must be an integer > 0"); if ncpus > len(self.__workers): self.__workers.extend([_Worker() for x in \ range(ncpus - len(self.__workers))]) self.__stats["local"].ncpus = ncpus self.__ncpus = ncpus def get_active_nodes(self): """Returns active nodes as a dictionary [keys - nodes, values - ncpus]""" active_nodes = {} for node, stat in self.__stats.items(): if node == "local" or self.autopp_list.has_key(node) \ and self.autopp_list[node]: active_nodes[node] = stat.ncpus return active_nodes def get_stats(self): """Returns job execution statistics as a dictionary""" for node, stat in self.__stats.items(): if stat.rworker: try: stat.rworker.send("TIME") stat.time = float(stat.rworker.receive()) except: self.__accurate_stats = False stat.time = 0.0 return self.__stats def print_stats(self): """Prints job execution statistics. Useful for benchmarking on clusters""" print "Job execution statistics:" walltime = time.time()-self.__creation_time statistics = self.get_stats().items() totaljobs = 0.0 for ppserver, stat in statistics: totaljobs += stat.njobs print " job count | % of all jobs | job time sum | " \ "time per job | job server" for ppserver, stat in statistics: if stat.njobs: print " %6i | %6.2f | %8.4f | %11.6f | %s" \ % (stat.njobs, 100.0*stat.njobs/totaljobs, stat.time, stat.time/stat.njobs, ppserver, ) print "Time elapsed since server creation", walltime if not self.__accurate_stats: print "WARNING: statistics provided above is not accurate" \ "due to job rescheduling" print # all methods below are for internal use only def insert(self, sfunc, sargs, task=None): """Inserts function into the execution queue. It's intended for internal use only (ppserver.py). """ if not task: tid = self.__gentid() task = _Task(self, tid) self.__queue_lock.acquire() self.__queue.append((task, sfunc, sargs)) self.__queue_lock.release() self.__logger.debug("Task %i inserted" % (task.tid, )) self.__scheduler() return task def connect1(self, host, port, persistent=True): """Conects to a remote ppserver specified by host and port""" try: rworker = _RWorker(host, port, self.secret, "STAT", persistent) ncpus = int(rworker.receive()) id = host+":"+str(port) self.__stats[id] = _Statistics(ncpus, rworker) for x in range(ncpus-1): rworker = _RWorker(host, port, self.secret, "EXEC", persistent) self.__update_active_rworkers(rworker.id, 1); # append is atomic - no need to lock self.__rworkers self.__rworkers.append(rworker) #creating reserved rworkers for x in range(ncpus): rworker = _RWorker(host, port, self.secret, "EXEC", persistent) self.__update_active_rworkers(rworker.id, 1); self.__rworkers_reserved.append(rworker) #creating reserved4 rworkers for x in range(ncpus*0): rworker = _RWorker(host, port, self.secret, "EXEC", persistent) # self.__update_active_rworkers(rworker.id, 1); self.__rworkers_reserved4.append(rworker) logging.debug("Connected to ppserver (host=%s, port=%i) \ with %i workers" % (host, port, ncpus)) self.__scheduler() except: pass # sys.excepthook(*sys.exc_info()) def __connect(self): """Connects to all remote ppservers""" for ppserver in self.ppservers: thread.start_new_thread(self.connect1, ppserver) discover = ppauto.Discover(self, True) for ppserver in self.auto_ppservers: thread.start_new_thread(discover.run, (ppserver,)) def __detect_ncpus(self): """Detects the number of effective CPUs in the system""" #for Linux, Unix and MacOS if hasattr(os, "sysconf"): if os.sysconf_names.has_key("SC_NPROCESSORS_ONLN"): #Linux and Unix ncpus = os.sysconf("SC_NPROCESSORS_ONLN") if isinstance(ncpus, int) and ncpus > 0: return ncpus else: #MacOS X return int(os.popen2("sysctl -n hw.ncpu")[1].read()) #for Windows if os.environ.has_key("NUMBER_OF_PROCESSORS"): ncpus = int(os.environ["NUMBER_OF_PROCESSORS"]); if ncpus > 0: return ncpus #return the default value return 1 def __initLog(self, loglevel, logstream): """Initializes logging facility""" logHandler = logging.StreamHandler(logstream) logHandler.setLevel(loglevel) LOG_FORMAT = '%(asctime)s %(levelname)s %(message)s' logHandler.setFormatter(logging.Formatter(LOG_FORMAT)) self.__logger = logging.getLogger('') self.__logger.addHandler(logHandler) self.__logger.setLevel(loglevel) def __dumpsfunc(self, funcs, modules): """Serializes functions and modules""" hashs = hash(funcs+modules) if not self.__sfuncHM.has_key(hashs): sources = [self.__get_source(func) for func in funcs] self.__sfuncHM[hashs] = pickle.dumps( (funcs[0].func_name, sources, modules)) return self.__sfuncHM[hashs] def __find_modules(self, prefix, dict): """recursively finds all the modules in dict""" modules = [] for name, object in dict.items(): if isinstance(object, types.ModuleType) \ and name not in ("__builtins__", "pp"): if object.__name__ == prefix+name or prefix == "": modules.append(object.__name__) modules.extend(self.__find_modules( object.__name__+".", object.__dict__)) return modules def __scheduler(self): """Schedules jobs for execution""" self.__queue_lock.acquire() while self.__queue: if self.__active_tasks < self.__ncpus: #TODO: select a job number on the basis of heuristic task = self.__queue.pop(0) for worker in self.__workers: if worker.is_free: worker.is_free = False break else: self.__logger.error("There are no free workers left") raise "Error: No free workers" self.__add_to_active_tasks(1) try: self.__stats["local"].njobs += 1 thread.start_new_thread(self.__run, task+(worker,)) except: pass else: for rworker in self.__rworkers: if rworker.is_free: rworker.is_free = False task = self.__queue.pop(0) self.__stats[rworker.id].njobs += 1 thread.start_new_thread(self.__rrun, task+(rworker,)) break else: if len(self.__queue) > self.__ncpus: for rworker in self.__rworkers_reserved: if rworker.is_free: rworker.is_free = False task = self.__queue.pop(0) self.__stats[rworker.id].njobs += 1 thread.start_new_thread(self.__rrun, task+(rworker,)) break else: break # this code will not be executed # and is left for further releases if len(self.__queue) > self.__ncpus*0: for rworker in self.__rworkers_reserved4: if rworker.is_free: rworker.is_free = False task = self.__queue.pop(0) self.__stats[rworker.id].njobs += 1 thread.start_new_thread(self.__rrun, task+(rworker,)) break else: break self.__queue_lock.release() def __get_source(self, func): """Fetches source of the function""" hashf = hash(func) if not self.__sourcesHM.has_key(hashf): #get lines of the source and adjust indent sourcelines = inspect.getsourcelines(func)[0] fl = sourcelines[0] i = 0 while fl[i] in (' ', '\t'): i += 1 source = "" for line in sourcelines: source += line[i:] # source = inspect.getsource(func) self.__sourcesHM[hashf] = source return self.__sourcesHM[hashf] def __rrun(self, job, sfunc, sargs, rworker): """Runs a job remotelly""" self.__logger.debug("Task (remote) %i started" % (job.tid,)) try: rworker.csend(sfunc) rworker.send(sargs) sresult = rworker.receive() rworker.is_free = True except: self.__logger.debug("Task %i failed due to broken network " \ "connection - rescheduling" % (job.tid,)) self.insert(sfunc, sargs, job) self.__scheduler() self.__update_active_rworkers(rworker.id, -1) if rworker.connect("EXEC"): self.__update_active_rworkers(rworker.id, 1) self.__scheduler() return job.finalize(sresult) # remove the job from the waiting list if self.__waittasks: self.__waittasks_lock.acquire() self.__waittasks.remove(job) self.__waittasks_lock.release() self.__logger.debug("Task (remote) %i ended" % (job.tid,)) self.__scheduler() def __run(self, job, sfunc, sargs, worker): """Runs a job locally""" self.__logger.debug("Task %i started" % (job.tid,)) start_time = time.time() try: worker.t.csend(sfunc) worker.t.send(sargs) sresult = worker.t.receive() except: if self.__exiting: return else: sys.excepthook(*sys.exc_info()) worker.is_free = True job.finalize(sresult) # remove the job from the waiting list if self.__waittasks: self.__waittasks_lock.acquire() self.__waittasks.remove(job) self.__waittasks_lock.release() self.__add_to_active_tasks(-1) if not self.__exiting: self.__stat_add_time("local", time.time()-start_time) self.__logger.debug("Task %i ended" % (job.tid,)) self.__scheduler() def __add_to_active_tasks(self, num): """Updates the number of active tasks""" self.__active_tasks_lock.acquire() self.__active_tasks += num self.__active_tasks_lock.release() def __stat_add_time(self, node, time): """Updates total runtime on the node""" self.__stats_lock.acquire() self.__stats[node].time += time self.__stats_lock.release() def __stat_add_job(self, node): """Increments job count on the node""" self.__stats_lock.acquire() self.__stats[node].njobs += 1 self.__stats_lock.release() def __update_active_rworkers(self, id, count): """Updates list of active rworkers""" self.__active_rworkers_list_lock.acquire() if not self.autopp_list.has_key(id): self.autopp_list[id] = 0 self.autopp_list[id] += count self.__active_rworkers_list_lock.release() def __gentid(self): """Generates a unique job ID number""" self.__tid += 1 return self.__tid - 1 def __atexit(self): """Cleans up ppworkers at exit""" self.__exiting = True; for worker in self.__workers: if sys.platform.startswith("win"): os.popen('TASKKILL /PID '+str(worker.pid)+' /F') else: pass #try: # os.kill(worker.pid, 9) # os.waitpid(worker.pid, 0) #except: # pass # Parallel Python Software: http://www.parallelpython.com