//
// dr.cc : Diffusion Routing Class
// authors : John Heidemann and Fabio Silva
//
// Copyright (C) 2000-2003 by the University of Southern California
// $Id: dr.cc,v 1.17 2005/09/13 04:53:49 tomh Exp $
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License,
// version 2, as published by the Free Software Foundation.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License along
// with this program; if not, write to the Free Software Foundation, Inc.,
// 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
//
// Linking this file statically or dynamically with other modules is making
// a combined work based on this file. Thus, the terms and conditions of
// the GNU General Public License cover the whole combination.
//
// In addition, as a special exception, the copyright holders of this file
// give you permission to combine this file with free software programs or
// libraries that are released under the GNU LGPL and with code included in
// the standard release of ns-2 under the Apache 2.0 license or under
// otherwise-compatible licenses with advertising requirements (or modified
// versions of such code, with unchanged license). You may copy and
// distribute such a system following the terms of the GNU GPL for this
// file and the licenses of the other code concerned, provided that you
// include the source code of that other code when and as the GNU GPL
// requires distribution of source code.
//
// Note that people who make modified versions of this file are not
// obligated to grant this special exception for their modified versions;
// it is their choice whether to do so. The GNU General Public License
// gives permission to release a modified version without this exception;
// this exception also makes it possible to release a modified version
// which carries forward this exception.
//
#include <stdlib.h>
#include <stdio.h>
#include "dr.hh"
class CallbackEntry {
public:
NR::Callback *cb_;
NR::handle subscription_handle_;
CallbackEntry(NR::Callback *cb, NR::handle subscription_handle) :
cb_(cb), subscription_handle_(subscription_handle) {};
};
class HandleEntry {
public:
handle hdl_;
bool valid_;
NRAttrVec *attrs_;
NR::Callback *cb_;
struct timeval exploratory_time_;
int32_t subscription_id_; // Used for One-Phase Pull
HandleEntry()
{
GetTime(&exploratory_time_);
valid_ = true;
cb_ = NULL;
};
~HandleEntry(){
ClearAttrs(attrs_);
delete attrs_;
};
};
int InterestCallback::expire()
{
int retval;
// Call the interestTimeout function
retval = drt_->interestTimeout(handle_entry_);
if (retval < 0)
delete this;
return retval;
}
int FilterKeepaliveCallback::expire()
{
int retval;
// Call the filterTimeout function
retval = drt_->filterKeepaliveTimeout(filter_entry_);
if (retval < 0)
delete this;
return retval;
}
int OldAPITimer::expire()
{
int retval;
// Call the callback function with the provided API
retval = cb_->expire(0, p_);
if (retval < 0)
delete this;
return retval;
}
#ifdef NS_DIFFUSION
class DiffEventQueue;
int DiffusionRouting::getNodeId() {
return node_->address();
}
int DiffusionRouting::getAgentId(int id) {
if (id != -1)
agent_id_ = id;
return agent_id_;
}
NR * NR::create_ns_NR(u_int16_t port, DiffAppAgent *da) {
return(new DiffusionRouting(port, da));
}
#else
NR *dr = NULL;
#ifdef USE_THREADS
void * ReceiveThread(void *dr)
{
// Never returns
((DiffusionRouting *)dr)->run(true, WAIT_FOREVER);
return NULL;
}
#endif // USE_THREADS
NR * NR::createNR(u_int16_t port)
{
// Create Diffusion Routing Class
if (dr)
return dr;
dr = new DiffusionRouting(port);
#ifdef USE_THREADS
int retval;
pthread_t thread;
// Fork a thread for receiving Messages
retval = pthread_create(&thread, NULL, &ReceiveThread, (void *)dr);
if (retval){
DiffPrint(DEBUG_ALWAYS, "Error creating receiving thread ! Aborting...\n");
exit(-1);
}
#endif // USE_THREADS
return dr;
}
#endif // NS_DIFFUSION
void GetLock(pthread_mutex_t *mutex)
{
#ifdef USE_THREADS
pthread_mutex_lock(mutex);
#endif // USE_THREADS
}
void ReleaseLock(pthread_mutex_t *mutex)
{
#ifdef USE_THREADS
pthread_mutex_unlock(mutex);
#endif // USE_THREADS
}
#ifdef NS_DIFFUSION
DiffusionRouting::DiffusionRouting(u_int16_t port, DiffAppAgent *da)
{
#else
DiffusionRouting::DiffusionRouting(u_int16_t port)
{
#ifdef USE_EMSIM
char *sim_id;
char *sim_group;
#endif // USE_EMSIM
#endif // NS_DIFFUSION
struct timeval tv;
DiffusionIO *device;
// Initialize basic stuff
next_handle_ = 1;
GetTime(&tv);
SetSeed(&tv);
pkt_count_ = GetRand();
random_id_ = GetRand();
agent_id_ = 0;
if (port == 0)
port = DEFAULT_DIFFUSION_PORT;
diffusion_port_ = port;
#ifdef USE_EMSIM
// Check if we are running in the emstar simulator
sim_id = getenv("SIM_ID");
sim_group = getenv("SIM_GROUP");
// Update diffusion port if running inside the simulator
if (sim_id && sim_group){
diffusion_port_ = diffusion_port_ + atoi(sim_id) + (100 * atoi(sim_group));
}
#endif // USE_EMSIM
// Initialize timer manager
timers_manager_ = new TimerManager;
// Initialize input device
#ifdef NS_DIFFUSION
device = new NsLocal(da);
local_out_devices_.push_back(device);
#endif // NS_DIFFUSION
#ifdef UDP
device = new UDPLocal(&agent_id_);
in_devices_.push_back(device);
local_out_devices_.push_back(device);
#endif // UDP
// Print initialization message
DiffPrint(DEBUG_ALWAYS,
"Diffusion Routing Agent initializing... Agent Id = %d\n",
agent_id_);
#ifdef USE_THREADS
// Initialize Semaphores
dr_mtx_ = new pthread_mutex_t;
pthread_mutex_init(dr_mtx_, NULL);
#endif // USE_THREADS
}
DiffusionRouting::~DiffusionRouting()
{
HandleList::iterator itr;
HandleEntry *current;
// Delete all Handles
for (itr = sub_list_.begin(); itr != sub_list_.end(); ++itr){
current = *itr;
delete current;
}
for (itr = pub_list_.begin(); itr != pub_list_.end(); ++itr){
current = *itr;
delete current;
}
}
handle DiffusionRouting::subscribe(NRAttrVec *subscribe_attrs, NR::Callback *cb)
{
NRSimpleAttribute<int> *nr_algorithm = NULL;
TimerCallback *timer_callback;
NRAttribute *scope_attr;
HandleEntry *my_handle;
// Get lock first
GetLock(dr_mtx_);
// Check the published attributes
if (!checkSubscription(subscribe_attrs)){
DiffPrint(DEBUG_ALWAYS, "Error : Invalid class/scope attributes in the subscribe attributes !\n");
ReleaseLock(dr_mtx_);
return FAIL;
}
// Create and Initialize the handle_entry structute
my_handle = new HandleEntry;
my_handle->hdl_ = next_handle_;
next_handle_++;
my_handle->cb_ = (NR::Callback *) cb;
sub_list_.push_back(my_handle);
// Copy the attributes
my_handle->attrs_ = CopyAttrs(subscribe_attrs);
// For subscriptions, scope is global if not specified
if (!hasScope(subscribe_attrs)){
scope_attr = NRScopeAttr.make(NRAttribute::IS, NRAttribute::GLOBAL_SCOPE);
my_handle->attrs_->push_back(scope_attr);
}
// For One-Phase Pull, we need a subscription id
nr_algorithm = NRAlgorithmAttr.find(subscribe_attrs);
if (nr_algorithm &&
nr_algorithm->getVal() == NRAttribute::ONE_PHASE_PULL_ALGORITHM){
my_handle->subscription_id_ = GetRand();
my_handle->attrs_->push_back(NRSubscriptionAttr.make(NRAttribute::IS,
my_handle->subscription_id_));
}
// Create Interest Timer and add it to the queue
timer_callback = new InterestCallback(this, my_handle);
timers_manager_->addTimer(SMALL_TIMEOUT, timer_callback);
// Release lock
ReleaseLock(dr_mtx_);
return my_handle->hdl_;
}
int DiffusionRouting::unsubscribe(handle subscription_handle)
{
HandleEntry *my_handle = NULL;
// Get the lock first
GetLock(dr_mtx_);
my_handle = findHandle(subscription_handle, &sub_list_);
if (!my_handle){
// Handle doesn't exist, return FAIL
ReleaseLock(dr_mtx_);
return FAIL;
}
// Handle will be destroyed when next interest timeout happens
my_handle->valid_ = false;
// Release the lock
ReleaseLock(dr_mtx_);
return OK;
}
handle DiffusionRouting::publish(NRAttrVec *publish_attrs)
{
HandleEntry *my_handle;
NRAttribute *scope_attr;
// Get the lock first
GetLock(dr_mtx_);
// Check the published attributes
if (!checkPublication(publish_attrs)){
DiffPrint(DEBUG_ALWAYS, "Error : Invalid class/scope attributes in the publish attributes !\n");
ReleaseLock(dr_mtx_);
return FAIL;
}
// Create and Initialize the handle_entry structute
my_handle = new HandleEntry;
my_handle->hdl_ = next_handle_;
next_handle_++;
pub_list_.push_back(my_handle);
// Copy the attributes
my_handle->attrs_ = CopyAttrs(publish_attrs);
// For publications, scope is local if not specified
if (!hasScope(publish_attrs)){
scope_attr = NRScopeAttr.make(NRAttribute::IS, NRAttribute::NODE_LOCAL_SCOPE);
my_handle->attrs_->push_back(scope_attr);
}
// Release the lock
ReleaseLock(dr_mtx_);
return my_handle->hdl_;
}
int DiffusionRouting::unpublish(handle publication_handle)
{
HandleEntry *my_handle = NULL;
// Get the lock first
GetLock(dr_mtx_);
my_handle = removeHandle(publication_handle, &pub_list_);
if (!my_handle){
// Handle doesn't exist, return FAIL
ReleaseLock(dr_mtx_);
return FAIL;
}
// Free structures
delete my_handle;
// Release the lock
ReleaseLock(dr_mtx_);
return OK;
}
int DiffusionRouting::send(handle publication_handle,
NRAttrVec *send_attrs)
{
NRSimpleAttribute<int> *nr_algorithm = NULL;
NRSimpleAttribute<int> *rmst_id_attr = NULL;
int8_t send_message_type = DATA;
struct timeval current_time;
HandleEntry *my_handle;
Message *my_message;
// Get the lock first
GetLock(dr_mtx_);
// Get attributes associated with handle
my_handle = findHandle(publication_handle, &pub_list_);
if (!my_handle){
ReleaseLock(dr_mtx_);
return FAIL;
}
// Check the send attributes
if (!checkSend(send_attrs)){
DiffPrint(DEBUG_ALWAYS,
"Error : Invalid class/scope attributes in send attributes !\n");
ReleaseLock(dr_mtx_);
return FAIL;
}
// Check if it is time to send another exploratory data message
GetTime(¤t_time);
// Check algorithms
nr_algorithm = NRAlgorithmAttr.find(my_handle->attrs_);
rmst_id_attr = RmstIdAttr.find(send_attrs);
if (!nr_algorithm && !rmst_id_attr || nr_algorithm &&
nr_algorithm->getVal() != NRAttribute::ONE_PHASE_PULL_ALGORITHM){
// In One-Phase Pull, there are no exploratory messages
if (TimevalCmp(¤t_time, &(my_handle->exploratory_time_)) >= 0){
// Check if it is a push data message or a regular data message
if (isPushData(my_handle->attrs_)){
// Push data message
// Update time for the next push exploratory message
GetTime(&(my_handle->exploratory_time_));
my_handle->exploratory_time_.tv_sec += PUSH_EXPLORATORY_DELAY;
send_message_type = PUSH_EXPLORATORY_DATA;
}
else{
// Regular data message
// Update time for the next exploratory message
GetTime(&(my_handle->exploratory_time_));
my_handle->exploratory_time_.tv_sec += EXPLORATORY_DATA_DELAY;
send_message_type = EXPLORATORY_DATA;
}
}
}
// Initialize message structure
my_message = new Message(DIFFUSION_VERSION, send_message_type, agent_id_,
0, 0, pkt_count_, random_id_, LOCALHOST_ADDR,
LOCALHOST_ADDR);
// Increment pkt_counter
pkt_count_++;
// First, we duplicate the 'publish' attributes
my_message->msg_attr_vec_ = CopyAttrs(my_handle->attrs_);
// Now, we add the send attributes
AddAttrs(my_message->msg_attr_vec_, send_attrs);
// Compute the total number and size of the joined attribute sets
my_message->num_attr_ = my_message->msg_attr_vec_->size();
my_message->data_len_ = CalculateSize(my_message->msg_attr_vec_);
// Release the lock
ReleaseLock(dr_mtx_);
// Send Packet
sendMessageToDiffusion(my_message);
delete my_message;
return OK;
}
int DiffusionRouting::sendRmst(handle publication_handle,
NRAttrVec *send_attrs, int fragment_size)
{
NRSimpleAttribute<void *> *rmst_data_attr;
NRSimpleAttribute<int> *frag_number_attr;
NRSimpleAttribute<int> *max_frag_attr;
void *frag_ptr, *blob_ptr;
char *blob;
timeval send_interval;
int retval;
int id = GetRand() % 500;
int size;
int num_frag;
int max_frag_len;
// Find RMST blob to send
rmst_data_attr = RmstDataAttr.find(send_attrs);
// We must have a RMST data attribute to send
if(!rmst_data_attr){
DiffPrint(DEBUG_ALWAYS, "sendRMST - can't find blob to send !\n");
return FAIL;
}
// Copy RMST blob and calculate number of fragments
blob_ptr = rmst_data_attr->getVal();
size = rmst_data_attr->getLen();
blob = new char[size];
memcpy((void *)blob, blob_ptr, size);
num_frag = (size + fragment_size - 1) / fragment_size;
// We index starting at zero
num_frag--;
max_frag_len = size - (num_frag * fragment_size);
DiffPrint(DEBUG_DETAILS,
"sendRMST: rmst num_frag = %d, fragment_size = %d, max_frag_len = %d\n",
num_frag, fragment_size, max_frag_len);
// Prepare attribute vector with RMST attributes
max_frag_attr = RmstMaxFragAttr.make(NRAttribute::IS, num_frag);
send_attrs->push_back(max_frag_attr);
send_attrs->push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, RMST_RESP));
frag_number_attr = RmstFragAttr.make(NRAttribute::IS, 0);
send_attrs->push_back(frag_number_attr);
send_attrs->push_back(RmstIdAttr.make(NRAttribute::IS, id));
// Replace the large blob with a blob fragment
frag_ptr = (void *)&blob[0];
// The call to setVal will delete the original blob!!
if (num_frag == 0)
rmst_data_attr->setVal(frag_ptr, max_frag_len);
else
rmst_data_attr->setVal(frag_ptr, fragment_size);
// Send 1st fragment
retval = send(publication_handle, send_attrs);
// Send other fragments
for (int i = 1; i <= num_frag; i++){
// Small delay between sending fragments
send_interval.tv_sec = 0;
send_interval.tv_usec = 25000;
select(0, NULL, NULL, NULL, &send_interval);
// Send next fragment
frag_number_attr->setVal(i);
frag_ptr = (void *)&blob[i * fragment_size];
if (num_frag == i)
rmst_data_attr->setVal(frag_ptr, max_frag_len);
else
rmst_data_attr->setVal(frag_ptr, fragment_size);
retval = send(publication_handle, send_attrs);
}
ClearAttrs(send_attrs);
delete blob;
return OK;
}
int DiffusionRouting::addToBlacklist(int32_t node)
{
ControlMessage *control_blob;
NRAttribute *ctrl_msg_attr;
Message *my_message;
NRAttrVec *attrs;
control_blob = new ControlMessage(ADD_TO_BLACKLIST, node, 0);
ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
(void *)control_blob,
sizeof(ControlMessage));
attrs = new NRAttrVec;
attrs->push_back(ctrl_msg_attr);
my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
0, pkt_count_, random_id_, LOCALHOST_ADDR,
LOCALHOST_ADDR);
// Increment pkt_counter
pkt_count_++;
// Add attributes to the message
my_message->msg_attr_vec_ = attrs;
my_message->num_attr_ = attrs->size();
my_message->data_len_ = CalculateSize(attrs);
// Send Packet
sendMessageToDiffusion(my_message);
// Delete message
delete my_message;
delete control_blob;
return OK;
}
int DiffusionRouting::clearBlacklist()
{
ControlMessage *control_blob;
NRAttribute *ctrl_msg_attr;
Message *my_message;
NRAttrVec *attrs;
control_blob = new ControlMessage(CLEAR_BLACKLIST, 0, 0);
ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
(void *)control_blob,
sizeof(ControlMessage));
attrs = new NRAttrVec;
attrs->push_back(ctrl_msg_attr);
my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
0, pkt_count_, random_id_, LOCALHOST_ADDR,
LOCALHOST_ADDR);
// Increment pkt_counter
pkt_count_++;
// Add attributes to the message
my_message->msg_attr_vec_ = attrs;
my_message->num_attr_ = attrs->size();
my_message->data_len_ = CalculateSize(attrs);
// Send Packet
sendMessageToDiffusion(my_message);
// Delete message
delete my_message;
delete control_blob;
return OK;
}
handle DiffusionRouting::addFilter(NRAttrVec *filter_attrs, u_int16_t priority,
FilterCallback *cb)
{
FilterEntry *filter_entry;
NRAttrVec *attrs;
NRAttribute *ctrl_msg_attr;
ControlMessage *control_blob;
Message *my_message;
TimerCallback *timer_callback;
// Check parameters
if (!filter_attrs || !cb || priority < FILTER_MIN_PRIORITY || priority > FILTER_MAX_PRIORITY){
DiffPrint(DEBUG_ALWAYS, "Received invalid parameters when adding filter !\n");
return FAIL;
}
// Get lock first
GetLock(dr_mtx_);
// Create and Initialize the handle_entry structute
filter_entry = new FilterEntry(next_handle_, priority, agent_id_);
next_handle_++;
filter_entry->cb_ = (FilterCallback *) cb;
filter_list_.push_back(filter_entry);
// Copy attributes (keep them for matching later)
filter_entry->filter_attrs_ = CopyAttrs(filter_attrs);
// Copy the attributes (and add the control attr)
attrs = CopyAttrs(filter_attrs);
control_blob = new ControlMessage(ADD_UPDATE_FILTER,
priority, filter_entry->handle_);
ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
(void *)control_blob,
sizeof(ControlMessage));
attrs->push_back(ctrl_msg_attr);
// Initialize message structure
my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
0, pkt_count_, random_id_, LOCALHOST_ADDR,
LOCALHOST_ADDR);
// Increment pkt_counter
pkt_count_++;
// Add attributes to the message
my_message->msg_attr_vec_ = attrs;
my_message->num_attr_ = attrs->size();
my_message->data_len_ = CalculateSize(attrs);
// Release the lock
ReleaseLock(dr_mtx_);
// Send Packet
sendMessageToDiffusion(my_message);
// Add keepalive timer to the event queue
timer_callback = new FilterKeepaliveCallback(this, filter_entry);
timers_manager_->addTimer(FILTER_KEEPALIVE_DELAY, timer_callback);
// Delete message, attribute set and controlblob
delete my_message;
delete control_blob;
return filter_entry->handle_;
}
int DiffusionRouting::removeFilter(handle filter_handle)
{
FilterEntry *filter_entry = NULL;
ControlMessage *control_blob;
NRAttribute *ctrl_msg_attr;
NRAttrVec *attrs;
Message *my_message;
// Get lock first
GetLock(dr_mtx_);
filter_entry = findFilter(filter_handle);
if (!filter_entry){
// Handle doesn't exist, return FAIL
ReleaseLock(dr_mtx_);
return FAIL;
}
control_blob = new ControlMessage(REMOVE_FILTER, filter_entry->handle_, 0);
ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
(void *)control_blob,
sizeof(ControlMessage));
attrs = new NRAttrVec;
attrs->push_back(ctrl_msg_attr);
my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
0, pkt_count_, random_id_, LOCALHOST_ADDR,
LOCALHOST_ADDR);
// Increment pkt_counter
pkt_count_++;
// Add attributes to the message
my_message->msg_attr_vec_ = attrs;
my_message->num_attr_ = attrs->size();
my_message->data_len_ = CalculateSize(attrs);
// Handle will be destroyed when next keepalive timer happens
filter_entry->valid_ = false;
// Send Packet
sendMessageToDiffusion(my_message);
// Release the lock
ReleaseLock(dr_mtx_);
// Delete message
delete my_message;
delete control_blob;
return OK;
}
handle DiffusionRouting::addTimer(int timeout, TimerCallback *callback)
{
return (timers_manager_->addTimer(timeout, callback));
}
handle DiffusionRouting::addTimer(int timeout, void *p, TimerCallbacks *cb)
{
TimerCallback *callback;
callback = new OldAPITimer(cb, p);
return (addTimer(timeout, callback));
}
bool DiffusionRouting::removeTimer(handle hdl)
{
return (timers_manager_->removeTimer(hdl));
}
int DiffusionRouting::filterKeepaliveTimeout(FilterEntry *filter_entry)
{
FilterEntry *my_entry = NULL;
ControlMessage *control_blob;
NRAttribute *ctrl_msg_attr;
NRAttrVec *attrs;
Message *my_message;
// Acquire lock first
GetLock(dr_mtx_);
if (filter_entry->valid_){
// Send keepalive
control_blob = new ControlMessage(ADD_UPDATE_FILTER,
filter_entry->priority_,
filter_entry->handle_);
ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
(void *)control_blob,
sizeof(ControlMessage));
attrs = CopyAttrs(filter_entry->filter_attrs_);
attrs->push_back(ctrl_msg_attr);
my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
0, pkt_count_, random_id_, LOCALHOST_ADDR,
LOCALHOST_ADDR);
// Increment pkt_counter
pkt_count_++;
// Add attributes to the message
my_message->msg_attr_vec_ = attrs;
my_message->num_attr_ = attrs->size();
my_message->data_len_ = CalculateSize(attrs);
// Send Message
sendMessageToDiffusion(my_message);
delete my_message;
delete control_blob;
// Release lock
ReleaseLock(dr_mtx_);
// Reschedule another filter keepalive timer in event queue
return (FILTER_KEEPALIVE_DELAY);
}
else{
// Filter was removed
my_entry = deleteFilter(filter_entry->handle_);
// We should have removed the correct handle
if (my_entry != filter_entry){
DiffPrint(DEBUG_ALWAYS, "DiffusionRouting::KeepaliveTimeout: Handles should match !\n");
exit(-1);
}
delete my_entry;
// Release lock
ReleaseLock(dr_mtx_);
return -1;
}
}
int DiffusionRouting::interestTimeout(HandleEntry *handle_entry)
{
HandleEntry *my_handle = NULL;
Message *my_message;
// Acquire lock first
GetLock(dr_mtx_);
if (handle_entry->valid_){
// Send the interest message if entry is still valid
my_message = new Message(DIFFUSION_VERSION, INTEREST, agent_id_, 0,
0, pkt_count_, random_id_, LOCALHOST_ADDR,
LOCALHOST_ADDR);
// Increment pkt_counter
pkt_count_++;
// Add attributes to the message
my_message->msg_attr_vec_ = CopyAttrs(handle_entry->attrs_);
my_message->num_attr_ = handle_entry->attrs_->size();
my_message->data_len_ = CalculateSize(handle_entry->attrs_);
// Send Packet
sendMessageToDiffusion(my_message);
delete my_message;
// Release lock
ReleaseLock(dr_mtx_);
// Reschedule this timer in the queue
return (int) (floor(-1 * (log(1 - (GetRand() * 1.0 / RAND_MAX))) /
INTEREST_LAMBDA));
}
else{
// Interest was canceled. Just delete it from the handle_list
my_handle = removeHandle(handle_entry->hdl_, &sub_list_);
// We should have removed the correct handle
if (my_handle != handle_entry){
DiffPrint(DEBUG_ALWAYS,
"Error: interestTimeout: Handles should match !\n");
exit(-1);
}
delete my_handle;
// Release lock
ReleaseLock(dr_mtx_);
// Delete timer from the queue
return -1;
}
}
int DiffusionRouting::sendMessage(Message *msg, handle h,
u_int16_t priority)
{
RedirectMessage *original_hdr;
NRAttribute *original_attr, *ctrl_msg_attr;
ControlMessage *control_blob;
NRAttrVec *attrs;
Message *my_message;
if ((priority < FILTER_MIN_PRIORITY) ||
(priority > FILTER_KEEP_PRIORITY))
return FAIL;
// Create an attribute with the original header
original_hdr = new RedirectMessage(msg->new_message_, msg->msg_type_,
msg->source_port_, msg->data_len_,
msg->num_attr_, msg->rdm_id_,
msg->pkt_num_, msg->next_hop_,
msg->last_hop_, 0,
msg->next_port_);
original_attr = OriginalHdrAttr.make(NRAttribute::IS, (void *)original_hdr,
sizeof(RedirectMessage));
// Create the attribute with the control message
control_blob = new ControlMessage(SEND_MESSAGE, h, priority);
ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS, (void *)control_blob,
sizeof(ControlMessage));
// Copy Attributes and add originalAttr and controlAttr
attrs = CopyAttrs(msg->msg_attr_vec_);
attrs->push_back(original_attr);
attrs->push_back(ctrl_msg_attr);
my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
0, pkt_count_, random_id_, LOCALHOST_ADDR,
LOCALHOST_ADDR);
// Increment pkt_counter
pkt_count_++;
// Add attributes to the message
my_message->msg_attr_vec_ = attrs;
my_message->num_attr_ = attrs->size();
my_message->data_len_ = CalculateSize(attrs);
// Send Packet
sendMessageToDiffusion(my_message);
delete my_message;
delete control_blob;
delete original_hdr;
return OK;
}
#ifndef NS_DIFFUSION
void DiffusionRouting::doIt()
{
run(true, WAIT_FOREVER);
}
void DiffusionRouting::doOne(long timeout)
{
run(false, timeout);
}
void DiffusionRouting::run(bool wait_condition, long max_timeout)
{
DeviceList::iterator itr;
int status, max_sock, fd;
bool flag;
DiffPacket in_pkt;
fd_set fds;
struct timeval tv;
struct timeval max_tv;
do{
FD_ZERO(&fds);
max_sock = 0;
// Set the maximum timeout value
max_tv.tv_sec = (int) (max_timeout / 1000);
max_tv.tv_usec = (int) ((max_timeout % 1000) * 1000);
for (itr = in_devices_.begin(); itr != in_devices_.end(); ++itr){
(*itr)->addInFDS(&fds, &max_sock);
}
// Check for the next timer
timers_manager_->nextTimerTime(&tv);
if (tv.tv_sec == MAXVALUE){
// If we don't have any timers, we wait for POLLING_INTERVAL
if (max_timeout == WAIT_FOREVER){
tv.tv_sec = POLLING_INTERVAL;
tv.tv_usec = 0;
}
else{
tv = max_tv;
}
}
else{
if ((max_timeout != WAIT_FOREVER) && (TimevalCmp(&tv, &max_tv) > 0)){
// max_timeout value is smaller than next timer's time, so we
// use themax_timeout value instead
tv = max_tv;
}
}
status = select(max_sock+1, &fds, NULL, NULL, &tv);
if (status == 0){
// Process all timers that have expired
timers_manager_->executeAllExpiredTimers();
}
if (status > 0){
do{
flag = false;
for (itr = in_devices_.begin(); itr != in_devices_.end(); ++itr){
fd = (*itr)->checkInFDS(&fds);
if (fd != -1){
// Message waiting
in_pkt = (*itr)->recvPacket(fd);
recvPacket(in_pkt);
// Clear this fd
FD_CLR(fd, &fds);
status--;
flag = true;
}
}
} while ((status > 0) && (flag == true));
}
else
if (status < 0){
DiffPrint(DEBUG_IMPORTANT, "Select returned %d\n", status);
}
} while (wait_condition);
}
#endif // NS_DIFFUSION
#ifndef NS_DIFFUSION
void DiffusionRouting::sendMessageToDiffusion(Message *msg)
{
DiffPacket out_pkt = NULL;
struct hdr_diff *dfh;
char *pos;
int len;
out_pkt = AllocateBuffer(msg->msg_attr_vec_);
dfh = HDR_DIFF(out_pkt);
pos = (char *) out_pkt;
pos = pos + sizeof(struct hdr_diff);
len = PackAttrs(msg->msg_attr_vec_, pos);
LAST_HOP(dfh) = htonl(msg->last_hop_);
NEXT_HOP(dfh) = htonl(msg->next_hop_);
DIFF_VER(dfh) = msg->version_;
MSG_TYPE(dfh) = msg->msg_type_;
DATA_LEN(dfh) = htons(len);
PKT_NUM(dfh) = htonl(msg->pkt_num_);
RDM_ID(dfh) = htonl(msg->rdm_id_);
NUM_ATTR(dfh) = htons(msg->num_attr_);
SRC_PORT(dfh) = htons(msg->source_port_);
sendPacketToDiffusion(out_pkt, sizeof(struct hdr_diff) + len, diffusion_port_);
delete [] out_pkt;
}
#else
void DiffusionRouting::sendMessageToDiffusion(Message *msg)
{
Message *my_msg;
DeviceList::iterator itr;
int len;
my_msg = CopyMessage(msg);
len = CalculateSize(my_msg->msg_attr_vec_);
len = len + sizeof(struct hdr_diff);
for (itr = local_out_devices_.begin(); itr != local_out_devices_.end(); ++itr){
(*itr)->sendPacket((DiffPacket) my_msg, len, diffusion_port_);
}
}
#endif // !NS_DIFFUSION
void DiffusionRouting::sendPacketToDiffusion(DiffPacket pkt, int len, int dst)
{
DeviceList::iterator itr;
for (itr = local_out_devices_.begin(); itr != local_out_devices_.end(); ++itr){
(*itr)->sendPacket(pkt, len, dst);
}
}
#ifndef NS_DIFFUSION
void DiffusionRouting::recvPacket(DiffPacket pkt)
{
struct hdr_diff *dfh = HDR_DIFF(pkt);
Message *rcv_message = NULL;
int8_t version, msg_type;
u_int16_t data_len, num_attr, source_port;
int32_t pkt_num, rdm_id, next_hop, last_hop;
// Read header
version = DIFF_VER(dfh);
msg_type = MSG_TYPE(dfh);
source_port = ntohs(SRC_PORT(dfh));
pkt_num = ntohl(PKT_NUM(dfh));
rdm_id = ntohl(RDM_ID(dfh));
num_attr = ntohs(NUM_ATTR(dfh));
next_hop = ntohl(NEXT_HOP(dfh));
last_hop = ntohl(LAST_HOP(dfh));
data_len = ntohs(DATA_LEN(dfh));
// Create a message structure from the incoming packet
rcv_message = new Message(version, msg_type, source_port, data_len,
num_attr, pkt_num, rdm_id, next_hop, last_hop);
// Read all attributes into the Message structure
rcv_message->msg_attr_vec_ = UnpackAttrs(pkt, num_attr);
// Process the incoming message
recvMessage(rcv_message);
// We are done
delete rcv_message;
delete [] pkt;
}
#endif // !NS_DIFFUSION
void DiffusionRouting::recvMessage(Message *msg)
{
// Check version
if (msg->version_ != DIFFUSION_VERSION)
return;
// Check destination
if (msg->next_hop_ != LOCALHOST_ADDR)
return;
// Process the incoming message
if (msg->msg_type_ == REDIRECT)
processControlMessage(msg);
else
processMessage(msg);
}
void DiffusionRouting::processControlMessage(Message *msg)
{
NRSimpleAttribute<void *> *original_header_attr = NULL;
NRAttrVec::iterator place = msg->msg_attr_vec_->begin();
RedirectMessage *original_header;
FilterEntry *entry;
handle my_handle;
// Find the attribute containing the original packet header
original_header_attr = OriginalHdrAttr.find_from(msg->msg_attr_vec_,
place, &place);
if (!original_header_attr){
DiffPrint(DEBUG_ALWAYS, "Error: Received an invalid REDIRECT message !\n");
return;
}
// Restore original message header
original_header = (RedirectMessage *) original_header_attr->getVal();
my_handle = original_header->handle_;
msg->msg_type_ = original_header->msg_type_;
msg->source_port_ = original_header->source_port_;
msg->pkt_num_ = original_header->pkt_num_;
msg->rdm_id_ = original_header->rdm_id_;
msg->next_hop_ = original_header->next_hop_;
msg->last_hop_ = original_header->last_hop_;
msg->num_attr_ = original_header->num_attr_;
msg->new_message_ = original_header->new_message_;
msg->next_port_ = original_header->next_port_;
// Delete attribute from the original set
msg->msg_attr_vec_->erase(place);
delete original_header_attr;
// Find the right callback
GetLock(dr_mtx_);
entry = findFilter(my_handle);
if (entry && entry->valid_){
// Just to confirm
if (OneWayMatch(entry->filter_attrs_, msg->msg_attr_vec_)){
ReleaseLock(dr_mtx_);
entry->cb_->recv(msg, my_handle);
return;
}
else{
DiffPrint(DEBUG_ALWAYS,
"Warning: Filter doesn't match incoming message's attributes !\n");
}
}
else{
DiffPrint(DEBUG_IMPORTANT,
"Report: Cannot find filter (possibly deleted ?)\n");
}
ReleaseLock(dr_mtx_);
}
void DiffusionRouting::processMessage(Message *msg)
{
NRSimpleAttribute<int> *rmst_id_attr = NULL;
CallbackList::iterator cbl_itr;
HandleList::iterator sub_itr;
NRAttrVec *callback_attrs;
HandleEntry *entry;
CallbackEntry *aux;
CallbackList cbl;
// First, acquire the lock
GetLock(dr_mtx_);
for (sub_itr = sub_list_.begin(); sub_itr != sub_list_.end(); ++sub_itr){
entry = *sub_itr;
if ((entry->valid_) && (MatchAttrs(msg->msg_attr_vec_, entry->attrs_)))
if (entry->cb_){
aux = new CallbackEntry(entry->cb_, entry->hdl_);
cbl.push_back(aux);
}
}
// We can release the lock now
ReleaseLock(dr_mtx_);
// Check for RMST id attribute
rmst_id_attr = RmstIdAttr.find(msg->msg_attr_vec_);
cbl_itr = cbl.begin();
// Process RMST fragment if we have callbacks and this message has an RmstId
if (rmst_id_attr && (cbl_itr != cbl.end())){
if (!processRmst(msg)){
cbl.clear();
return;
}
}
// Now we just call all callback functions
for (cbl_itr = cbl.begin(); cbl_itr != cbl.end(); ++cbl_itr){
// Copy attributes
callback_attrs = CopyAttrs(msg->msg_attr_vec_);
// Call app-specific callback function
aux = *cbl_itr;
aux->cb_->recv(callback_attrs, aux->subscription_handle_);
delete aux;
// Clean up callback attributes
ClearAttrs(callback_attrs);
delete callback_attrs;
}
// We are done
cbl.clear();
}
bool DiffusionRouting::processRmst(Message *msg)
{
NRSimpleAttribute<void *> *data_buf_attr = NULL;
NRSimpleAttribute<int> *max_frag_attr = NULL;
NRSimpleAttribute<int> *rmst_id_attr = NULL;
NRSimpleAttribute<int> *frag_attr = NULL;
int rmst_no, frag_no, data_buf_len, count;
void *blob_ptr, *tmp_frag_ptr;
Int2RecRmst::iterator rmst_iterator;
Int2Frag::iterator frag_iterator;
char *dstPtr;
int dstSize;
RecRmst *rmst_ptr;
// Read Rmst attributes
data_buf_attr = RmstDataAttr.find(msg->msg_attr_vec_);
rmst_id_attr = RmstIdAttr.find(msg->msg_attr_vec_);
frag_attr = RmstFragAttr.find(msg->msg_attr_vec_);
rmst_no = rmst_id_attr->getVal();
frag_no = frag_attr->getVal();
blob_ptr = data_buf_attr->getVal();
data_buf_len = data_buf_attr->getLen();
// See if we are receiving this blob, if not start a new RecRmst
rmst_iterator = rec_rmst_map_.find(rmst_no);
if (rmst_iterator == rec_rmst_map_.end()){
rmst_ptr = new RecRmst(rmst_no);
rec_rmst_map_.insert(Int2RecRmst::value_type(rmst_no, rmst_ptr));
}
else
rmst_ptr = (*rmst_iterator).second;
if (frag_no == 0){
max_frag_attr = RmstMaxFragAttr.find(msg->msg_attr_vec_);
rmst_ptr->max_frag_ = max_frag_attr->getVal();
rmst_ptr->mtu_len_ = data_buf_len;
}
// Copy fragment to map
tmp_frag_ptr = new char[data_buf_len];
memcpy(tmp_frag_ptr, blob_ptr, data_buf_len);
rmst_ptr->frag_map_.insert(Int2Frag::value_type(frag_no, tmp_frag_ptr));
if (frag_no == rmst_ptr->max_frag_)
rmst_ptr->max_frag_len_ = data_buf_len;
count = rmst_ptr->frag_map_.size();
// If this is the last rmst fragment, create the entire rmst
if (count == (rmst_ptr->max_frag_ + 1)){
DiffPrint(DEBUG_DETAILS,
"RMST #%d is complete, creating big blob !\n", rmst_no);
// Allocate memory for the big blob
dstSize = rmst_ptr->max_frag_ * rmst_ptr->mtu_len_ + rmst_ptr->max_frag_len_;
dstPtr = new char[dstSize];
// Copy all but last fragment to a buffer
for (int i = 0; i < rmst_ptr->max_frag_; i++){
frag_iterator = rmst_ptr->frag_map_.find(i);
tmp_frag_ptr = (*frag_iterator).second;
memcpy((void *)&dstPtr[i * rmst_ptr->mtu_len_],
(void *)tmp_frag_ptr, rmst_ptr->mtu_len_);
}
// Now, copy the last fragment to the buffer
frag_iterator = rmst_ptr->frag_map_.find(rmst_ptr->max_frag_);
tmp_frag_ptr = (*frag_iterator).second;
memcpy((void *)&dstPtr[rmst_ptr->max_frag_ * rmst_ptr->mtu_len_],
(void *)tmp_frag_ptr, rmst_ptr->max_frag_len_);
// Since we copied everything from the map - clean it up
rec_rmst_map_.erase(rmst_iterator);
delete rmst_ptr;
// Now we substitute the last fragment with the reconstructed blob
data_buf_attr->setVal(dstPtr, dstSize);
// Deliver this to the application
return true;
}
// We don't have the entire blob
return false;
}
HandleEntry * DiffusionRouting::removeHandle(handle my_handle, HandleList *hl)
{
HandleList::iterator itr;
HandleEntry *entry;
for (itr = hl->begin(); itr != hl->end(); ++itr){
entry = *itr;
if (entry->hdl_ == my_handle){
hl->erase(itr);
return entry;
}
}
return NULL;
}
HandleEntry * DiffusionRouting::findHandle(handle my_handle, HandleList *hl)
{
HandleList::iterator itr;
HandleEntry *entry;
for (itr = hl->begin(); itr != hl->end(); ++itr){
entry = *itr;
if (entry->hdl_ == my_handle)
return entry;
}
return NULL;
}
FilterEntry * DiffusionRouting::deleteFilter(handle my_handle)
{
FilterList::iterator itr;
FilterEntry *entry;
for (itr = filter_list_.begin(); itr != filter_list_.end(); ++itr){
entry = *itr;
if (entry->handle_ == my_handle){
filter_list_.erase(itr);
return entry;
}
}
return NULL;
}
FilterEntry * DiffusionRouting::findFilter(handle my_handle)
{
FilterList::iterator itr;
FilterEntry *entry;
for (itr = filter_list_.begin(); itr != filter_list_.end(); ++itr){
entry = *itr;
if (entry->handle_ == my_handle)
return entry;
}
return NULL;
}
bool DiffusionRouting::hasScope(NRAttrVec *attrs)
{
NRAttribute *temp = NULL;
temp = NRScopeAttr.find(attrs);
if (temp)
return true;
return false;
}
bool DiffusionRouting::checkSubscription(NRAttrVec *attrs)
{
NRSimpleAttribute<int> *nrclass = NULL;
NRSimpleAttribute<int> *nrscope = NULL;
// We first try to locate both class and scope attributes
nrclass = NRClassAttr.find(attrs);
nrscope = NRScopeAttr.find(attrs);
// There must be a class attribute in subscriptions
if (!nrclass)
return false;
if (nrscope){
// This subcription has both class and scope attribute. So, we
// check if class/scope attributes comply with the Diffusion
// Routing API
// Must check scope's operator. The API requires it to be "IS"
if (nrscope->getOp() != NRAttribute::IS)
return false;
// Ok, so first check if this is a global subscription
if ((nrscope->getVal() == NRAttribute::GLOBAL_SCOPE) &&
(nrclass->getVal() == NRAttribute::INTEREST_CLASS) &&
(nrclass->getOp() == NRAttribute::IS))
return true;
// Check for local subscriptions
if (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE)
return true;
// Just to be sure we did not miss any case
return false;
}
// If there is no scope attribute, we will insert one later if this
// subscription looks like a global subscription
if ((nrclass->getVal() == NRAttribute::INTEREST_CLASS) &&
(nrclass->getOp() == NRAttribute::IS))
return true;
return false;
}
bool DiffusionRouting::checkPublication(NRAttrVec *attrs)
{
NRSimpleAttribute<int> *nrclass = NULL;
NRSimpleAttribute<int> *nrscope = NULL;
// We first try to locate both class and scope attributes
nrclass = NRClassAttr.find(attrs);
nrscope = NRScopeAttr.find(attrs);
// There must be a class attribute in the publication
if (!nrclass)
return false;
// In addition, the Diffusion Routing API requires the class
// attribute to be set to "IS DATA_CLASS"
if ((nrclass->getVal() != NRAttribute::DATA_CLASS) ||
(nrclass->getOp() != NRAttribute::IS))
return false;
if (nrscope){
// Ok, so this publication has both class and scope attributes. We
// now have to check if they comply to the Diffusion Routing API
// semantics for publish
// Must check scope's operator. The API requires it to be "IS"
if (nrscope->getOp() != NRAttribute::IS)
return false;
// We accept both global and local scope data messages
if ((nrscope->getVal() == NRAttribute::GLOBAL_SCOPE) ||
(nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE))
return true;
// Just not to miss any case
return false;
}
// A publish without a scope attribute is fine, we will include a
// default NODE_LOCAL_SCOPE attribute later
return true;
}
bool DiffusionRouting::checkSend(NRAttrVec *attrs)
{
NRSimpleAttribute<int> *nrclass = NULL;
NRSimpleAttribute<int> *nrscope = NULL;
// Currently only checks for Class and Scope attributes
nrclass = NRClassAttr.find(attrs);
nrscope = NRScopeAttr.find(attrs);
if (nrclass || nrscope)
return false;
return true;
}
bool DiffusionRouting::isPushData(NRAttrVec *attrs)
{
NRSimpleAttribute<int> *nrclass = NULL;
NRSimpleAttribute<int> *nrscope = NULL;
// Currently only checks for Class and Scope attributes
nrclass = NRClassAttr.find(attrs);
nrscope = NRScopeAttr.find(attrs);
// We should have both class and scope
if (nrclass && nrscope){
if (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE)
return false;
return true;
}
else{
DiffPrint(DEBUG_ALWAYS, "Error: Cannot find class/scope attributes !\n");
return false;
}
}
syntax highlighted by Code2HTML, v. 0.9.1