//
// Copyright (C) 2005-2006 SIPez LLC.
// Licensed to SIPfoundry under a Contributor Agreement.
//
// Copyright (C) 2006 SIPfoundry Inc.
// Licensed by SIPfoundry under the LGPL license.
//
// Copyright (C) 2006 Pingtel Corp.
// Licensed to SIPfoundry under a Contributor Agreement.
//
// $$
//////////////////////////////////////////////////////////////////////////////
// SYSTEM INCLUDES
#include <memory>
using std::auto_ptr;
// APPLICATION INCLUDES
#include "sipdb/RegistrationBinding.h"
#include "os/OsDateTime.h"
#include "utl/UtlSListIterator.h"
#include "SipRegistrar.h"
#include "SipRegistrarServer.h"
#include "RegistrarInitialSync.h"
#include "SyncRpc.h"
// DEFINES
// CONSTANTS
// TYPEDEFS
// FORWARD DECLARATIONS
/// Construct the startup-phase synchronization task.
///
/// @param registrar  the SipRegistrar this task works on behalf of; it is
///                   stored in mRegistrar and used by every step of run().
///
/// The completion semaphore mFinished is created EMPTY; run() releases it
/// once the initial synchronization is done.
RegistrarInitialSync::RegistrarInitialSync(SipRegistrar& registrar)
   : OsTask("RegistrarInitSync-%d")
   , mRegistrar(registrar)
   , mFinished(OsBSem::Q_PRIORITY, OsBSem::EMPTY)
{
}
/// Task body for the startup phase.
///
/// Restores the locally persisted update numbers, registers the pullUpdates
/// handler, pulls updates from the peers in three passes, resets the update
/// number epoch, and finally releases mFinished so that waitForCompletion()
/// can return. The steps are executed strictly in the order written below.
///
/// @param pArg  not used by this method
/// @return always 0
int RegistrarInitialSync::run(void* pArg)
{
OsSysLog::add(FAC_SIP, PRI_DEBUG, "RegistrarInitialSync started");
// get the max update number for local updates from the local database
getRegistrarServer().restoreDbUpdateNumber();
// get the received update numbers for each peer from the local database
restorePeerUpdateNumbers();
// having done that, we can begin accepting pull requests from peers
SyncRpcPullUpdates::registerSelf(mRegistrar);
// Get from peers any of our own updates that we have lost
pullLocalUpdatesFromPeers();
// Get from peers any peer updates that we missed or lost while down
pullPeerUpdatesFromPeers();
// Get any updates for unreachable peers from reachable ones
recoverUnReachablePeers();
// Reset the DbUpdateNumber so that the upper half is the epoch time
getRegistrarServer().resetDbUpdateNumberEpoch();
OsSysLog::add(FAC_SIP, PRI_DEBUG, "RegistrarInitialSync complete");
// allow SipRegistrar to proceed to operational phase:
// this release is what a caller of waitForCompletion() is waiting for
mFinished.release();
return 0; // exit thread
}
/// Recover the latest received update number for each peer from the local database
void RegistrarInitialSync::restorePeerUpdateNumbers()
{
auto_ptr<UtlSListIterator> peers(mRegistrar.getPeers());
RegistrarPeer* peer;
while ((peer = static_cast<RegistrarPeer*>((*peers)())))
{
const char* name = peer->name();
assert(name);
// Set the last received update number for the peer to the max update number
// for the peer that we see in the registration DB
INT64 maxUpdateNumber = getRegistrarServer().getMaxUpdateNumberForRegistrar(name);
peer->setReceivedFrom(maxUpdateNumber);
OsSysLog::add(FAC_SIP, PRI_DEBUG,
"RegistrarInitialSync::restorePeerUpdateNumbers "
"for peer '%s' last received update # = %0#16llx",
name, maxUpdateNumber);
}
}
/// Get from peers any of our own updates that we have lost.
///
/// For every peer still in the Uninitialized state, invoke pullUpdates asking
/// for updates whose primary is this registrar and whose update number is above
/// the local DbUpdateNumber. The canonical case is that the local persistent
/// store was lost or corrupted, in which case the local DbUpdateNumber will
/// usually be zero. Any bindings returned by a successful pull are applied to
/// the local directory and then destroyed.
void RegistrarInitialSync::pullLocalUpdatesFromPeers()
{
   auto_ptr<UtlSListIterator> peers(mRegistrar.getPeers());
   RegistrarPeer* peer;
   while ((peer = static_cast<RegistrarPeer*>((*peers)())))
   {
      RegistrarPeer::SynchronizationState state = peer->synchronizationState();

      // Only RegistrarTest can declare a peer Reachable, via a reset, that happens later
      assert(state != RegistrarPeer::Reachable);

      if (state == RegistrarPeer::Uninitialized)
      {
         // Pulling updates changes the local max update number, so fetch it
         // on each iteration rather than once before the loop.
         const char* primaryName = getPrimaryName();
         INT64 maxUpdateNumber = getRegistrarServer().getDbUpdateNumber();

         // If we can't reach the peer, then the invoke method marks it UnReachable.
         UtlSList bindings;
         state = SyncRpcPullUpdates::invoke(
            peer,             // the peer we're contacting
            primaryName,      // name of the calling registrar (this one)
            primaryName,      // name of the registrar whose updates we're pulling (this one)
            maxUpdateNumber,  // pull all updates more recent than this
            &bindings);       // return bindings in this list
         assert(state != RegistrarPeer::Reachable);

         // Apply the resulting updates to the DB, if pullUpdates succeeded.
         // A return value of Uninitialized indicates success. If pullUpdates fails
         // then the peer's state is downgraded to UnReachable or Incompatible.
         if (   state == RegistrarPeer::Uninitialized
             && bindings.entries() > 0)
         {
            applyUpdatesToDirectory(bindings);

            // The count is cast explicitly so that the argument matches the
            // "%d" conversion whatever integer type entries() returns.
            OsSysLog::add(FAC_SIP, PRI_DEBUG,
                          "RegistrarInitialSync::pullLocalUpdatesFromPeers "
                          "received %d local updates from peer '%s'",
                          static_cast<int>(bindings.entries()), peer->name());
         }
         else
         {
            // Either the pull failed or it returned nothing; report the peer state.
            // (The log prefix previously named pullPeerUpdatesFromPeers by mistake.)
            OsSysLog::add(FAC_SIP, PRI_DEBUG,
                          "RegistrarInitialSync::pullLocalUpdatesFromPeers "
                          "'%s' is %s",
                          peer->name(), peer->getStateName());
         }

         // Release the binding objects returned by the pull.
         bindings.destroyAll();
      }
   }
}
/// Get from peers any peer updates that we missed or lost while down.
///
/// For every peer still in the Uninitialized state, invoke pullUpdates on that
/// peer asking for the peer's own updates above the last update number we
/// received from it (peer->receivedFrom()). Bindings returned by a successful
/// pull are applied to the local directory and then destroyed.
void RegistrarInitialSync::pullPeerUpdatesFromPeers()
{
   auto_ptr<UtlSListIterator> peers(mRegistrar.getPeers());
   RegistrarPeer* peer;
   const char* primaryName = getPrimaryName();

   while ((peer = static_cast<RegistrarPeer*>((*peers)())))
   {
      RegistrarPeer::SynchronizationState state = peer->synchronizationState();
      assert(state != RegistrarPeer::Reachable);

      if (state == RegistrarPeer::Uninitialized)
      {
         const char* peerName = peer->name();
         assert(peerName);

         UtlSList bindings;
         state = SyncRpcPullUpdates::invoke(
            peer,                 // the peer we're contacting
            primaryName,          // name of the calling registrar (this one)
            peerName,             // registrar whose updates we're pulling (the peer)
            peer->receivedFrom(), // pull all updates more recent than this
            &bindings);           // return bindings in this list
         assert(state != RegistrarPeer::Reachable);

         // Apply the resulting updates to the DB; Uninitialized means the pull succeeded
         if (   state == RegistrarPeer::Uninitialized
             && bindings.entries() > 0)
         {
            applyUpdatesToDirectory(bindings);

            // The count is cast explicitly so that the argument matches the
            // "%d" conversion whatever integer type entries() returns.
            OsSysLog::add(FAC_SIP, PRI_DEBUG,
                          "RegistrarInitialSync::pullPeerUpdatesFromPeers "
                          "received %d peer updates from peer '%s'",
                          static_cast<int>(bindings.entries()), peer->name());
         }
         else
         {
            // Either the pull failed or it returned nothing; report the peer state.
            OsSysLog::add(FAC_SIP, PRI_DEBUG,
                          "RegistrarInitialSync::pullPeerUpdatesFromPeers "
                          "'%s' is %s",
                          peer->name(), peer->getStateName());
         }

         // Release the binding objects returned by the pull, as
         // pullLocalUpdatesFromPeers does after applying its bindings;
         // without this the pulled bindings are never freed.
         bindings.destroyAll();
      }
   }
}
/// Get any updates for unreachable peers from reachable ones.
///
/// In the startup phase no peers are formally in the Reachable state, so a
/// peer that is still Uninitialized is treated as one we can talk to. Each
/// such peer is asked for the updates of every peer currently marked
/// UnReachable, starting above the last update number we received from that
/// unreachable peer. Bindings returned by a successful pull are applied to the
/// local directory and then destroyed.
///
/// :TODO: The state machine is confusing because the Reachable state doesn't
/// match the practical notion of reachability.  Simplify.
void RegistrarInitialSync::recoverUnReachablePeers()
{
   const char* primaryName = getPrimaryName();

   // outer loop: peers we can contact
   auto_ptr<UtlSListIterator> peers(mRegistrar.getPeers());
   RegistrarPeer* reachablePeer;
   while ((reachablePeer = static_cast<RegistrarPeer*>((*peers)())))
   {
      if (reachablePeer->synchronizationState() == RegistrarPeer::Uninitialized)
      {
         // inner loop: peers we cannot contact.
         // This must advance its own iterator (peers2); stepping the outer
         // iterator here would skip peers in both loops.
         auto_ptr<UtlSListIterator> peers2(mRegistrar.getPeers());
         RegistrarPeer* unreachablePeer;
         while ((unreachablePeer = static_cast<RegistrarPeer*>((*peers2)())))
         {
            if (unreachablePeer->synchronizationState() == RegistrarPeer::UnReachable)
            {
               UtlSList bindings;
               RegistrarPeer::SynchronizationState state =
                  SyncRpcPullUpdates::invoke(
                     reachablePeer,                   // the peer we are calling
                     primaryName,                     // the calling registrar (us)
                     unreachablePeer->name(),         // peer whose updates we want
                     unreachablePeer->receivedFrom(), // want updates above this number
                     &bindings);                      // put the bindings here
               assert(state != RegistrarPeer::Reachable);

               // Apply the resulting updates to the DB; Uninitialized means the pull succeeded
               if (   state == RegistrarPeer::Uninitialized
                   && bindings.entries() > 0)
               {
                  applyUpdatesToDirectory(bindings);

                  // The count is cast explicitly so that the argument matches the
                  // "%d" conversion whatever integer type entries() returns.
                  OsSysLog::add(FAC_SIP, PRI_DEBUG,
                                "RegistrarInitialSync::recoverUnReachablePeers "
                                "received %d peer updates from peer '%s' for peer '%s'",
                                static_cast<int>(bindings.entries()),
                                reachablePeer->name(),
                                unreachablePeer->name());
               }
               else
               {
                  // Either the pull failed or it returned nothing; report the
                  // state of the peer we called.
                  OsSysLog::add(FAC_SIP, PRI_DEBUG,
                                "RegistrarInitialSync::recoverUnReachablePeers "
                                "'%s' is %s",
                                reachablePeer->name(), reachablePeer->getStateName());
               }

               // Release the binding objects returned by the pull, as
               // pullLocalUpdatesFromPeers does after applying its bindings;
               // without this the pulled bindings are never freed.
               bindings.destroyAll();
            }
         }
      }
   }
}
/// Wait for the startup-phase synchronization to finish.
///
/// Acquires mFinished, the semaphore that is constructed EMPTY and that run()
/// releases as its last step before returning.
void RegistrarInitialSync::waitForCompletion()
{
mFinished.acquire();
}
/// Return the name of the local (primary) registrar.
///
/// @return the value of mRegistrar.primaryName(); asserted to be non-null.
const char* RegistrarInitialSync::getPrimaryName()
{
   const char* localName = mRegistrar.primaryName();

   // every caller passes this name on to a peer, so it must be present
   assert(localName != 0);

   return localName;
}
/// Convenience accessor for the registrar server.
///
/// @return the SipRegistrarServer obtained from mRegistrar.
SipRegistrarServer& RegistrarInitialSync::getRegistrarServer()
{
   SipRegistrarServer& server = mRegistrar.getRegistrarServer();
   return server;
}
/// Apply a list of pulled bindings to the local directory.
///
/// @param bindings  the bindings to apply; they are handed to the registrar
///                  server together with the current time in seconds since
///                  the epoch.
void RegistrarInitialSync::applyUpdatesToDirectory(UtlSList& bindings)
{
   SipRegistrarServer& server = getRegistrarServer();

   // timestamp the whole batch with a single "now"
   int now = OsDateTime::getSecsSinceEpoch();

   server.applyUpdatesToDirectory(now, bindings);
}
/// Destructor: performs no explicit cleanup of its own.
RegistrarInitialSync::~RegistrarInitialSync()
{
}
// syntax highlighted by Code2HTML, v. 0.9.1