# Parallel Python Software: http://www.parallelpython.com
# Copyright (c) 2005-2008, Vitalii Vanovschi
# All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the author nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
# THE POSSIBILITY OF SUCH DAMAGE.
"""
Parallel Python Software, PP Transport
http://www.parallelpython.com - updates, documentation, examples and support
forums
"""
import md5, os, struct, sha, socket, logging
copyright = "Copyright (c) 2005-2008 Vitalii Vanovschi. All rights reserved"
version = "1.5"
class Transport(object):
def send(self, msg):
raise NotImplemented("abstact function 'send' must be implemented "\
"in a subclass")
def receive(self):
raise NotImplemented("abstact function 'receive' must be implemented "\
"in a subclass")
def authenticate(self, secret):
remote_version = self.receive()
if version != remote_version:
logging.error("PP version mismatch (local: pp-%s, remote: pp-%s)"
% (version, remote_version));
logging.error("Please install the same version of PP on all nodes")
return False
srandom = self.receive()
answer = sha.new(srandom+secret).hexdigest()
self.send(answer)
response = self.receive()
if response == "OK":
return True
else:
return False
def close(self):
pass
def _connect(self):
pass
class CTransport(Transport):
"""Cached transport
"""
rcache = {}
def hash(self, msg):
return md5.new(msg).hexdigest()
def csend(self, msg):
hash1 = self.hash(msg)
if self.scache.has_key(hash1):
self.send("H" + hash1)
else:
self.send("N" + msg)
self.scache[hash1] = True
def creceive(self, preprocess=None):
msg = self.receive()
if msg[0] == 'H':
hash1 = msg[1:]
else:
msg = msg[1:]
hash1 = self.hash(msg)
self.rcache[hash1] = map(preprocess, (msg,))[0]
return self.rcache[hash1]
class PipeTransport(Transport):
def __init__(self, r, w):
self.scache = {}
if isinstance(r, file) and isinstance(w, file):
self.r = r
self.w = w
else:
raise TypeError("Both arguments of PipeTransport constructor must "\
"be file objects")
def send(self, msg):
self.w.write(str(len(msg))+"\n")
self.w.write(msg)
self.w.flush()
def receive(self, preprocess=None):
s = self.r.readline()
msg_len = int(s)
msg = self.r.read(msg_len)
return map(preprocess, (msg,))[0]
class SocketTransport(Transport):
def __init__(self, socket1=None):
if socket1:
self.socket = socket1
else:
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.scache = {}
def send(self, data):
size = struct.pack("!Q", len(data))
t_size = struct.calcsize("!Q")
s_size = 0L
while s_size < t_size:
p_size = self.socket.send(size[s_size:])
if p_size == 0:
raise RuntimeError("Socket connection is broken")
s_size += p_size
t_size = len(data)
s_size = 0L
while s_size < t_size:
p_size = self.socket.send(data[s_size:])
if p_size == 0:
raise RuntimeError("Socket connection is broken")
s_size += p_size
def receive(self):
e_size = struct.calcsize("!Q")
r_size = 0
data = ""
while r_size < e_size:
msg = self.socket.recv(e_size-r_size)
if msg == "":
raise RuntimeError("Socket connection is broken")
r_size += len(msg)
data += msg
e_size = struct.unpack("!Q", data)[0]
r_size = 0
data = ""
while r_size < e_size:
msg = self.socket.recv(e_size-r_size)
if msg == "":
raise RuntimeError("Socket connection is broken")
r_size += len(msg)
data += msg
return data
def close(self):
self.socket.close()
def _connect(self, host, port):
self.socket.connect((host, port))
class CPipeTransport(PipeTransport, CTransport):
pass
class CSocketTransport(SocketTransport, CTransport):
pass
# Parallel Python Software: http://www.parallelpython.com
syntax highlighted by Code2HTML, v. 0.9.1