### py_interface -- A Python-implementation of an Erlang node
###
### $Id: erl_node_conn.py,v 1.16 2002/05/28 22:09:25 tab Exp $
###
### Copyright (C) 2002 Tomas Abrahamsson
###
### Author: Tomas Abrahamsson
###
### This file is part of the Py-Interface library
###
### This library is free software; you can redistribute it and/or
### modify it under the terms of the GNU Library General Public
### License as published by the Free Software Foundation; either
### version 2 of the License, or (at your option) any later version.
###
### This library is distributed in the hope that it will be useful,
### but WITHOUT ANY WARRANTY; without even the implied warranty of
### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
### Library General Public License for more details.
###
### You should have received a copy of the GNU Library General Public
### License along with this library; if not, write to the Free
### Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

### erl_node_conn.py -- Handle inter-node communication

import sys
import time
import types
import string
import socket
import random
import md5

import erl_opts
import erl_term
import erl_common
import erl_async_conn
import erl_eventhandler

# Module name, used as the tag in all erl_common.Debug/DebugHex calls.
M = "erl_node_conn"


def CheckDigest(digest, challenge, cookie):
    """Checks that a digest is correct.

    DIGEST    = string
                The digest received from the peer.
    CHALLENGE = integer | longinteger
                The challenge the digest is supposed to answer.
    COOKIE    = string

    Returns: true if DIGEST equals GenDigest(CHALLENGE, COOKIE),
             false otherwise
    Throws:  nothing
    """
    expectedDigest = GenDigest(challenge, cookie)
    return expectedDigest == digest


def GenDigest(challenge, cookie):
    """Generates a digest from a CHALLENGE and a COOKIE.

    The digest is the raw MD5 digest of the cookie concatenated with
    the decimal string form of the challenge.

    CHALLENGE = integer | longinteger
    COOKIE    = string

    Returns: string
    Throws:  nothing
    """
    challengeStr = str(challenge)
    # Strip a trailing long-integer marker, if the string form has one.
    if challengeStr[-1] == 'L':
        challengeStr = challengeStr[:-1]
    return md5.new(cookie + challengeStr).digest()


def GenChallenge():
    """Generates a challenge.

    No arguments.

    Returns: integer
             A random integer derived from random.random() * 0x7fffffff.
    Throws:  nothing
    """
    return int(random.random() * 0x7fffffff)


class Ticker:
    """This class is used for keeping track of the net-ticks:
    * when a remote node has been silent for too long
    * when it's time to send a tick so the other part won't think
      we've been silent for too long (to `tick').

    Both checks are driven by timer events registered with the event
    handler returned by erl_eventhandler.GetEventHandler(); each timer
    callback re-registers its timer for as long as its check is enabled.
    """

    def __init__(self, netTickTime, timeToTickCb, noResponseCb):
        """Constructor. Starts both the response timer and the tick timer.

        NET-TICK-TIME   = integer
                          The net-tick-time (in seconds).
        TIME-TO-TICK-CB = callback, called without arguments when it's
                          time to tick.
        NO-RESPONSE-CB  = callback, called without arguments when the
                          other end has been silent for too long.

        Throws: nothing
        """
        self._netTickTime = netTickTime
        self._evhandler = erl_eventhandler.GetEventHandler()
        self._InitStartResponseTimer(netTickTime, noResponseCb)
        self._InitStartTickTimer(netTickTime, timeToTickCb)

    def _InitStartResponseTimer(self, netTickTime, noResponseCb):
        """Set up the silence check and start its timer.

        The peer is considered silent after 1.25 * NET-TICK-TIME; the
        check itself runs every 0.25 * NET-TICK-TIME.
        """
        self._noResponseCb = noResponseCb
        self._noResponseTimeout = netTickTime * 1.25
        self._responseCheckTimeout = netTickTime * 0.25
        self._responseDoCheck = 1
        self._timeForLastResponse = time.time()
        self._StartResponseTimer()

    def _StartResponseTimer(self):
        """Register the next response check, unless checking is stopped."""
        if self._responseDoCheck:
            timeout = self._responseCheckTimeout
            cb = self._CheckResponse
            timerId = self._evhandler.AddTimerEvent(timeout, cb)
            self._checkResponseTimerId = timerId

    def _StopResponseTimer(self):
        """Disable the response check.

        An already registered timer is not removed; its callback just
        does nothing once the flag is cleared.
        """
        self._responseDoCheck = 0

    def GotResonse(self):
        """To be called whenever data has been received from the other end.

        No arguments.

        Returns: void
        Throws:  nothing
        """
        # NOTE: the method name is misspelled, but it is part of the
        # interface used by the connection classes, so it is kept.
        self._timeForLastResponse = time.time()

    def _CheckResponse(self):
        """Timer callback: report silence or schedule the next check."""
        if self._responseDoCheck:
            now = time.time()
            if now > self._timeForLastResponse + self._noResponseTimeout:
                # Silent for too long: stop checking and notify, once.
                self._responseDoCheck = 0
                self._noResponseCb()
            else:
                self._StartResponseTimer()

    def _InitStartTickTimer(self, netTickTime, timeToTickCb):
        """Set up the tick check and start its timer.

        A tick is due when nothing has been sent for
        0.25 * NET-TICK-TIME; the check runs every 0.125 * NET-TICK-TIME.
        """
        self._timeToTickCb = timeToTickCb
        self._tickTimeout = netTickTime * 0.25
        self._tickCheckTimeout = netTickTime * 0.125
        self._tickDoCheck = 1
        self._timeForLastTick = time.time()
        self._StartTickTimer()

    def _StartTickTimer(self):
        """Register the next tick check, unless ticking is stopped."""
        if self._tickDoCheck:
            timeout = self._tickCheckTimeout
            cb = self._Tick
            timerId = self._evhandler.AddTimerEvent(timeout, cb)
            self._tickTimerId = timerId

    def _StopTickTimer(self):
        """Disable the tick check (a pending timer becomes a no-op)."""
        self._tickDoCheck = 0

    def RestartTick(self):
        """To be called whenever something has been sent to the other end.

        No arguments.

        Returns: void
        Throws:  nothing
        """
        self._timeForLastTick = time.time()

    def _Tick(self):
        """Timer callback: re-arm the timer and tick if one is due."""
        if self._tickDoCheck:
            self._StartTickTimer()
            now = time.time()
            if now > self._timeForLastTick + self._tickTimeout:
                self._timeToTickCb()
                self._timeForLastTick = time.time()

    def Stop(self):
        """Stop the timers.

        No arguments.

        Returns: void
        Throws:  nothing
        """
        self._StopResponseTimer()
        self._StopTickTimer()


class ErlNodeOutConnection(erl_async_conn.ErlAsyncClientConnection):
    """This class handles a connection _to_ another node,
    initiated by this node.

    Inheritance: erl_async_conn.ErlAsyncClientConnection

    This is intended to be used by the erl_node.ErlNode class.

    Handshake, as implemented here: send our name, receive a status,
    receive the peer's challenge, send a challenge reply (including our
    own challenge), receive the challenge ack, then the connection is up.
    """

    _STATE_DISCONNECTED = -1
    _STATE_HANDSHAKE_RECV_STATUS = 2
    _STATE_HANDSHAKE_RECV_CHALLENGE = 4
    _STATE_HANDSHAKE_RECV_CHALLENGE_ACK = 6
    _STATE_CONNECTED = 7

    def __init__(self, nodeName, opts):
        """Constructor.

        NODE-NAME = string
                    The name of this (the connecting) node.
        OPTS      = options object; this class calls its GetCookie(),
                    GetNetTickTime(), GetDistrVersion() and
                    GetDistrFlags() methods.

        Throws: nothing
        """
        erl_async_conn.ErlAsyncClientConnection.__init__(self)
        self._recvdata = ""
        self._hostName = None
        self._portNum = None
        self._nodeName = nodeName
        self._opts = opts
        self._peerName = None
        self._state = self._STATE_DISCONNECTED
        # 2 bytes for the packet length during the handshake, then 4 bytes
        self._packetLenSize = 2
        # These are started once the connection is up
        self._tickTimers = None

    def InitiateConnection(self, hostName, portNum,
                           connectOkCb, connectFailedCb, connectionBrokenCb,
                           passThroughMsgCb):
        """Initiates a connection to another erlang-node.

        HOST-NAME            = string
                               The node to connect to.
        PORT-NUM             = integer
                               The port on that node. Use the EPMD to
                               find the port number, given a node name.
        CONNECT-OK-CB        = callback, called without arguments when
                               the handshake has completed successfully.
        CONNECT-FAILED-CB    = callback, called when a connection
                               establishment failed. Called with
                               (CONNECTION, PEER-NAME) when the peer
                               closes the socket during the handshake,
                               and without arguments when the handshake
                               itself is rejected.
        CONNECTION-BROKEN-CB = callback, called with
                               (CONNECTION, PEER-NAME) when an
                               established connection has been broken.
        PASS-THROUGH-MSG-CB  = callback, called with
                               (CONNECTION, PEER-NAME, CTRL-MSG[, MSG])
                               for each incoming pass-through message.

        Returns: 0 if the connect attempt failed, otherwise nothing
        """
        self._hostName = hostName
        self._portNum = portNum
        self._connectOkCb = connectOkCb
        self._connectFailedCb = connectFailedCb
        self._connectionBrokenCb = connectionBrokenCb
        self._passThroughMsgCb = passThroughMsgCb
        # Placeholder until the peer tells us its name in the handshake.
        self._peerName = "(unknown)@%s" % hostName
        if self.Connect(hostName, portNum):
            self._SendName()
            self._state = self._STATE_HANDSHAKE_RECV_STATUS
        else:
            return 0

    def GetPeerNodeName(self):
        """Retrieves the node name for the peer.

        No arguments.

        Returns: string | None
                 None until InitiateConnection has been called.
        Throws:  nothing
        """
        return self._peerName

    def SendMsg(self, ctrlMsg, msg=None):
        """Sends a message to the other end.

        CTRL-MSG       = term
        MSG (optional) = None | term

        Returns: void

        The tick timers do not exist until the handshake has completed,
        so this must only be called on an established connection.

        For information on the CTRL-MSG and the MSG, please refer to the
        file erl_ext_dist.txt in the Erlang distribution.
        """
        if msg is None:
            packet = "p" + erl_term.TermToBinary(ctrlMsg)
        else:
            packet = "p" + (erl_term.TermToBinary(ctrlMsg) +
                            erl_term.TermToBinary(msg))
        self._SendPacket(packet)

    ##
    ## Internal routines
    ##

    def _In(self):
        """Callback routine, which is called when data is available
        on the connection."""
        connection = self.GetConnection()
        newData = connection.recv(100000)
        if len(newData) == 0:
            # The peer closed the connection.
            self.Close()
            if self._state != self._STATE_CONNECTED:
                self._state = self._STATE_DISCONNECTED
                self._connectFailedCb(self, self.GetPeerNodeName())
            else:
                self._state = self._STATE_DISCONNECTED
                if self._tickTimers is not None:
                    self._tickTimers.Stop()
                # NOTE(review): an empty peer name is passed here, while
                # ErlNodeInConnection passes GetPeerNodeName() -- confirm
                # with the callers which one is intended.
                self._connectionBrokenCb(self, "")
            return

        self._recvdata = self._recvdata + newData
        remainingUnhandledData = self._HandleData(self._recvdata)
        self._recvdata = remainingUnhandledData

    def _HandleData(self, data):
        """Split DATA into length-prefixed packets and handle each one.

        Returns: string, the trailing data that does not yet form a
                 complete packet.
        """
        remainingInput = data
        while 1:
            # The length-field size is re-read on every iteration since
            # handling a packet may switch it from 2 to 4 bytes.
            if len(remainingInput) < self._packetLenSize:
                return remainingInput

            if self._packetLenSize == 2:
                packetLen = self.ReadInt2(remainingInput[0:2])
                packetOffset = 2
            else:
                packetLen = self.ReadInt4(remainingInput[0:4])
                packetOffset = 4

            if len(remainingInput) < packetOffset + packetLen:
                return remainingInput

            packetEnd = packetOffset + packetLen
            packetData = remainingInput[packetOffset:packetEnd]
            self._HandlePacket(packetData)
            remainingInput = remainingInput[packetEnd:]

    def _HandshakeFailed(self):
        """Close the connection and report a failed handshake."""
        self.Close()
        self._state = self._STATE_DISCONNECTED
        self._connectFailedCb()

    def _HandlePacket(self, data):
        """Handle one complete packet according to the current state."""
        if self._state == self._STATE_HANDSHAKE_RECV_STATUS:
            # First check that the correct message came in
            # (data[:1] so that an empty packet fails cleanly too)
            if data[:1] != "s":
                erl_common.DebugHex(M, "handshake:recv_status: got", data)
                self._HandshakeFailed()
                # Do not go on interpreting a packet we just rejected.
                return
            status = data[1:]
            if status == "ok" or status == "ok_simultaneous":
                self._state = self._STATE_HANDSHAKE_RECV_CHALLENGE
            elif status == "nok" or status == "not_allowed":
                self._HandshakeFailed()
            elif status == "alive":
                self._SendStatusAliveTrue()
                self._state = self._STATE_HANDSHAKE_RECV_CHALLENGE
            else:
                erl_common.DebugHex(M, "handshake:recv_status", data)
        elif self._state == self._STATE_HANDSHAKE_RECV_CHALLENGE:
            # First check that the correct message came in
            if data[:1] != "n":
                erl_common.DebugHex(M, "handshake:recv_cha", data)
                self._HandshakeFailed()
                return
            self._peerVersion = self.ReadInt2(data[1:3])
            self._peerFlags = self.ReadInt4(data[3:7])
            challenge = self.ReadInt4(data[7:11])
            self._peerName = data[11:]
            self._SendChallengeReply(challenge)
            self._state = self._STATE_HANDSHAKE_RECV_CHALLENGE_ACK
        elif self._state == self._STATE_HANDSHAKE_RECV_CHALLENGE_ACK:
            # First check that the correct message came in
            if data[:1] != "a":
                erl_common.DebugHex(M, "handshake:recv_cha_ack", data)
                self._HandshakeFailed()
                return
            digest = data[1:]
            ownCookie = self._opts.GetCookie()
            if CheckDigest(digest, self._challengeToPeer, ownCookie):
                # Handshake done: switch to 4-byte packet lengths and
                # start the net-tick timers.
                self._packetLenSize = 4
                self._state = self._STATE_CONNECTED
                t = self._opts.GetNetTickTime()
                self._tickTimers = Ticker(t, self._Tick, self._NoResponse)
                self._connectOkCb()
            else:
                erl_common.Debug(M,
                                 "Connection attempt to disallowed node %s" %
                                 self._peerName)
                self._HandshakeFailed()
        elif self._state == self._STATE_CONNECTED:
            self._tickTimers.GotResonse()
            if len(data) == 0:
                # A tick
                return

            msgType = data[0]
            if msgType == "p":
                terms = erl_term.BinariesToTerms(data[1:])
                if len(terms) == 2:
                    controlMsg = terms[0]
                    msg = terms[1]
                    self._passThroughMsgCb(self, self.GetPeerNodeName(),
                                           controlMsg, msg)
                elif len(terms) == 1:
                    # Control message only; there is no message term to
                    # pass on (same call as in ErlNodeInConnection).
                    controlMsg = terms[0]
                    self._passThroughMsgCb(self, self.GetPeerNodeName(),
                                           controlMsg)
                else:
                    debugTxt = "PassThrough-msg: terms=%s" % repr(terms)
                    erl_common.DebugHex(M, debugTxt, data)
            else:
                erl_common.DebugHex(M, "msgType=%c" % msgType, data)
        else:
            erl_common.DebugHex(M, "state=%d" % self._state, data)

    def _Tick(self):
        """This callback is called by the Ticker class instance
        when it is time to send a tick to the other end, to indicate that
        we are still alive. A tick is an empty packet.
        """
        self._SendPacket("")

    def _NoResponse(self):
        """This callback is called by the Ticker class instance
        when nothing has been received from the other end for too long.
        """
        # NOTE(review): the log text says "InConnection" although this is
        # the outgoing connection; left as-is since it is runtime output.
        erl_common.Debug(M, "InConnection: Connection broken")
        self._state = self._STATE_DISCONNECTED
        self._tickTimers.Stop()
        self._connectionBrokenCb(self, self.GetPeerNodeName())

    def _SendName(self):
        """Send the handshake name message: 'n', version, flags, name."""
        packet = "n" + \
                 self.PackInt2(self._opts.GetDistrVersion()) + \
                 self.PackInt4(self._opts.GetDistrFlags()) + \
                 self._nodeName
        self._SendHandshakeMsg(packet)

    def _SendStatusAliveTrue(self):
        """Answer an "alive" status from the peer with "true"."""
        self._SendHandshakeMsg("true")

    def _SendChallengeReply(self, challenge):
        """Send 'r', our own challenge and the digest for CHALLENGE.

        Our challenge is remembered so the peer's ack can be verified.
        """
        digest = GenDigest(challenge, self._opts.GetCookie())
        challengeToPeer = GenChallenge()
        self._challengeToPeer = challengeToPeer
        packet = "r" + self.PackInt4(challengeToPeer) + digest
        self._SendHandshakeMsg(packet)

    def _SendHandshakeMsg(self, packet):
        """Send PACKET prefixed with a 2-byte length (handshake framing)."""
        msg = self.PackInt2(len(packet)) + packet
        erl_common.Debug(M, "Sending handshake")
        self.Send(msg)

    def _SendPacket(self, packet):
        """Send PACKET prefixed with a 4-byte length and restart the tick
        timer. Requires the tick timers, i.e. an established connection.
        """
        msg = self.PackInt4(len(packet)) + packet
        erl_common.Debug(M, "Sending msg")
        self._tickTimers.RestartTick()
        self.Send(msg)


class ErlNodeServerSocket(erl_async_conn.ErlAsyncServer):
    """This class opens a socket and listens for incoming connections
    from other Erlang nodes. When a remote node connects, a new instance
    of the ErlNodeInConnection is created for handling the new connection.

    This class is intended to be used by the erl_node.ErlNode.
    """

    def __init__(self, nodeName, opts):
        """Constructor

        NODE-NAME = string
                    The name of this node
        OPTS      = options object, passed on to each
                    ErlNodeInConnection

        Until Start is called, all callbacks are no-ops.

        Throws: nothing
        """
        erl_async_conn.ErlAsyncServer.__init__(self)
        self._nodeName = nodeName
        self._opts = opts
        self._passThroughMsgCb = self._Sink
        self._nodeUpCb = self._Sink
        self._nodeDownCb = self._Sink

    def Start(self, nodeUpCb, nodeDownCb, passThroughMsgCb):
        """Setup and start to listen for incoming connections.

        NODE-UP-CB          = callback, called with
                              (CONNECTION, PEER-NAME) when a new
                              connection has been established.
        NODE-DOWN-CB        = callback, called with
                              (CONNECTION, PEER-NAME) when an
                              established connection has been broken.
        PASS-THROUGH-MSG-CB = callback, called with
                              (CONNECTION, PEER-NAME, CTRL-MSG[, MSG])
                              for pass-through messages. Currently, all
                              incoming messages are of this type.

        (Sub)Types:
        CONNECTION = The instance of the class that handles the connection
        PEER-NAME  = string
                     The node name for the peer node
        CTRL-MSG   = term
        MSG        = term

        For information on CTRL-MSG and MSG, see the file
        erl_ext_dist.txt, which is included in the Erlang distribution.

        Returns: whatever erl_async_conn.ErlAsyncServer.Start returns
        """
        self._nodeUpCb = nodeUpCb
        self._nodeDownCb = nodeDownCb
        self._passThroughMsgCb = passThroughMsgCb
        return erl_async_conn.ErlAsyncServer.Start(self)

    def _NewConnection(self, s, remoteAddr):
        """Create an ErlNodeInConnection for the newly accepted socket S."""
        erl_common.Debug(M, "new connection from %s" % repr(remoteAddr))
        # The reference is not stored here.
        # NOTE(review): presumably the connection registers itself with
        # the event handler in its base class -- confirm.
        inConn = ErlNodeInConnection(s,
                                     self._nodeName, self._opts,
                                     self._nodeUpCb, self._nodeDownCb,
                                     self._passThroughMsgCb)

    def _Sink(self, *a, **kw):
        """Default callback: accept any arguments and do nothing."""
        pass


class ErlNodeInConnection(erl_async_conn.ErlAsyncPeerConnection):
    """This class handles incoming connections from other Erlang nodes.

    This class is intended to be used by the ErlNodeServerSocket (and
    thus indirectly by the erl_node.ErlNode).

    Handshake, as implemented here: receive the peer's name, send status
    "ok" and our challenge, receive the challenge reply, send the
    challenge ack, then the connection is up.
    """

    ## XXX TODO: This node duplicates too much functionality
    ## from ErlNodeOutConnection, still there are differences.
    ##
    ## The differences are in the setting up of the connection;
    ## during the handshake sequence, the connecting side
    ## (ErlNodeOutConnection) acts the client, while the connected
    ## side (ErlNodeInConnection) acts as server.
    ##
    ## One idea is to maybe separate the state-machine
    ## into its own class.
    ##
    ## Need to think about this one...

    _STATE_DISCONNECTED = -1
    _STATE_HANDSHAKE_RECV_NAME = 1
    _STATE_HANDSHAKE_RECV_STATUS = 3
    _STATE_HANDSHAKE_RECV_CHALLENGE_REPLY = 5
    _STATE_CONNECTED = 7

    def __init__(self, sock, nodeName, opts,
                 newConnectionUpCb, connectionBrokenCb,
                 passThroughMsgCb):
        """Constructor.

        SOCK                 = The socket for the incoming connection.
        NODE-NAME            = string
                               The node name of the node to which this
                               connection belongs.
        OPTS                 = Options for the node; this class calls
                               its GetCookie(), GetNetTickTime(),
                               GetDistrVersion() and GetDistrFlags().
        NEW-CONNECTION-UP-CB = callback, called with
                               (CONNECTION, PEER-NAME) when a new
                               connection has been established.
        CONNECTION-BROKEN-CB = callback, called with
                               (CONNECTION, PEER-NAME) when an
                               established connection has been broken.
        PASS-THROUGH-MSG-CB  = callback, called with
                               (CONNECTION, PEER-NAME, CTRL-MSG[, MSG])
                               for pass-through messages. Currently, all
                               incoming messages are of this type.
        """
        erl_async_conn.ErlAsyncPeerConnection.__init__(self, sock)
        self._recvdata = ""
        self._hostName = None
        self._portNum = None
        self._nodeName = nodeName
        self._opts = opts
        self._newConnectionUpCb = newConnectionUpCb
        self._connectionBrokenCb = connectionBrokenCb
        self._passThroughMsgCb = passThroughMsgCb
        self._state = self._STATE_HANDSHAKE_RECV_NAME
        # Initialised to our own node name; replaced by the real peer
        # name when the handshake name message arrives.
        self._peerName = nodeName
        # 2 bytes for the packet length during the handshake, then 4 bytes
        self._packetLenSize = 2
        # These are started once the connection is up
        self._tickTimers = None

    def GetPeerNodeName(self):
        """Retrieves the node name for the peer.

        No arguments.

        Returns: string
        Throws:  nothing
        """
        return self._peerName

    def SendMsg(self, ctrlMsg, msg=None):
        """Sends a message to the other end.

        CTRL-MSG       = term
        MSG (optional) = None | term

        Returns: void

        The tick timers do not exist until the handshake has completed,
        so this must only be called on an established connection.

        For information on the CTRL-MSG and the MSG, please refer to the
        file erl_ext_dist.txt in the Erlang distribution.
        """
        if msg is None:
            packet = "p" + erl_term.TermToBinary(ctrlMsg)
        else:
            packet = "p" + (erl_term.TermToBinary(ctrlMsg) +
                            erl_term.TermToBinary(msg))
        self._SendPacket(packet)

    ##
    ## Internal routines
    ##

    def _In(self):
        """Callback routine, which is called when data is available
        on the connection."""
        connection = self.GetConnection()
        newData = connection.recv(100000)
        if len(newData) == 0:
            # The peer closed the connection.
            self.Close()
            if self._state != self._STATE_CONNECTED:
                # Never got connected, so nobody has been told about
                # this connection: no callback.
                self._state = self._STATE_DISCONNECTED
            else:
                erl_common.Debug(M, "InConnection: Connection broken")
                self._state = self._STATE_DISCONNECTED
                if self._tickTimers is not None:
                    self._tickTimers.Stop()
                self._connectionBrokenCb(self, self.GetPeerNodeName())
            return

        self._recvdata = self._recvdata + newData
        remainingUnhandledData = self._HandleData(self._recvdata)
        self._recvdata = remainingUnhandledData

    def _HandleData(self, data):
        """Split DATA into length-prefixed packets and handle each one.

        Returns: string, the trailing data that does not yet form a
                 complete packet.
        """
        remainingInput = data
        while 1:
            # The length-field size is re-read on every iteration since
            # handling a packet may switch it from 2 to 4 bytes.
            if len(remainingInput) < self._packetLenSize:
                return remainingInput

            if self._packetLenSize == 2:
                packetLen = self.ReadInt2(remainingInput[0:2])
                packetOffset = 2
            else:
                packetLen = self.ReadInt4(remainingInput[0:4])
                packetOffset = 4

            if len(remainingInput) < packetOffset + packetLen:
                return remainingInput

            packetEnd = packetOffset + packetLen
            packetData = remainingInput[packetOffset:packetEnd]
            self._HandlePacket(packetData)
            remainingInput = remainingInput[packetEnd:]

    def _HandlePacket(self, data):
        """Handle one complete packet according to the current state."""
        if self._state == self._STATE_HANDSHAKE_RECV_NAME:
            # First check that the correct message came in
            # (data[:1] so that an empty packet fails cleanly too)
            if data[:1] != "n":
                erl_common.DebugHex(M, "handshake:recv_name", data)
                self.Close()
                self._state = self._STATE_DISCONNECTED
                # Do not go on interpreting a packet we just rejected.
                return
            self._peerDistrVersion = self.ReadInt2(data[1:3])
            self._peerFlags = self.ReadInt4(data[3:7])
            self._peerName = data[7:]
            # FIXME: check for connections _to_ this node:
            #        check whether nodeName > ownNodeName (or check < ?)
            self._SendStatusOk()
            self._SendChallenge()
            self._state = self._STATE_HANDSHAKE_RECV_CHALLENGE_REPLY
        elif self._state == self._STATE_HANDSHAKE_RECV_CHALLENGE_REPLY:
            # First check that the correct message came in
            if data[:1] != "r":
                erl_common.DebugHex(M, "handshake:recv_chreply", data)
                self.Close()
                self._state = self._STATE_DISCONNECTED
                return
            peersChallenge = self.ReadInt4(data[1:5])
            peersDigest = data[5:]
            ownCookie = self._opts.GetCookie()
            if CheckDigest(peersDigest, self._challengeToPeer, ownCookie):
                self._SendChallengeAck(peersChallenge)
                # Handshake done: switch to 4-byte packet lengths and
                # start the net-tick timers.
                self._packetLenSize = 4
                self._state = self._STATE_CONNECTED
                t = self._opts.GetNetTickTime()
                self._tickTimers = Ticker(t, self._Tick, self._NoResponse)
                self._newConnectionUpCb(self, self.GetPeerNodeName())
            else:
                erl_common.Debug(M,
                                 "Connection attempt from disallowed node %s" %
                                 self._peerName)
                self.Close()
                self._state = self._STATE_DISCONNECTED
        elif self._state == self._STATE_CONNECTED:
            self._tickTimers.GotResonse()
            if len(data) == 0:
                # A tick
                return

            msgType = data[0]
            if msgType == "p":
                terms = erl_term.BinariesToTerms(data[1:])
                if len(terms) == 2:
                    controlMsg = terms[0]
                    msg = terms[1]
                    peerName = self.GetPeerNodeName()
                    self._passThroughMsgCb(self, peerName, controlMsg, msg)
                elif len(terms) == 1:
                    controlMsg = terms[0]
                    peerName = self.GetPeerNodeName()
                    self._passThroughMsgCb(self, peerName, controlMsg)
                else:
                    debugTxt = "PassThrough-msg: terms=%s" % repr(terms)
                    erl_common.DebugHex(M, debugTxt, data)
            else:
                erl_common.DebugHex(M, "msgType=%c" % msgType, data)
        else:
            erl_common.DebugHex(M, "state=%d" % self._state, data)

    def _Tick(self):
        """This callback is called by the Ticker class instance
        when it is time to send a tick to the other end, to indicate that
        we are still alive. A tick is an empty packet.
        """
        self._SendPacket("")

    def _NoResponse(self):
        """This callback is called by the Ticker class instance
        when nothing has been received from the other end for too long.
        """
        erl_common.Debug(M, "InConnection: Connection broken")
        self._state = self._STATE_DISCONNECTED
        self._tickTimers.Stop()
        self._connectionBrokenCb(self, self.GetPeerNodeName())

    def _SendStatusOk(self):
        """Send the handshake status message 's' + "ok"."""
        self._SendHandshakeMsg("sok")

    def _SendChallenge(self):
        """Send 'n', version, flags, a fresh challenge and our name.

        The challenge is remembered so the peer's reply can be verified.
        """
        challenge = GenChallenge()
        self._challengeToPeer = challenge
        packet = "n" + \
                 self.PackInt2(self._opts.GetDistrVersion()) + \
                 self.PackInt4(self._opts.GetDistrFlags()) + \
                 self.PackInt4(challenge) + \
                 self._nodeName
        self._SendHandshakeMsg(packet)

    def _SendChallengeAck(self, challenge):
        """Send 'a' followed by our digest for the peer's CHALLENGE."""
        packet = "a" + GenDigest(challenge, self._opts.GetCookie())
        self._SendHandshakeMsg(packet)

    def _SendHandshakeMsg(self, packet):
        """Send PACKET prefixed with a 2-byte length (handshake framing)."""
        msg = self.PackInt2(len(packet)) + packet
        erl_common.Debug(M, "Sending handshake")
        self.Send(msg)

    def _SendPacket(self, packet):
        """Send PACKET prefixed with a 4-byte length and restart the tick
        timer. Requires the tick timers, i.e. an established connection.
        """
        msg = self.PackInt4(len(packet)) + packet
        erl_common.Debug(M, "Sending msg")
        self._tickTimers.RestartTick()
        self.Send(msg)