### py_interface -- A Python-implementation of an Erlang node
###
### $Id: erl_node.py,v 1.19 2004/07/12 22:06:18 tab Exp $
###
### Copyright (C) 2002  Tomas Abrahamsson
###
### Author: Tomas Abrahamsson <tab@lysator.liu.se>
### 
### This file is part of the Py-Interface library
###
### This library is free software; you can redistribute it and/or
### modify it under the terms of the GNU Library General Public
### License as published by the Free Software Foundation; either
### version 2 of the License, or (at your option) any later version.
### 
### This library is distributed in the hope that it will be useful,
### but WITHOUT ANY WARRANTY; without even the implied warranty of
### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
### Library General Public License for more details.
### 
### You should have received a copy of the GNU Library General Public
### License along with this library; if not, write to the Free
### Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

### erl_node.py -- the node

import sys
import types
import string


import erl_epmd
import erl_term
import erl_opts
import erl_common
import erl_node_conn
import erl_eventhandler



M = "erl_node"

py_interface_version = "0.9"

_pidCount = 1
_serial = 0

class ErlMBox:
    """This class provides an mbox, which is equivalent to an
    erlang-process. It is intenteded to be used in a single-threaded
    application. You must provide a callback routine for incoming
    messages."""

    def __init__(self, node, pid, msgCallback):
        """The recommended way to create mboxes is through the method
        CreateMBox in the ErlNode class.

        NODE            = <instance of ErlNode>
                        The node to which this mbox belongs.
        PID             = <instance of ErlPid>
                        The pid for this mbox
        MSG-CALLBACK    = <function(MSG): void>
                        A callback to call for incoming messages
                        to this mbox. The callback should take one
                        argument: the message. Its return value is
                        ignored.
        """
        self._node = node
        if msgCallback == None:
            msgCallback = self._Sink
        self._msgCallback = msgCallback
        self._pid = pid
        self._pendingRPCs = {}

    def Self(self):
        """Return the pid for this mbox.

        Returns: <instance of ErlPid>
        Throws:  nothing
        """
        return self._pid

    def Send(self, dest, msg):
        """Send, to DEST, the message MSG.

        DEST    = <instance of ErlPid> | string | <instance of ErlAtom> |
                  tuple(NODE, PROC-NAME)
                  NODE = PROC-NAME = string | <instance of ErlAtom>
                The destination to which the message is to be sent.
                This can is either a pid for an mbox (on the same node or on a
                different node), or a registered name (as string or atom) on
                the same node, or a tuple specifying a node and a registered
                name on that node.

        MSG     = <term>
                The message to be sent.

        Returns: void
        Throws:  <<to-be-documented>>

        Send a message. There is no result indicating whether the delivery
        of the message was successful or not. If the DEST is located on
        another node, a connection to that node is automatically set up
        and maintained, unless already connected.

        This method returns immediately. Any necessarry queuing,
        possibly due to network traffic congestion, is handled
        automatically by the node.
        """
        self._node.SendMsgFromMBox(self, dest, msg)

    def RegisterName(self, name):
        """Register a NAME for this mbox.

        NAME    = string | <instance of ErlAtom>
                The name to register this mbox as.

        Returns: void
        Throws:  <<to-be-documented>>

        Only on name can be registered for each mbox.
        """
        self._node.RegisterNameForMBox(self, name)

    def UnregisterName(self):
        """Unregister the name that was registered for this mbox.
        Returns: void
        Throws:  <<to-be-documented>>
        """
        self._node.UnregisterNameForMBox(self)

    def Link(self, otherEnd):
        """Link this mbox to another pid/mbox.
        Currently, this is not implemented."""
        pass

    def Unlink(self, otherEnd):
        """Unlink another pid/mbox.
        Currently, this is not implemented."""
        pass

    def SendRPC(self, remoteNode, mod, fun, args, cb):
        """Send an rpc to REMOTE-NODE, MOD, FUN, ARGS. Call CB for the answer.

        REMOTE-NODE     = string | <instance of ErlAtom>
                        The node to send the call to
        MOD             =  string | <instance of ErlAtom>
                        The module
        FUN             = string | <instance of ErlAtom>
                        The name of the function to call
        ARGS            = list(<term>)
                        The argument list
        CB              = <function(RESULT): void>
                          REMOTE-NODE = string
                          RESULT = <term>
                        A callback function to be called when the answer
                        to the rpc callback receives. The callback is called
                        with one arg: the result. Its return value is ignored.
                        If the remote node goes down during execution, the
                        callback is called with this value:
                          tuple(<ErlAtom("EXIT")>,
                                tuple(<ErlAtom("nodedown")>,
                                      <ErlAtom(<REMOTE-NODE-NAME>)>))

        Returns: void
        Throws:  <<to-be-documented>>

        Send an request to execute a function in a module on a remote node.
        As for the Send method, this method returns immediately.
        """
        rexMBox = self._node.WhereisMBox("rex")
        rexMBox.SendRPC(remoteNode, mod, fun, args, cb)

    ##
    ## Routines to be called from the node only
    ##
    def Msg(self, sourceNodeName, msg):
        """This routine is intended to be called only from mbox's node.

        SOURCE-NODE-NAME        = string
        MSG                     = <term>

        Returns: void
        Throws:  nothing

        An incoming message
        """
        self._msgCallback(msg)

    def _Sink(self, *a, **kw):
        """Message sink."""
        pass


class _ErlRexMBox(ErlMBox):
    """This class implements what's needed for the rpc (remote procedure call)
    functionality. It is an mbox that registers itself as "rex".
    """

    def __init__(self, node, pid):
        ErlMBox.__init__(self, node, pid, None)
        self._nodeDownSubscriptions = {}

    def Start(self):
        self.RegisterName("rex")


    def SendRPC(self, remoteNode, mod, fun, args, cb):
        """Send an rpc to REMOTE-NODE, MOD, FUN, ARGS. Call CB for the answer.

        REMOTE-NODE     = string | <instance of ErlAtom>
                        The node to send the call to
        MOD             =  string | <instance of ErlAtom>
                        The module
        FUN             = string | <instance of ErlAtom>
                        The name of the function to call
        ARGS            = list(<term>)
                        The argument list
        CB              = <function(RESULT): void>
                          REMOTE-NODE = string
                          RESULT = <term>
                        A callback function to be called when the answer
                        to the rpc callback receives. The callback is called
                        with one arg: the result. Its return value is ignored.
                        If the remote node goes down during execution, the
                        callback is called with this value:
                          tuple(<ErlAtom("EXIT")>,
                                tuple(<ErlAtom("nodedown")>,
                                      <ErlAtom(<REMOTE-NODE-NAME>)>))

        Returns: void
        Throws:  <<to-be-documented>>

        Send an request to execute a function in a module on a remote node.
        As for the Send method, this method returns immediately.
        """
        if type(mod) == types.StringType:
            mod = erl_term.ErlAtom(mod)
        if type(fun) == types.StringType:
            fun = erl_term.ErlAtom(fun)

        # Handle queue of pending callbacks for the remote node
        if type(remoteNode) == types.StringType:
            remoteNodeName = remoteNode
        elif erl_term.IsErlAtom(remoteNode):
            remoteNodeName = remoteNode.atomText
        if self._pendingRPCs.has_key(remoteNodeName):
            self._pendingRPCs[remoteNodeName].append(cb)
        else:
            self._pendingRPCs[remoteNodeName] = [cb]

        # Register a nodedown-callback for this node, so
        # we can call any rpc callbacks in case the node goes down
        if not self._nodeDownSubscriptions.has_key(remoteNodeName):
            id = self._node.NodeDownSubscribe(self._NodeDown)
            self._nodeDownSubscriptions[remoteNodeName] = id

        # Now send the rpc-message
        self.Send(("rex", remoteNode),
                  (self.Self(),
                   (erl_term.ErlAtom("call"),
                    mod, fun, args, erl_term.ErlAtom("user"))))


    ##
    ## Routines to be called from the node only
    ##
    def Msg(self, sourceNodeName, msg):
        """This routine is intended to be called only from mbox's node.

        SOURCE-NODE-NAME        = string
        MSG                     = <term>

        Returns: void
        Throws:  nothing

        An incoming message
        """
        if type(msg) == types.TupleType and \
           len(msg) == 2 and \
           erl_term.IsErlAtom(msg[0]) and \
           msg[0].atomText == "rex" and \
           len(self._pendingRPCs) > 0:
            self._RPCAnswer(sourceNodeName, msg[1])
        else:
            erl_common.Debug("REX: Unexpected msg: %s" % `msg`)

    def _RPCAnswer(self, sourceNodeName, answer):
        # Does this assumption always hold:
        # first answer is for first rpc-call?
        # Maybe this holds for calls/answers for one single node.
        # So the pendingRPCs is one queue per node.
        if self._pendingRPCs.has_key(sourceNodeName):
            pendingRPCs = self._pendingRPCs[sourceNodeName]
            cb = pendingRPCs[0]
            self._pendingRPCs[sourceNodeName] = pendingRPCs[1:]
            cb(answer)

    def _NodeDown(self, nodeStatus, nodeName):
        if self._pendingRPCs.has_key(nodeName):
            callbacks = self._pendingRPCs[nodeName]
            self._pendingRPCs[nodeName] = []
            for cb in callbacks:
                cb((erl_term.ErlAtom("EXIT"),
                    (erl_term.ErlAtom("nodedown"),
                     erl_term.ErlAtom(nodeName))))


class ErlNode:
    """This class implements a node, which is equivaluent to an erlang node.
    It is intended to be used in a single-threaded applucation.

    MBoxes, which corresponds to erlang processes, can be created for
    communication.
    """
    # Early operations 
    CTRLMSGOP_LINK = 1
    CTRLMSGOP_SEND = 2
    CTRLMSGOP_EXIT = 3
    CTRLMSGOP_UNLINK = 4
    CTRLMSGOP_NODE_LINK = 5
    CTRLMSGOP_REG_SEND = 6
    CTRLMSGOP_GROUP_LEADER = 7
    CTRLMSGOP_EXIT2 = 8
    # New operations in destrvsn = 1 (OTP R4)
    CTRLMSGOP_SEND_TT = 12
    CTRLMSGOP_EXIT_TT = 13
    CTRLMSGOP_REG_SEND_TT = 16
    CTRLMSGOP_EXIT2_TT = 18
    # New operations in destrvsn = 4 (OTP R6)
    CTRLMSGOP_MONITOR_P = 19
    CTRLMSGOP_DEMONITOR_P = 20
    CTRLMSGOP_MONITOR_P_EXIT = 21
    # end of operations
    
    def __init__(self, nodeName, opts=erl_opts.ErlNodeOpts()):
        """Constructor.

        NODE-NAME       = string
                        The name for this node.

        OPTS            = <instance of ErlNodeOpts>

        Creates an ErlNode. The name of the node is determined by NODE-NAME
        as described below. A node-name consists of two parts: an alive-name
        and a host-name, separated by an `@'. The host-name can be short
        (not including the domain) or long (including the domain). Short and
        long node-names must not be mixed among the nodes in a system, see
        the erlang documentation for further details.
          1. If the NODE-NAME contains an `@', then NODE-NAME is used
             unchanged as name for the node.
          2. If the NODE-NAME does not contain an `@', then the node's name
             is constructed as NODE-NAME + "@" + host-name, where
             host-name is either on short or long form, depending on what is
             specified in the OPTS.
        """
        shortNodeNames = opts.GetShortNodeNames()
        self._nodeName = erl_common.AlignNodeName(nodeName, shortNodeNames)
        self._opts = opts

        self._creation = 0
        self._connections = {}

        self._epmd = erl_epmd.ErlEpmd()
        self._ongoingPings = {}

        self._isServerPublished = 0
        self._pids = {}                 # mapping pid     --> ErlMBox()
        self._mboxes = {}               # mapping ErlMBox --> pid
        self._registeredNames = {}      # mapping name    --> pid
        self._registeredPids = {}       # mapping pid     --> name

        self._nodeUpCb = []             # stores (id, callback)
        self._nodeDownCb = []           # stores (id, callback)
        self._cbId = 0

        self._server = erl_node_conn.ErlNodeServerSocket(self._nodeName,
                                                         self._opts)
        self._portNum = self._server.Start(self._NodeUp, self._NodeDown,
                                           self._PassThroughMsg)
        self._epmd.SetOwnPortNum(self._portNum)
        self._epmd.SetOwnNodeName(self._nodeName)
        self._CreateRex()
        
    def CreateMBox(self, msgCallback=None):
        """Creates an mbox, which is equivalent to an erlang process.

        MSG-CALLBACK    = <function(MSG): void>
                        A callback function to call for incoming messages.

        Returns: <instance of ErlMBox>
        Throws:  nothing

        This creates an mbox, which is equivalent to an erlang process.
        Messages to the mbox are delivered as callbacks to the callback
        function.
        """
        mboxPid = self._CreatePid()
        mbox = ErlMBox(self, mboxPid, msgCallback)
        self._pids[mboxPid] = mbox
        self._mboxes[mbox] = mboxPid
        return mbox

    def Ping(self, remoteNodeName, pingCallback):
        """Ping a remote node.

        REMOTE-NODE-NAME        = string | <instance of ErlAtom>
                                The node to ping
        PING-CALLBACK           = <function(RESULT): void>
                                A callback to call for deliverance of the
                                ping result. RESULT = "pong" | "pang".

        Returns: void
        Throws:  <<to-be-documented>>

        Try to ping a remote node. A connection to that node is established
        unless already connected. Whether or not the remote node is up or
        down is indicated as by the argument to the callback function:
          "pong": the remote node is alive and there is a connection to it
          "pang": the remote node is down.
        """
        if not "@" in remoteNodeName:
            raise "Bad node name for remote node \"%s\"" % remoteNodeName
        if self._ongoingPings.has_key(remoteNodeName):
            pingCallbacks = self._ongoingPings[remoteNodeName]
            self._ongoingPings[remoteNodeName] = pingCallbacks + [pingCallback]
        else:
            self._ongoingPings[remoteNodeName] = [pingCallback]
            [nodeName, hostName] = string.split(remoteNodeName, "@")
            e = erl_epmd.ErlEpmd(hostName)
            cb = erl_common.Callback(self._PingEpmdResponse, remoteNodeName)
            e.PortPlease2Req(nodeName, cb)

    def Publish(self):
        """Publish this node to the EPMD, the Erlang Portmapper Daemon.
        Returns: void
        Throws:  nothing.

        Publishing must be done for mboxes/processes on other nodes
        to be able to establish contact with mboxes on this node.
        The node is published automatically when it tries to contact
        another node, for example due to a message to send or an rpc call.
        """
        if not self._isServerPublished:
            self._epmd.Connect(self._EpmdConnectedOk, self._EpmdConnectFailed)

    def Unpublish(self):
        """Removes the publication of this node to the EPMD.
        Returns: void
        Throws:  nothing
        """
        if self._isServerPublished:
            self._epmd.Close()
            self._isServerPublished = 0

    def NodeUpSubscribe(self, cb):
        """Subscribe to nodeup-information

        CB        = <function("nodeup", NODE-NAME): void>

        Returns:  ID = <id>
        Throws:   nothing

        Subscribe to nodeup-information, that is, to connections to
        new nodes. The new connections may initiated by, for example,
        a messages begin sent from this node to another one (a new
        connection is automatically set up), or by another node
        connection to this.

        The ID returned can be used to unsubscribe the callback from further
        nodeup-informations.
        """
        id = self._cbId
        self._cbId = self._cbId + 1
        self._nodeUpCb.append((id, cb))
        return id

    def NodeUpUnsubscribe(self, id):
        """Unsubscribe to nodeup-information

        ID              = <id>

        Returns: void
        Throws:  nothing

        Unsubscribes to nodeup-information.
        """
        cbs = self._nodeUpCb
        for (index, (cbid, cb)) in map(None, range(len(cbs)), cbs):
            if id == cbid:
                del self._nodeUpCb[index]
                return

    def NodeDownSubscribe(self, cb):
        """Subscribe to nodedown-information for a certain node

        CB        = <function("nodedown", NODE-NAME): void>

        Returns:  ID = <id>
        Throws:   nothing

        Subscribe to nodedown-information, that is, to broken connections to
        other nodes.

        The ID returned can be used to unsubscribe the callback from further
        nodedown-informations.
        """
        id = self._cbId
        self._cbId = self._cbId + 1
        self._nodeDownCb.append((id, cb))
        return id

    def NodeDownUnsubscribe(self, id):
        """Unsubscribe to nodedown-information

        ID              = <id>

        Returns: void
        Throws:  nothing

        Unsubscribes to nodedown-information.
        """
        cbs = self._nodeDownCb
        for (index, (cbid, cb)) in map(None, range(len(cbs)), cbs):
            if id == cbid:
                del self._nodeDownCb[index]
                return

    def DumpConnections(self):
        """Dump connections. This method is intended for debugging purposes.
        Returns: void
        Throws:  nothing
        """
        print "Connections:"
        for k in self._connections.keys():
            print "  %s --> %s" % (`k`, `self._connections[k]`)
        print "--"


    ##
    ## Routines to be called only by mboxes
    ##

    def RegisterNameForMBox(self, mbox, name):
        """This routine is intended to be called from an ErlMBox instance.
        MBOX = <instance of ErlMBox>
        NAME = string
        Returns: void
        Throws:  <<IsRegistered>>
        """
        mboxPid = mbox.Self()
        if self._registeredNames.has_key(name):
            raise "IsRegistered"
        if self._registeredPids.has_key(mboxPid):
            raise "IsRegistered"
        self._registeredNames[name] = mboxPid
        self._registeredPids[mboxPid] = name

    def UnregisterNameForMBox(self, mbox):
        """This routine is intended to be called from an ErlMBox instance
        MBOX = <instance of ErlMBox>
        Returns: void
        Throws:  <<NotRegistered>>
        """
        mboxPid = mbox.Self()
        if not self._registeredPids.has_key(mboxPid):
            raise "NotRegistered"
        name = self._registeredPids[mboxPid]
        del self._registeredPids[mboxPid]
        del self._registeredNames[name]


    def WhereisMBox(self, name):
        """Lookup an mbox that is registered under a name.
        NAME = string
        Returns: <instance of ErlMBox>
        Throws:  nothing
        """
        mboxPid = self.WhereisPid(name)
        if mboxPid == None:
            return None
        if not self._pids.has_key(mboxPid):
            return None
        return self._pids[mboxPid]

    def WhereisPid(self, name):
        """Lookup an mbox that is registered under a name.
        NAME = string
        Returns: <instance of ErlPid>
        Throws:  nothing
        """
        if not self._registeredNames.has_key(name):
            return None
        return self._registeredNames[name]


    def SendMsgFromMBox(self, sourceMBox, dest, msg):
        """This routine is intended to be called from an ErlMBox instance
        SOURCE-MBOX = <instance of ErlMBox>
        DEST        = <instance of ErlPid> |
                      string |
                      <instance of ErlAtom> |
                      tuple(DEST-NODE, DEST-REGNAME)
                      DEST-NODE = DEST-REGNAME = string | <instance of ErlAtom>
        MSG         = <term>
        Returns: void
        THrows:  <<to-be-documented>>
        """
        ## Possible dest types:
        ## - A tuple: (registered_name, node_name)
        ## - An atom: registered_name
        ## - A pid:   <erlpid ...>  (note: the pid contains the pid's node)

        sourcePid = self._mboxes[sourceMBox]

        ## First check for strings in the dest argument.
        ## Convert any strings to to atoms.
        if type(dest) == types.StringType:
            dest = erl_term.ErlAtom(dest)
        elif type(dest) == types.TupleType:
            destPidName = dest[0]
            destNode = dest[1]
            if type(destPidName) == types.StringType:
                destPidName = erl_term.ErlAtom(destPidName)
            if type(destNode) == types.StringType:
                destNode = erl_term.ErlAtom(destNode)
            dest = (destPidName, destNode)

        ## Then split the dest into:
        ##    destPid/destPidName   and   destNode
        ## depending on its type.
        if type(dest) == types.TupleType:
            destPid = dest[0]
            destNode = dest[1]
        elif erl_term.IsErlAtom(dest):
            destNode = self
            name = dest.atomText
            if not self._registeredNames.has_key(name):
                return
            destPid = self._registeredNames[name]
        elif erl_term.IsErlPid(dest):
            destPid = dest
            destNode = dest.node
        else:
            return

        ## Now do the sending...
        if destNode == self:
            if not self._registeredPids.has_key(destPid):
                return
            mbox = self._registredPids[destPid]
            mbox.Msg(msg)
        else:                           # dest node is remote
            # First make sure we are online
            # FIXME: Will this really work???
            #        Publish might register callbacks, but
            #        this code continues after the Publish...
            self.Publish()
            destNodeName = destNode.atomText
            if not self._connections.has_key(destNodeName):
                # We don't have a connection to the destination
                # We must open a connection.
                # This is done by pinging with the ping-callback
                # being a function that sends the message.

                cb = erl_common.Callback(self._SendMsgToRemoteNode,
                                         sourcePid, destNode, destPid, msg)
                destNodeName = destNode.atomText
                self.Ping(destNodeName, cb)
            else:
                ## We have contact with the remote node. Send at once!
                self._SendMsgToRemoteNode("pong",
                                          sourcePid, destNode, destPid, msg)

    ##
    ## Internal routines
    ##

    def _CreateRex(self):
        mboxPid = self._CreatePid()
        mbox = _ErlRexMBox(self, mboxPid)
        self._pids[mboxPid] = mbox
        self._mboxes[mbox] = mboxPid
        mbox.Start()

    def _CreatePid(self):
        """Returns: <instance of ErlPid>"""
        ## Code stolen from com/ericsson/otp/erlang/OtpLocalNode.java
        global _serial, _pidCount
        newPid = erl_term.ErlPid(erl_term.ErlAtom(self._nodeName),
                                 _pidCount, _serial, self._creation)
        _pidCount = _pidCount + 1
        if _pidCount > 0x7fff:
            _pidCount = 0
            _serial = _serial + 1
            if _serial > 0x07:
                _serial = 0
        return newPid

    def _EpmdConnectedOk(self, creation):
        """This callback is called when the publish to EPMD has successfully
        completed."""
        self._isServerPublished = 1
        self._creation = creation

    def _EpmdConnectFailed(self, errorResult):
        raise "Failed to connect to epmd (%d)" % errorResult

    def _NodeUp(self, connection, nodeName):
        """This callback is called from the in/out connection object
        when a new connection has been established."""
        erl_common.Debug(M, "NODEUP: nodeName=%s connection=%s" % \
                         (nodeName, connection))
        self._connections[nodeName] = connection
        for (id, cb) in self._nodeUpCb:
            cb("nodeup", nodeName)

    def _NodeDown(self, connection, nodeName):
        """This callback is called from the in/out connection object when a
        connection has been broken."""
        erl_common.Debug(M, "NODENOWN: nodeName=%s connection=%s" % \
                         (nodeName, connection))
        if self._connections.has_key(nodeName):
            del self._connections[nodeName]
            for (id, cb) in self._nodeDownCb:
                cb("nodedown", nodeName)
    
    def _PingEpmdResponse(self, result, portNum, nodeType, proto,
                          distVSNRange, nodeNameNoHost, extra,
                          remoteNodeName):
        """This callback is called when the lookup for a nodename at a host
        has returned, whether it was successful or not.
        If it was successful, then RESULT != 0 and the other parameters
        are valid. If it was not successful, then the other parameters have
        undefined values."""
        if result != 0:
            callbacks = self._ongoingPings[remoteNodeName]
            del self._ongoingPings[remoteNodeName]
            for cb in callbacks:
                cb("pang")
        else:
            [otherNode, otherHost] = string.split(remoteNodeName, "@")
            out = erl_node_conn.ErlNodeOutConnection(self._nodeName,
                                                     self._opts)
            connectedOkCb = erl_common.Callback(self._PingSucceeded,
                                                out, remoteNodeName)
            connectFailedCb = erl_common.Callback(self._PingFailed,
                                                  out, remoteNodeName)
            connectionBrokenCb = erl_common.Callback(self._NodeDown,
                                                     out, remoteNodeName)
            passThroughMsgCb = erl_common.Callback(self._PassThroughMsg,
                                                   out, remoteNodeName)
            out.InitiateConnection(otherHost, portNum,
                                   connectedOkCb,
                                   connectFailedCb,
                                   connectionBrokenCb,
                                   self._PassThroughMsg)

    def _PingSucceeded(self, connection, remoteNodeName):
        """This internal routine signals that the ping of another node
        was successul."""
        callbacks = self._ongoingPings[remoteNodeName]
        del self._ongoingPings[remoteNodeName]
        self._NodeUp(connection, remoteNodeName)
        for cb in callbacks:
            cb("pong")

    def _PingFailed(self, connection, remoteNodeName):
        """This internal routine signals that the ping of another node
        failed."""
        callbacks = self._ongoingPings[remoteNodeName]
        del self._ongoingPings[remoteNodeName]
        for cb in callbacks:
            cb("pang")
        

    def _PassThroughMsg(self, connection, remoteNodeName, ctrlMsg, msg=None):
        """This callback is called when a connection recevies a message of
        type passthrough. Currently all messages are of type passthrough."""
        erl_common.Debug(M, "ctrlMsg=%s" % `ctrlMsg`)

        ctrlMsgOp = ctrlMsg[0]
        if ctrlMsgOp == self.CTRLMSGOP_LINK:
            fromPid = ctrlMsg[1]
            toPid = self._InternPid(ctrlMsg[2])
            pass
        elif ctrlMsgOp == self.CTRLMSGOP_SEND:
            cookie = ctrlMsg[1]
            toPid = self._InternPid(ctrlMsg[2])
            msg = msg
            erl_common.Debug(M, "SEND: msg=%s" % `msg`)
            if self._pids.has_key(toPid):
                mbox = self._pids[toPid]
                mbox.Msg(remoteNodeName, msg)
            else:
                erl_common.Debug(M, "Got SEND with no dest pid: %s" % toPid)
                erl_common.Debug(M, "Pids:\n%s" % `self._pids`)
        elif ctrlMsgOp == self.CTRLMSGOP_EXIT:
            fromPid = ctrlMsg[1]
            toPid = self._InternPid(ctrlMsg[2])
            reason = ctrlMsg[3]
            pass
        elif ctrlMsgOp == self.CTRLMSGOP_UNLINK:
            fromPid = ctrlMsg[1]
            toPid = self._InternPid(ctrlMsg[2])
            pass
        elif ctrlMsgOp == self.CTRLMSGOP_NODE_LINK:
            pass
        elif ctrlMsgOp == self.CTRLMSGOP_REG_SEND:
            fromPid = ctrlMsg[1]
            cookie = ctrlMsg[2]
            toNameAtom = ctrlMsg[3]
            toName = toNameAtom.atomText
            msg = msg
            if self._registeredNames.has_key(toName):
                mboxPid = self._registeredNames[toName]
                mbox = self._pids[mboxPid]
                mbox.Msg(remoteNodeName, msg)
            else:
                erl_common.Debug(M,
                                 "Got REG_SEND with no dest mbox: \"%s\": %s" %
                                 (toName, msg))
        elif ctrlMsgOp == self.CTRLMSGOP_GROUP_LEADER:
            fromPid = ctrlMsg[1]
            toPid = self._InternPid(ctrlMsg[2])
            pass
        elif ctrlMsgOp == self.CTRLMSGOP_EXIT2:
            fromPid = ctrlMsg[1]
            toPid = self._InternPid(ctrlMsg[2])
            reason = ctrlMsg[3]
            pass
        elif ctrlMsgOp == self.CTRLMSGOP_SEND_TT:
            cookie = ctrlMsg[1]
            toPid = self._InternPid(ctrlMsg[2])
            traceToken = ctrlMsg[3]
            msg = msg
            pass
        elif ctrlMsgOp == self.CTRLMSGOP_EXIT_TT:
            fromPid = ctrlMsg[1]
            toPid = self._InternPid(ctrlMsg[2])
            traceToken = ctrlMsg[3]
            reason = ctrlMsg[4]
            pass
        elif ctrlMsgOp == self.CTRLMSGOP_REG_SEND_TT:
            fromPid = ctrlMsg[1]
            cookie = ctrlMsg[2]
            toName = ctrlMsg[3]
            traceToken = ctrlMsg[4]
            msg = msg
            pass
        elif ctrlMsgOp == self.CTRLMSGOP_EXIT2_TT:
            fromPid = ctrlMsg[1]
            toPid = self._InternPid(ctrlMsg[2])
            traceToken = ctrlMsg[3]
            reason = ctrlMsg[4]
            pass
        elif ctrlMsgOp == self.CTRLMSGOP_MONITOR_P:
            fromPid = ctrlMsg[1]
            toPid = self._InternPid(ctrlMsg[2])
            ref = ctrlMsg[3]
            pass
        elif ctrlMsgOp == self.CTRLMSGOP_DEMONITOR_P:
            fromPid = ctrlMsg[1]
            toPid = self._InternPid(ctrlMsg[2])
            ref = ctrlMsg[3]
            pass
        elif ctrlMsgOp == self.CTRLMSGOP_MONITOR_P_EXIT:
            fromPid = ctrlMsg[1]
            toPid = self._InternPid(ctrlMsg[2])
            ref = ctrlMsg[3]
            pass
        else:
            erl_common.Debug(M, "Unknown controlmsg: %s" % `ctrlMsg`)

    def _SendMsgToRemoteNode(self, pingResult, srcPid, destNode, destPid, msg):
        """This internal routine performs the actual sending."""
        if pingResult != "pong":
            return
        destNodeName = destNode.atomText
        if not self._connections.has_key(destNodeName):
            return
        conn = self._connections[destNodeName]
        cookie = erl_term.ErlAtom("")
        if erl_term.IsErlAtom(destPid):
            ctrlMsg = (self.CTRLMSGOP_REG_SEND, srcPid, cookie, destPid)
        else:
            ctrlMsg = (self.CTRLMSGOP_SEND, cookie, destPid)
        conn.SendMsg(ctrlMsg, msg)

    def _InternPid(self, newPid):
        """This is like intern() for strings, but for pids.
        The purpose is so that we'll be able to lookup pids
        in self._pids and self._registredPids.
        """
        for existingPid in self._pids.keys():
            if existingPid.equals(newPid):
                return existingPid
        return newPid


syntax highlighted by Code2HTML, v. 0.9.1