### py_interface -- A Python-implementation of an Erlang node
###
### $Id: erl_node_conn.py,v 1.16 2002/05/28 22:09:25 tab Exp $
###
### Copyright (C) 2002 Tomas Abrahamsson
###
### Author: Tomas Abrahamsson <tab@lysator.liu.se>
###
### This file is part of the Py-Interface library
###
### This library is free software; you can redistribute it and/or
### modify it under the terms of the GNU Library General Public
### License as published by the Free Software Foundation; either
### version 2 of the License, or (at your option) any later version.
###
### This library is distributed in the hope that it will be useful,
### but WITHOUT ANY WARRANTY; without even the implied warranty of
### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
### Library General Public License for more details.
###
### You should have received a copy of the GNU Library General Public
### License along with this library; if not, write to the Free Software
### Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
### erl_node_conn.py -- Handle inter-node communication
import sys
import time
import types
import string
import socket
import random
import md5
import erl_opts
import erl_term
import erl_common
import erl_async_conn
import erl_eventhandler
M = "erl_node_conn"
def CheckDigest(digest, challenge, cookie):
    """Tells whether DIGEST is the digest expected for CHALLENGE and COOKIE.

    DIGEST    = string
                The digest received from the peer.
    CHALLENGE = integer | longinteger
                The challenge that was sent to the peer.
    COOKIE    = string
                The cookie the peer is expected to know.
    Returns: true if DIGEST equals the digest generated locally
             from CHALLENGE and COOKIE, false otherwise
    Throws:  nothing
    """
    return digest == GenDigest(challenge, cookie)
def GenDigest(challenge, cookie):
    """Generates a digest from a CHALLENGE and a COOKIE.

    CHALLENGE = integer | longinteger
    COOKIE    = string
    Returns: string
             The raw (binary, not hex-encoded) MD5 digest of the cookie
             immediately followed by the challenge written in decimal.
    Throws:  nothing (for plain byte-string or ASCII cookies)
    """
    # The stand-alone md5 module is deprecated since Python 2.5 and is gone
    # in Python 3.  Prefer hashlib and only fall back to md5 on
    # interpreters that predate hashlib.
    try:
        from hashlib import md5 as newMd5
    except ImportError:
        import md5
        newMd5 = md5.new

    challengeStr = str(challenge)
    # str() of a long integer may carry a trailing 'L' (presumably only on
    # very old interpreters); the peer hashes the bare decimal digits.
    if challengeStr.endswith('L'):
        challengeStr = challengeStr[:-1]

    data = cookie + challengeStr
    # MD5 operates on byte strings.  Where text and byte strings are
    # different types, convert; the type of a digest is by construction
    # the interpreter's byte-string type, so this is a no-op for ordinary
    # Python 2 strings.
    # NOTE(review): latin-1 is an assumption for non-ASCII cookies --
    # confirm against the cookie encoding used by the peer.
    if not isinstance(data, type(newMd5().digest())):
        data = data.encode("latin-1")
    return newMd5(data).digest()
def GenChallenge():
    """Generates a challenge.

    No arguments.
    Returns: integer
             A random number N such that 0 <= N < 0x7fffffff.
    Throws:  nothing

    The challenge is part of the authentication handshake, so it should
    not be predictable.  The operating system's random source is used
    when the interpreter offers one; otherwise the ordinary pseudo-random
    generator is used, as before.
    """
    upperBound = 0x7fffffff
    try:
        return random.SystemRandom().randrange(upperBound)
    except (AttributeError, NotImplementedError):
        # AttributeError: the interpreter has no random.SystemRandom.
        # NotImplementedError: the platform has no OS random source.
        return int(random.random() * upperBound)
class Ticker:
    """This class is used for keeping track of the net-ticks:
    * when a remote node has been silent for too long
    * when it's time to send a tick so the other part won't think
      we've been silent for too long (to `tick').

    Two independent periodic checks are run through the event handler:
    a response check (has the peer been heard from recently?) and a
    tick check (have we sent anything recently?).
    """

    def __init__(self, netTickTime, timeToTickCb, noResponseCb):
        """Constructor

        NET-TICK-TIME   = integer
                          the net-tick-time (in seconds).
        TIME-TO-TICK-CB = <function(): void>
                          callback to call when it's time to tick.
        NO-RESPONSE-CB  = <function(): void>
                          callback to call when the other end has been
                          silent for too long.
        Throws: nothing
        """
        self._netTickTime = netTickTime
        self._evhandler = erl_eventhandler.GetEventHandler()
        self._InitStartResponseTimer(netTickTime, noResponseCb)
        self._InitStartTickTimer(netTickTime, timeToTickCb)

    ##
    ## Supervision of the other end
    ##

    def _InitStartResponseTimer(self, netTickTime, noResponseCb):
        """Set up and start the check for a silent peer."""
        self._noResponseCb = noResponseCb
        # The peer is considered gone after 1.25 net-tick-times of
        # silence; this is polled every quarter of a net-tick-time.
        self._noResponseTimeout = netTickTime * 1.25
        self._responseCheckTimeout = netTickTime * 0.25
        self._responseDoCheck = 1
        self._timeForLastResponse = time.time()
        self._StartResponseTimer()

    def _StartResponseTimer(self):
        """Schedule the next response check, unless checking is stopped."""
        if self._responseDoCheck:
            timeout = self._responseCheckTimeout
            cb = self._CheckResponse
            # The timer is re-armed after every check, so it is presumably
            # a one-shot timer -- confirm against erl_eventhandler.
            timerId = self._evhandler.AddTimerEvent(timeout, cb)
            self._checkResponseTimerId = timerId

    def _StopResponseTimer(self):
        """Disable the response check.

        An already scheduled timer event is not removed; its callback
        finds the flag cleared and does nothing.
        """
        self._responseDoCheck = 0

    def GotResponse(self):
        """To be called whenever data has been received from the other end.

        No arguments.
        Returns: void
        Throws:  nothing
        """
        self._timeForLastResponse = time.time()

    # The method was originally published under this misspelled name;
    # it is kept so that existing callers continue to work.
    GotResonse = GotResponse

    def _CheckResponse(self):
        """Timer callback: report the peer as silent, or keep checking."""
        if self._responseDoCheck:
            now = time.time()
            if now > self._timeForLastResponse + self._noResponseTimeout:
                # Report only once: stop checking before calling back.
                self._responseDoCheck = 0
                self._noResponseCb()
            else:
                self._StartResponseTimer()

    ##
    ## Ticking of the other end
    ##

    def _InitStartTickTimer(self, netTickTime, timeToTickCb):
        """Set up and start the check for when we need to send a tick."""
        self._timeToTickCb = timeToTickCb
        # A tick is due after a quarter of a net-tick-time without
        # sending; this is polled every eighth of a net-tick-time.
        self._tickTimeout = netTickTime * 0.25
        self._tickCheckTimeout = netTickTime * 0.125
        self._tickDoCheck = 1
        self._timeForLastTick = time.time()
        self._StartTickTimer()

    def _StartTickTimer(self):
        """Schedule the next tick check, unless ticking is stopped."""
        if self._tickDoCheck:
            timeout = self._tickCheckTimeout
            cb = self._Tick
            timerId = self._evhandler.AddTimerEvent(timeout, cb)
            self._tickTimerId = timerId

    def _StopTickTimer(self):
        """Disable the tick check.

        An already scheduled timer event is not removed; its callback
        finds the flag cleared and does nothing.
        """
        self._tickDoCheck = 0

    def RestartTick(self):
        """To be called whenever something has been sent to the other end.

        No arguments.
        Returns: void
        Throws:  nothing
        """
        self._timeForLastTick = time.time()

    def _Tick(self):
        """Timer callback: ask for a tick to be sent if one is due."""
        if self._tickDoCheck:
            # Re-arm first, so the checking goes on regardless of what
            # the callback below does.
            self._StartTickTimer()
            now = time.time()
            if now > self._timeForLastTick + self._tickTimeout:
                self._timeToTickCb()
                self._timeForLastTick = time.time()

    def Stop(self):
        """Stop the timers.

        No arguments.
        Returns: void
        Throws:  nothing
        """
        self._StopResponseTimer()
        self._StopTickTimer()
class ErlNodeOutConnection(erl_async_conn.ErlAsyncClientConnection):
    """This class handles a connection _to_ another node,
    initiated by this node.

    Inheritance: erl_async_conn.ErlAsyncClientConnection

    This is intended to be used by the erl_node.ErlNode class.

    The handshake, as implemented here, goes through these steps:
      1. we send our name,             then wait for a status ("s...")
      2. the peer sends a challenge    ("n..."), we send a reply with
         our own challenge
      3. the peer sends an ack         ("a...") with a digest of our
         challenge; if it is correct the connection is up.
    """

    _STATE_DISCONNECTED = -1
    _STATE_HANDSHAKE_RECV_STATUS = 2
    _STATE_HANDSHAKE_RECV_CHALLENGE = 4
    _STATE_HANDSHAKE_RECV_CHALLENGE_ACK = 6
    _STATE_CONNECTED = 7

    def __init__(self, nodeName, opts):
        """Constructor.

        NODE-NAME = string
        OPTS      = <instance of erl_opts.ErlNodeOpts>
        Throws: nothing
        """
        erl_async_conn.ErlAsyncClientConnection.__init__(self)
        self._recvdata = ""
        self._hostName = None
        self._portNum = None
        self._nodeName = nodeName
        self._opts = opts
        self._peerName = None
        self._state = self._STATE_DISCONNECTED
        # 2 bytes for the packet length during the handshake, then 4 bytes
        self._packetLenSize = 2
        # These are started once the connection is up
        self._tickTimers = None

    def InitiateConnection(self, hostName, portNum,
                           connectOkCb, connectFailedCb, connectionBrokenCb,
                           passThroughMsgCb):
        """Initiates a connection to another erlang-node.

        HOST-NAME            = string
                               The node to connect to.
        PORT-NUM             = integer
                               The port on that node. Use the EPMD to find
                               the port number, given a node name.
        CONNECT-OK-CB        = <function(): void>
                               To be called when a connection has been
                               successfully established.
        CONNECT-FAILED-CB    = <function(CONNECTION, PEER-NAME): void>
                               CONNECTION = ErlNodeOutConnection
                               PEER-NAME = string (the node name for the peer)
                               To be called when a connection establishment
                               failed.
        CONNECTION-BROKEN-CB = <function(CONNECTION, PEER-NAME): void>
                               CONNECTION = ErlNodeOutConnection
                               PEER-NAME = string (the node name for the peer)
                               To be called when an established connection
                               has been broken.
        PASS-THROUGH-MSG-CB  = <function(CONNECTION, PEER-NAME,
                                         CTRL-MSG, [MSG]): void>
                               To be called for each incoming pass-through
                               message.
        Returns: 0 if the connect attempt failed immediately,
                 otherwise nothing
        Throws:  <<to-be-documented>>
        """
        self._hostName = hostName
        self._portNum = portNum
        self._connectOkCb = connectOkCb
        self._connectFailedCb = connectFailedCb
        self._connectionBrokenCb = connectionBrokenCb
        self._passThroughMsgCb = passThroughMsgCb
        # The real node name is not known until the peer has sent it
        self._peerName = "(unknown)@%s" % hostName
        if self.Connect(hostName, portNum):
            self._SendName()
            self._state = self._STATE_HANDSHAKE_RECV_STATUS
        else:
            return 0

    def GetPeerNodeName(self):
        """Retrieves the node name for the peer.

        No arguments.
        Returns: string | ""
        Throws:  nothing
        """
        return self._peerName

    def SendMsg(self, ctrlMsg, msg=None):
        """Sends a message to the other end.

        CTRL-MSG       = term
        MSG (optional) = None | term
        Returns: void
        Throws:  nothing

        For information on the CTRL-MSG and the MSG, please refer to the file
        erl_ext_dist.txt in the Erlang distribution.
        """
        if msg is None:
            packet = "p" + erl_term.TermToBinary(ctrlMsg)
        else:
            packet = "p" + (erl_term.TermToBinary(ctrlMsg) +
                            erl_term.TermToBinary(msg))
        self._SendPacket(packet)

    ##
    ## Internal routines
    ##

    def _In(self):
        """Callback routine, which is called when data is available
        on the connection."""
        connection = self.GetConnection()
        newData = connection.recv(100000)
        if len(newData) == 0:
            # Nothing to read: the other end has closed the connection.
            self.Close()
            if self._state != self._STATE_CONNECTED:
                # Closed in the middle of the handshake
                self._state = self._STATE_DISCONNECTED
                self._connectFailedCb(self, self.GetPeerNodeName())
            else:
                self._state = self._STATE_DISCONNECTED
                if self._tickTimers is not None:
                    self._tickTimers.Stop()
                # Report which peer went away, as documented for the
                # callback and as _NoResponse does.
                self._connectionBrokenCb(self, self.GetPeerNodeName())
            return

        self._recvdata = self._recvdata + newData
        remainingUnhandledData = self._HandleData(self._recvdata)
        self._recvdata = remainingUnhandledData

    def _HandleData(self, data):
        """Handle all complete packets in DATA.

        Returns: string, the trailing data that does not yet make up
                 a complete packet.
        """
        remainingInput = data
        while 1:
            # The size of the length field is re-read for each packet,
            # since it changes when the handshake completes.
            if len(remainingInput) < self._packetLenSize:
                return remainingInput

            if self._packetLenSize == 2:
                packetLen = self.ReadInt2(remainingInput[0:2])
                packetOffset = 2
            else:
                packetLen = self.ReadInt4(remainingInput[0:4])
                packetOffset = 4

            if len(remainingInput) < self._packetLenSize + packetLen:
                return remainingInput

            packetData = remainingInput[packetOffset:packetOffset+packetLen]
            self._HandlePacket(packetData)
            remainingInput = remainingInput[packetOffset+packetLen:]

    def _HandlePacket(self, data):
        """Handle one packet according to the current connection state."""
        if self._state == self._STATE_HANDSHAKE_RECV_STATUS:
            self._HandleStatus(data)
        elif self._state == self._STATE_HANDSHAKE_RECV_CHALLENGE:
            self._HandleChallenge(data)
        elif self._state == self._STATE_HANDSHAKE_RECV_CHALLENGE_ACK:
            self._HandleChallengeAck(data)
        elif self._state == self._STATE_CONNECTED:
            self._HandleConnectedPacket(data)
        else:
            erl_common.DebugHex(M, "state=%d" % self._state, data)

    def _AbortHandshake(self):
        """Close the connection and report that connecting failed."""
        self.Close()
        self._state = self._STATE_DISCONNECTED
        # NOTE(review): _In calls this callback with (CONNECTION,
        # PEER-NAME), here it has always been called without arguments.
        # Which one is right depends on the callback installed by the
        # caller -- confirm against erl_node before unifying.
        self._connectFailedCb()

    def _HandleStatus(self, data):
        """Handshake: handle the status message from the peer."""
        # First check that the correct message came in.  A slice is used
        # so that an empty packet is rejected instead of raising.
        if data[:1] != "s":
            erl_common.DebugHex(M, "handshake:recv_status: got", data)
            self._AbortHandshake()
            return

        status = data[1:]
        if status == "ok" or status == "ok_simultaneous":
            self._state = self._STATE_HANDSHAKE_RECV_CHALLENGE
        elif status == "nok" or status == "not_allowed":
            self._AbortHandshake()
        elif status == "alive":
            self._SendStatusAliveTrue()
            self._state = self._STATE_HANDSHAKE_RECV_CHALLENGE
        else:
            # Unknown status: stay in this state
            erl_common.DebugHex(M, "handshake:recv_status", data)

    def _HandleChallenge(self, data):
        """Handshake: handle the challenge from the peer."""
        # First check that the correct message came in
        if data[:1] != "n":
            erl_common.DebugHex(M, "handshake:recv_cha", data)
            self._AbortHandshake()
            return

        # Layout: tag(1) version(2) flags(4) challenge(4) name(rest)
        self._peerVersion = self.ReadInt2(data[1:3])
        self._peerFlags = self.ReadInt4(data[3:7])
        challenge = self.ReadInt4(data[7:11])
        self._peerName = data[11:]
        self._SendChallengeReply(challenge)
        self._state = self._STATE_HANDSHAKE_RECV_CHALLENGE_ACK

    def _HandleChallengeAck(self, data):
        """Handshake: handle the peer's answer to our challenge."""
        # First check that the correct message came in
        if data[:1] != "a":
            erl_common.DebugHex(M, "handshake:recv_cha_ack", data)
            self._AbortHandshake()
            return

        digest = data[1:]
        ownCookie = self._opts.GetCookie()
        if CheckDigest(digest, self._challengeToPeer, ownCookie):
            # The connection is up: from now on the packets have
            # a 4 bytes length field.
            self._packetLenSize = 4
            self._state = self._STATE_CONNECTED
            t = self._opts.GetNetTickTime()
            self._tickTimers = Ticker(t, self._Tick, self._NoResponse)
            self._connectOkCb()
        else:
            erl_common.Debug(M,
                             "Connection attempt to disallowed node %s" %
                             self._peerName)
            self._AbortHandshake()

    def _HandleConnectedPacket(self, data):
        """Handle a packet received on an established connection."""
        self._tickTimers.GotResonse()
        if len(data) == 0:
            # A tick
            return

        msgType = data[0]
        if msgType != "p":
            erl_common.DebugHex(M, "msgType=%c" % msgType, data)
            return

        terms = erl_term.BinariesToTerms(data[1:])
        peerName = self.GetPeerNodeName()
        if len(terms) == 2:
            controlMsg = terms[0]
            msg = terms[1]
            self._passThroughMsgCb(self, peerName, controlMsg, msg)
        elif len(terms) == 1:
            # Only a control message: there is no MSG to pass on
            controlMsg = terms[0]
            self._passThroughMsgCb(self, peerName, controlMsg)
        else:
            debugTxt = "PassThrough-msg: terms=%s" % repr(terms)
            erl_common.DebugHex(M, debugTxt, data)

    def _Tick(self):
        """This callback is called by the Ticker class instance
        when it is time to send a tick to the other end, to indicate that
        we are still alive.
        """
        # A tick is an empty packet
        self._SendPacket("")

    def _NoResponse(self):
        """This callback is called by the Ticker class instance
        when nothing has been received from the other end for too long.
        """
        erl_common.Debug(M, "OutConnection: Connection broken")
        self._state = self._STATE_DISCONNECTED
        self._tickTimers.Stop()
        self._connectionBrokenCb(self, self.GetPeerNodeName())

    def _SendName(self):
        """Handshake: send our version, flags and node name."""
        packet = "n" + \
                 self.PackInt2(self._opts.GetDistrVersion()) + \
                 self.PackInt4(self._opts.GetDistrFlags()) + \
                 self._nodeName
        self._SendHandshakeMsg(packet)

    def _SendStatusAliveTrue(self):
        """Handshake: answer "true" to an "alive" status."""
        self._SendHandshakeMsg("true")

    def _SendChallengeReply(self, challenge):
        """Handshake: answer the peer's CHALLENGE and challenge the peer."""
        digest = GenDigest(challenge, self._opts.GetCookie())
        challengeToPeer = GenChallenge()
        # Remembered for checking the peer's ack
        self._challengeToPeer = challengeToPeer
        packet = "r" + self.PackInt4(challengeToPeer) + digest
        self._SendHandshakeMsg(packet)

    def _SendHandshakeMsg(self, packet):
        """Send PACKET preceded by a 2 bytes length field."""
        msg = self.PackInt2(len(packet)) + packet
        erl_common.Debug(M, "Sending handshake")
        self.Send(msg)

    def _SendPacket(self, packet):
        """Send PACKET preceded by a 4 bytes length field."""
        msg = self.PackInt4(len(packet)) + packet
        erl_common.Debug(M, "Sending msg")
        # The tick timers do not exist until the handshake has completed
        if self._tickTimers is not None:
            self._tickTimers.RestartTick()
        self.Send(msg)
class ErlNodeServerSocket(erl_async_conn.ErlAsyncServer):
    """Listening socket for connections initiated by other Erlang nodes.

    For every remote node that connects, a new ErlNodeInConnection
    instance is created to take care of that connection.

    This class is intended to be used by the erl_node.ErlNode.
    """

    def __init__(self, nodeName, opts):
        """Constructor

        NODE-NAME = string
                    The name of this node
        OPTS      = <instance of erl_opts.ErlNodeOpts>
        Throws: nothing
        """
        erl_async_conn.ErlAsyncServer.__init__(self)
        self._opts = opts
        self._nodeName = nodeName
        # Until Start has been called, all events are silently dropped
        self._nodeUpCb = self._nodeDownCb = self._passThroughMsgCb = self._Sink

    def Start(self, nodeUpCb, nodeDownCb, passThroughMsgCb):
        """Install the callbacks and begin listening for connections.

        NODE-UP-CB          = <function(CONNECTION, PEER-NAME): void>
                              Called when a new connection has been
                              established.
        NODE-DOWN-CB        = <function(CONNECTION, PEER-NAME): void>
                              Called when an established connection
                              has been broken.
        PASS-THROUGH-MSG-CB = <function(CONNECTION, PEER-NAME,
                                        CTRL-MSG, [MSG]): void>
                              Called for pass-through messages.
                              Currently, all incoming messages are
                              of this type.
        (Sub)Types:
        CONNECTION = <instance of ErlNodeInConnection>
                     The instance of the class that handles the connection
        PEER-NAME  = string
                     The node name for the peer node
        CTRL-MSG   = term
        MSG        = term
                     For information on CTRL-MSG and MSG, see the
                     file erl_ext_dist.txt, which is included in the
                     Erlang distribution.
        Returns: whatever erl_async_conn.ErlAsyncServer.Start returns
        Throws:  <<to-be-documented>>
        """
        self._passThroughMsgCb = passThroughMsgCb
        self._nodeDownCb = nodeDownCb
        self._nodeUpCb = nodeUpCb
        return erl_async_conn.ErlAsyncServer.Start(self)

    def _NewConnection(self, s, remoteAddr):
        """Hand a newly accepted socket S over to a new ErlNodeInConnection."""
        erl_common.Debug(M, "new connection from %s" % repr(remoteAddr))
        # The instance is not kept here; it is announced through
        # the node-up callback once its handshake has completed.
        ErlNodeInConnection(s,
                            self._nodeName, self._opts,
                            self._nodeUpCb, self._nodeDownCb,
                            self._passThroughMsgCb)

    def _Sink(self, *a, **kw):
        """Default callback: accept any arguments and do nothing."""
        return None
class ErlNodeInConnection(erl_async_conn.ErlAsyncPeerConnection):
    """This class handles incoming connections from other Erlang nodes.

    This class is intended to be used by the ErlNodeServerSocket
    (and thus indirectly by the erl_node.ErlNode).

    The handshake, as implemented here, goes through these steps:
      1. the peer sends its name       ("n..."), we send status ok
         and a challenge
      2. the peer sends a reply        ("r...") with a digest of our
         challenge and a challenge of its own; if the digest is correct
         we send an ack and the connection is up.
    """

    ## XXX TODO: This class duplicates too much functionality
    ## from ErlNodeOutConnection, still there are differences.
    ##
    ## The differences are in the setting up of the connection;
    ## during the handshake sequence, the connecting side
    ## (ErlNodeOutConnection) acts the client, while the connected
    ## side (ErlNodeInConnection) acts as server.
    ##
    ## One idea is to maybe separate the state-machine
    ## into its own class.
    ##
    ## Need to think about this one...

    _STATE_DISCONNECTED = -1
    _STATE_HANDSHAKE_RECV_NAME = 1
    _STATE_HANDSHAKE_RECV_STATUS = 3
    _STATE_HANDSHAKE_RECV_CHALLENGE_REPLY = 5
    _STATE_CONNECTED = 7

    def __init__(self, sock, nodeName, opts,
                 newConnectionUpCb, connectionBrokenCb,
                 passThroughMsgCb):
        """Constructor.

        SOCK                 = <socket>
                               The socket for the incoming connection.
        NODE-NAME            = string
                               The node name of the node to which this
                               connection belongs.
        OPTS                 = <instance of erl_opts.ErlNodeOpts>
                               Options for the node
        NEW-CONNECTION-UP-CB = <function(CONNECTION, PEER-NAME): void>
                               Callback to call when a new connection has been
                               established.
        CONNECTION-BROKEN-CB = <function(CONNECTION, PEER-NAME): void>
                               Callback to call when an established connection
                               has been broken.
        PASS-THROUGH-MSG-CB  = <function(CONNECTION, PEER-NAME,
                                         CTRL-MSG, [MSG]): void>
                               Callback to call for pass-through messages.
                               Currently, all incoming messages are
                               of this type.
        """
        erl_async_conn.ErlAsyncPeerConnection.__init__(self, sock)
        self._recvdata = ""
        self._hostName = None
        self._portNum = None
        self._nodeName = nodeName
        self._opts = opts
        self._newConnectionUpCb = newConnectionUpCb
        self._connectionBrokenCb = connectionBrokenCb
        self._passThroughMsgCb = passThroughMsgCb
        self._state = self._STATE_HANDSHAKE_RECV_NAME
        # NOTE(review): this is our own node name; it is replaced by the
        # peer's name when the peer has sent it -- confirm that a
        # placeholder is what is wanted until then.
        self._peerName = nodeName
        # 2 bytes for the packet length during the handshake, then 4 bytes
        self._packetLenSize = 2
        # These are started once the connection is up
        self._tickTimers = None

    def GetPeerNodeName(self):
        """Retrieves the node name for the peer.

        No arguments.
        Returns: string | ""
        Throws:  nothing
        """
        return self._peerName

    def SendMsg(self, ctrlMsg, msg=None):
        """Sends a message to the other end.

        CTRL-MSG       = term
        MSG (optional) = None | term
        Returns: void
        Throws:  nothing

        For information on the CTRL-MSG and the MSG, please refer to the file
        erl_ext_dist.txt in the Erlang distribution.
        """
        if msg is None:
            packet = "p" + erl_term.TermToBinary(ctrlMsg)
        else:
            packet = "p" + (erl_term.TermToBinary(ctrlMsg) +
                            erl_term.TermToBinary(msg))
        self._SendPacket(packet)

    ##
    ## Internal routines
    ##

    def _In(self):
        """Callback routine, which is called when data is available
        on the connection."""
        connection = self.GetConnection()
        newData = connection.recv(100000)
        if len(newData) == 0:
            # Nothing to read: the other end has closed the connection.
            self.Close()
            if self._state != self._STATE_CONNECTED:
                # Closed in the middle of the handshake: nobody has been
                # told about this connection, so nobody is notified.
                self._state = self._STATE_DISCONNECTED
            else:
                erl_common.Debug(M, "InConnection: Connection broken")
                self._state = self._STATE_DISCONNECTED
                if self._tickTimers is not None:
                    self._tickTimers.Stop()
                self._connectionBrokenCb(self, self.GetPeerNodeName())
            return

        self._recvdata = self._recvdata + newData
        remainingUnhandledData = self._HandleData(self._recvdata)
        self._recvdata = remainingUnhandledData

    def _HandleData(self, data):
        """Handle all complete packets in DATA.

        Returns: string, the trailing data that does not yet make up
                 a complete packet.
        """
        remainingInput = data
        while 1:
            # The size of the length field is re-read for each packet,
            # since it changes when the handshake completes.
            if len(remainingInput) < self._packetLenSize:
                return remainingInput

            if self._packetLenSize == 2:
                packetLen = self.ReadInt2(remainingInput[0:2])
                packetOffset = 2
            else:
                packetLen = self.ReadInt4(remainingInput[0:4])
                packetOffset = 4

            if len(remainingInput) < self._packetLenSize + packetLen:
                return remainingInput

            packetData = remainingInput[packetOffset:packetOffset+packetLen]
            self._HandlePacket(packetData)
            remainingInput = remainingInput[packetOffset+packetLen:]

    def _HandlePacket(self, data):
        """Handle one packet according to the current connection state."""
        if self._state == self._STATE_HANDSHAKE_RECV_NAME:
            self._HandleName(data)
        elif self._state == self._STATE_HANDSHAKE_RECV_CHALLENGE_REPLY:
            self._HandleChallengeReply(data)
        elif self._state == self._STATE_CONNECTED:
            self._HandleConnectedPacket(data)
        else:
            erl_common.DebugHex(M, "state=%d" % self._state, data)

    def _AbortHandshake(self):
        """Close the connection after a failed handshake."""
        self.Close()
        self._state = self._STATE_DISCONNECTED

    def _HandleName(self, data):
        """Handshake: handle the name message from the peer."""
        # First check that the correct message came in.  A slice is used
        # so that an empty packet is rejected instead of raising.
        if data[:1] != "n":
            erl_common.DebugHex(M, "handshake:recv_name", data)
            self._AbortHandshake()
            return

        # Layout: tag(1) version(2) flags(4) name(rest)
        self._peerDistrVersion = self.ReadInt2(data[1:3])
        self._peerFlags = self.ReadInt4(data[3:7])
        self._peerName = data[7:]
        # FIXME: check for connections _to_ this node:
        #        check whether nodeName > ownNodeName (or check < ?)
        self._SendStatusOk()
        self._SendChallenge()
        self._state = self._STATE_HANDSHAKE_RECV_CHALLENGE_REPLY

    def _HandleChallengeReply(self, data):
        """Handshake: handle the peer's answer to our challenge."""
        # First check that the correct message came in
        if data[:1] != "r":
            erl_common.DebugHex(M, "handshake:recv_chreply", data)
            self._AbortHandshake()
            return

        # Layout: tag(1) challenge(4) digest(rest)
        peersChallenge = self.ReadInt4(data[1:5])
        peersDigest = data[5:]
        ownCookie = self._opts.GetCookie()
        if CheckDigest(peersDigest, self._challengeToPeer, ownCookie):
            self._SendChallengeAck(peersChallenge)
            # The connection is up: from now on the packets have
            # a 4 bytes length field.
            self._packetLenSize = 4
            self._state = self._STATE_CONNECTED
            t = self._opts.GetNetTickTime()
            self._tickTimers = Ticker(t, self._Tick, self._NoResponse)
            self._newConnectionUpCb(self, self.GetPeerNodeName())
        else:
            erl_common.Debug(M,
                             "Connection attempt from disallowed node %s" %
                             self._peerName)
            self._AbortHandshake()

    def _HandleConnectedPacket(self, data):
        """Handle a packet received on an established connection."""
        self._tickTimers.GotResonse()
        if len(data) == 0:
            # A tick
            return

        msgType = data[0]
        if msgType != "p":
            erl_common.DebugHex(M, "msgType=%c" % msgType, data)
            return

        terms = erl_term.BinariesToTerms(data[1:])
        if len(terms) == 2:
            controlMsg = terms[0]
            msg = terms[1]
            peerName = self.GetPeerNodeName()
            self._passThroughMsgCb(self, peerName, controlMsg, msg)
        elif len(terms) == 1:
            # Only a control message: there is no MSG to pass on
            controlMsg = terms[0]
            peerName = self.GetPeerNodeName()
            self._passThroughMsgCb(self, peerName, controlMsg)
        else:
            debugTxt = "PassThrough-msg: terms=%s" % repr(terms)
            erl_common.DebugHex(M, debugTxt, data)

    def _Tick(self):
        """This callback is called by the Ticker class instance
        when it is time to send a tick to the other end, to indicate that
        we are still alive.
        """
        # A tick is an empty packet
        self._SendPacket("")

    def _NoResponse(self):
        """This callback is called by the Ticker class instance
        when nothing has been received from the other end for too long.
        """
        erl_common.Debug(M, "InConnection: Connection broken")
        self._state = self._STATE_DISCONNECTED
        self._tickTimers.Stop()
        self._connectionBrokenCb(self, self.GetPeerNodeName())

    def _SendStatusOk(self):
        """Handshake: tell the peer that the connection is accepted."""
        self._SendHandshakeMsg("sok")

    def _SendChallenge(self):
        """Handshake: send our version, flags, a challenge and our name."""
        challenge = GenChallenge()
        # Remembered for checking the peer's reply
        self._challengeToPeer = challenge
        packet = "n" + \
                 self.PackInt2(self._opts.GetDistrVersion()) + \
                 self.PackInt4(self._opts.GetDistrFlags()) + \
                 self.PackInt4(challenge) + \
                 self._nodeName
        self._SendHandshakeMsg(packet)

    def _SendChallengeAck(self, challenge):
        """Handshake: answer the peer's CHALLENGE."""
        packet = "a" + GenDigest(challenge, self._opts.GetCookie())
        self._SendHandshakeMsg(packet)

    def _SendHandshakeMsg(self, packet):
        """Send PACKET preceded by a 2 bytes length field."""
        msg = self.PackInt2(len(packet)) + packet
        erl_common.Debug(M, "Sending handshake")
        self.Send(msg)

    def _SendPacket(self, packet):
        """Send PACKET preceded by a 4 bytes length field."""
        msg = self.PackInt4(len(packet)) + packet
        erl_common.Debug(M, "Sending msg")
        # The tick timers do not exist until the handshake has completed
        if self._tickTimers is not None:
            self._tickTimers.RestartTick()
        self.Send(msg)
### syntax highlighted by Code2HTML, v. 0.9.1