# Parallel Python Software: http://www.parallelpython.com
# Copyright (c) 2005-2008, Vitalii Vanovschi
# All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#    * Redistributions of source code must retain the above copyright notice,
#      this list of conditions and the following disclaimer.
#    * Redistributions in binary form must reproduce the above copyright
#      notice, this list of conditions and the following disclaimer in the
#      documentation and/or other materials provided with the distribution.
#    * Neither the name of the author nor the names of its contributors
#      may be used to endorse or promote products derived from this software
#      without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
# THE POSSIBILITY OF SUCH DAMAGE.
"""
Parallel Python Software, PP Transport

http://www.parallelpython.com - updates, documentation, examples and support
forums
"""

import logging
import os
import socket
import struct

# The stand-alone md5/sha modules are deprecated since Python 2.5; prefer
# hashlib and only fall back to them on interpreters that lack it.
try:
    import hashlib
    _md5_new = hashlib.md5
    _sha_new = hashlib.sha1
except ImportError:
    import md5
    import sha
    _md5_new = md5.new
    _sha_new = sha.new

copyright = "Copyright (c) 2005-2008 Vitalii Vanovschi. All rights reserved"
version = "1.5"


class Transport(object):
    """Base class of the message transports defined in this module.

    Subclasses supply send() and receive(); authenticate() implements the
    version check and shared-secret handshake on top of those two methods.
    """

    def send(self, msg):
        """Send one message; must be overridden in a subclass.

        Raises:
            NotImplementedError: always, in this base class.
        """
        # NotImplemented is a non-callable constant; the exception type
        # for an abstract method is NotImplementedError.
        raise NotImplementedError("abstract function 'send' must be "
                                  "implemented in a subclass")

    def receive(self):
        """Receive one message; must be overridden in a subclass.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError("abstract function 'receive' must be "
                                  "implemented in a subclass")

    def authenticate(self, secret):
        """Run the receiving half of the handshake over this transport.

        Reads the remote version string, then a challenge string, replies
        with the SHA-1 hex digest of challenge + secret and reads the
        verdict.

        Args:
            secret: shared secret that is appended to the challenge.

        Returns:
            True when the versions match and the final message received is
            "OK"; False otherwise.
        """
        remote_version = self.receive()
        if version != remote_version:
            logging.error("PP version mismatch (local: pp-%s, remote: pp-%s)",
                          version, remote_version)
            logging.error("Please install the same version of PP on all nodes")
            return False
        srandom = self.receive()
        # NOTE(review): the digest algorithm is presumably fixed by what the
        # remote side computes - confirm before changing it.
        answer = _sha_new(srandom + secret).hexdigest()
        self.send(answer)
        response = self.receive()
        return response == "OK"

    def close(self):
        """Release transport resources; a no-op in the base class."""
        pass

    def _connect(self):
        """Establish the connection; a no-op in the base class."""
        pass


class CTransport(Transport):
    """Cached transport

    Adds csend()/creceive(), which replace a message that has already been
    sent with its hash. The concrete class mixed in with this one must
    provide ``self.scache`` (a dict), as PipeTransport and SocketTransport do.
    """

    # Receive-side cache: message hash -> (preprocessed) message. It is a
    # class attribute, so it is shared by every instance in the process.
    rcache = {}

    def hash(self, msg):
        """Return the MD5 hex digest of msg, used as its cache key."""
        return _md5_new(msg).hexdigest()

    def csend(self, msg):
        """Send msg, or only its hash if this transport already sent it.

        New messages go out as "N" + msg and are recorded in self.scache;
        repeated ones go out as "H" + hash.
        """
        hash1 = self.hash(msg)
        if hash1 in self.scache:
            self.send("H" + hash1)
        else:
            self.send("N" + msg)
            self.scache[hash1] = True

    def creceive(self, preprocess=None):
        """Receive a message sent with csend().

        Args:
            preprocess: optional callable applied to a newly received
                message before it is cached; None caches the message as is.

        Returns:
            The cached (preprocessed) message for the received hash.

        Raises:
            KeyError: if an "H" message refers to a hash not in the cache.
        """
        msg = self.receive()
        if msg[0] == 'H':
            hash1 = msg[1:]
        else:
            msg = msg[1:]
            hash1 = self.hash(msg)
            if preprocess is not None:
                msg = preprocess(msg)
            self.rcache[hash1] = msg
        return self.rcache[hash1]


class PipeTransport(Transport):
    """Transport over a pair of file objects (one to read, one to write).

    Each message is framed as its decimal length on a line of its own,
    followed by the message itself.
    """

    def __init__(self, r, w):
        """Store the read and write ends of the pipe.

        Args:
            r: file object messages are read from.
            w: file object messages are written to.

        Raises:
            TypeError: if either argument is not a file object.
        """
        self.scache = {}
        # `file` is the Python 2 builtin file type.
        if isinstance(r, file) and isinstance(w, file):
            self.r = r
            self.w = w
        else:
            raise TypeError("Both arguments of PipeTransport constructor must "
                            "be file objects")

    def send(self, msg):
        """Write the length line and msg to the pipe, then flush."""
        self.w.write(str(len(msg)) + "\n")
        self.w.write(msg)
        self.w.flush()

    def receive(self, preprocess=None):
        """Read one framed message from the pipe.

        Args:
            preprocess: optional callable applied to the message before it
                is returned; None returns the message unchanged.

        Returns:
            The (preprocessed) message.
        """
        s = self.r.readline()
        msg_len = int(s)
        msg = self.r.read(msg_len)
        if preprocess is not None:
            return preprocess(msg)
        return msg


class SocketTransport(Transport):
    """Transport over a TCP socket.

    Each message is framed as its length packed with struct format "!Q",
    followed by the message itself.
    """

    def __init__(self, socket1=None):
        """Wrap socket1, or create a new AF_INET stream socket if omitted."""
        if socket1:
            self.socket = socket1
        else:
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.scache = {}

    def _send_all(self, data):
        """Call socket.send() repeatedly until all of data has been sent."""
        total = len(data)
        sent = 0
        while sent < total:
            count = self.socket.send(data[sent:])
            if count == 0:
                raise RuntimeError("Socket connection is broken")
            sent += count

    def _recv_exact(self, size):
        """Call socket.recv() repeatedly until exactly size bytes arrived."""
        chunks = []
        received = 0
        while received < size:
            chunk = self.socket.recv(size - received)
            if not chunk:
                raise RuntimeError("Socket connection is broken")
            chunks.append(chunk)
            received += len(chunk)
        return "".join(chunks)

    def send(self, data):
        """Send the packed length of data followed by data itself.

        Raises:
            RuntimeError: if the socket reports that zero bytes were sent.
        """
        self._send_all(struct.pack("!Q", len(data)))
        self._send_all(data)

    def receive(self):
        """Receive one length-prefixed message and return its payload.

        Raises:
            RuntimeError: if the socket returns no data before the whole
                message has arrived.
        """
        header = self._recv_exact(struct.calcsize("!Q"))
        payload_size = struct.unpack("!Q", header)[0]
        return self._recv_exact(payload_size)

    def close(self):
        """Close the underlying socket."""
        self.socket.close()

    def _connect(self, host, port):
        """Connect the underlying socket to (host, port)."""
        self.socket.connect((host, port))


class CPipeTransport(PipeTransport, CTransport):
    """PipeTransport combined with the caching csend()/creceive()."""
    pass


class CSocketTransport(SocketTransport, CTransport):
    """SocketTransport combined with the caching csend()/creceive()."""
    pass

# Parallel Python Software: http://www.parallelpython.com