// // dr.cc : Diffusion Routing Class // authors : John Heidemann and Fabio Silva // // Copyright (C) 2000-2003 by the University of Southern California // $Id: dr.cc,v 1.17 2005/09/13 04:53:49 tomh Exp $ // // This program is free software; you can redistribute it and/or // modify it under the terms of the GNU General Public License, // version 2, as published by the Free Software Foundation. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License along // with this program; if not, write to the Free Software Foundation, Inc., // 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA. // // Linking this file statically or dynamically with other modules is making // a combined work based on this file. Thus, the terms and conditions of // the GNU General Public License cover the whole combination. // // In addition, as a special exception, the copyright holders of this file // give you permission to combine this file with free software programs or // libraries that are released under the GNU LGPL and with code included in // the standard release of ns-2 under the Apache 2.0 license or under // otherwise-compatible licenses with advertising requirements (or modified // versions of such code, with unchanged license). You may copy and // distribute such a system following the terms of the GNU GPL for this // file and the licenses of the other code concerned, provided that you // include the source code of that other code when and as the GNU GPL // requires distribution of source code. // // Note that people who make modified versions of this file are not // obligated to grant this special exception for their modified versions; // it is their choice whether to do so. 
The GNU General Public License // gives permission to release a modified version without this exception; // this exception also makes it possible to release a modified version // which carries forward this exception. // #include #include #include "dr.hh" class CallbackEntry { public: NR::Callback *cb_; NR::handle subscription_handle_; CallbackEntry(NR::Callback *cb, NR::handle subscription_handle) : cb_(cb), subscription_handle_(subscription_handle) {}; }; class HandleEntry { public: handle hdl_; bool valid_; NRAttrVec *attrs_; NR::Callback *cb_; struct timeval exploratory_time_; int32_t subscription_id_; // Used for One-Phase Pull HandleEntry() { GetTime(&exploratory_time_); valid_ = true; cb_ = NULL; }; ~HandleEntry(){ ClearAttrs(attrs_); delete attrs_; }; }; int InterestCallback::expire() { int retval; // Call the interestTimeout function retval = drt_->interestTimeout(handle_entry_); if (retval < 0) delete this; return retval; } int FilterKeepaliveCallback::expire() { int retval; // Call the filterTimeout function retval = drt_->filterKeepaliveTimeout(filter_entry_); if (retval < 0) delete this; return retval; } int OldAPITimer::expire() { int retval; // Call the callback function with the provided API retval = cb_->expire(0, p_); if (retval < 0) delete this; return retval; } #ifdef NS_DIFFUSION class DiffEventQueue; int DiffusionRouting::getNodeId() { return node_->address(); } int DiffusionRouting::getAgentId(int id) { if (id != -1) agent_id_ = id; return agent_id_; } NR * NR::create_ns_NR(u_int16_t port, DiffAppAgent *da) { return(new DiffusionRouting(port, da)); } #else NR *dr = NULL; #ifdef USE_THREADS void * ReceiveThread(void *dr) { // Never returns ((DiffusionRouting *)dr)->run(true, WAIT_FOREVER); return NULL; } #endif // USE_THREADS NR * NR::createNR(u_int16_t port) { // Create Diffusion Routing Class if (dr) return dr; dr = new DiffusionRouting(port); #ifdef USE_THREADS int retval; pthread_t thread; // Fork a thread for receiving Messages retval = 
pthread_create(&thread, NULL, &ReceiveThread, (void *)dr); if (retval){ DiffPrint(DEBUG_ALWAYS, "Error creating receiving thread ! Aborting...\n"); exit(-1); } #endif // USE_THREADS return dr; } #endif // NS_DIFFUSION void GetLock(pthread_mutex_t *mutex) { #ifdef USE_THREADS pthread_mutex_lock(mutex); #endif // USE_THREADS } void ReleaseLock(pthread_mutex_t *mutex) { #ifdef USE_THREADS pthread_mutex_unlock(mutex); #endif // USE_THREADS } #ifdef NS_DIFFUSION DiffusionRouting::DiffusionRouting(u_int16_t port, DiffAppAgent *da) { #else DiffusionRouting::DiffusionRouting(u_int16_t port) { #ifdef USE_EMSIM char *sim_id; char *sim_group; #endif // USE_EMSIM #endif // NS_DIFFUSION struct timeval tv; DiffusionIO *device; // Initialize basic stuff next_handle_ = 1; GetTime(&tv); SetSeed(&tv); pkt_count_ = GetRand(); random_id_ = GetRand(); agent_id_ = 0; if (port == 0) port = DEFAULT_DIFFUSION_PORT; diffusion_port_ = port; #ifdef USE_EMSIM // Check if we are running in the emstar simulator sim_id = getenv("SIM_ID"); sim_group = getenv("SIM_GROUP"); // Update diffusion port if running inside the simulator if (sim_id && sim_group){ diffusion_port_ = diffusion_port_ + atoi(sim_id) + (100 * atoi(sim_group)); } #endif // USE_EMSIM // Initialize timer manager timers_manager_ = new TimerManager; // Initialize input device #ifdef NS_DIFFUSION device = new NsLocal(da); local_out_devices_.push_back(device); #endif // NS_DIFFUSION #ifdef UDP device = new UDPLocal(&agent_id_); in_devices_.push_back(device); local_out_devices_.push_back(device); #endif // UDP // Print initialization message DiffPrint(DEBUG_ALWAYS, "Diffusion Routing Agent initializing... 
Agent Id = %d\n", agent_id_); #ifdef USE_THREADS // Initialize Semaphores dr_mtx_ = new pthread_mutex_t; pthread_mutex_init(dr_mtx_, NULL); #endif // USE_THREADS } DiffusionRouting::~DiffusionRouting() { HandleList::iterator itr; HandleEntry *current; // Delete all Handles for (itr = sub_list_.begin(); itr != sub_list_.end(); ++itr){ current = *itr; delete current; } for (itr = pub_list_.begin(); itr != pub_list_.end(); ++itr){ current = *itr; delete current; } } handle DiffusionRouting::subscribe(NRAttrVec *subscribe_attrs, NR::Callback *cb) { NRSimpleAttribute *nr_algorithm = NULL; TimerCallback *timer_callback; NRAttribute *scope_attr; HandleEntry *my_handle; // Get lock first GetLock(dr_mtx_); // Check the published attributes if (!checkSubscription(subscribe_attrs)){ DiffPrint(DEBUG_ALWAYS, "Error : Invalid class/scope attributes in the subscribe attributes !\n"); ReleaseLock(dr_mtx_); return FAIL; } // Create and Initialize the handle_entry structute my_handle = new HandleEntry; my_handle->hdl_ = next_handle_; next_handle_++; my_handle->cb_ = (NR::Callback *) cb; sub_list_.push_back(my_handle); // Copy the attributes my_handle->attrs_ = CopyAttrs(subscribe_attrs); // For subscriptions, scope is global if not specified if (!hasScope(subscribe_attrs)){ scope_attr = NRScopeAttr.make(NRAttribute::IS, NRAttribute::GLOBAL_SCOPE); my_handle->attrs_->push_back(scope_attr); } // For One-Phase Pull, we need a subscription id nr_algorithm = NRAlgorithmAttr.find(subscribe_attrs); if (nr_algorithm && nr_algorithm->getVal() == NRAttribute::ONE_PHASE_PULL_ALGORITHM){ my_handle->subscription_id_ = GetRand(); my_handle->attrs_->push_back(NRSubscriptionAttr.make(NRAttribute::IS, my_handle->subscription_id_)); } // Create Interest Timer and add it to the queue timer_callback = new InterestCallback(this, my_handle); timers_manager_->addTimer(SMALL_TIMEOUT, timer_callback); // Release lock ReleaseLock(dr_mtx_); return my_handle->hdl_; } int DiffusionRouting::unsubscribe(handle 
subscription_handle) { HandleEntry *my_handle = NULL; // Get the lock first GetLock(dr_mtx_); my_handle = findHandle(subscription_handle, &sub_list_); if (!my_handle){ // Handle doesn't exist, return FAIL ReleaseLock(dr_mtx_); return FAIL; } // Handle will be destroyed when next interest timeout happens my_handle->valid_ = false; // Release the lock ReleaseLock(dr_mtx_); return OK; } handle DiffusionRouting::publish(NRAttrVec *publish_attrs) { HandleEntry *my_handle; NRAttribute *scope_attr; // Get the lock first GetLock(dr_mtx_); // Check the published attributes if (!checkPublication(publish_attrs)){ DiffPrint(DEBUG_ALWAYS, "Error : Invalid class/scope attributes in the publish attributes !\n"); ReleaseLock(dr_mtx_); return FAIL; } // Create and Initialize the handle_entry structute my_handle = new HandleEntry; my_handle->hdl_ = next_handle_; next_handle_++; pub_list_.push_back(my_handle); // Copy the attributes my_handle->attrs_ = CopyAttrs(publish_attrs); // For publications, scope is local if not specified if (!hasScope(publish_attrs)){ scope_attr = NRScopeAttr.make(NRAttribute::IS, NRAttribute::NODE_LOCAL_SCOPE); my_handle->attrs_->push_back(scope_attr); } // Release the lock ReleaseLock(dr_mtx_); return my_handle->hdl_; } int DiffusionRouting::unpublish(handle publication_handle) { HandleEntry *my_handle = NULL; // Get the lock first GetLock(dr_mtx_); my_handle = removeHandle(publication_handle, &pub_list_); if (!my_handle){ // Handle doesn't exist, return FAIL ReleaseLock(dr_mtx_); return FAIL; } // Free structures delete my_handle; // Release the lock ReleaseLock(dr_mtx_); return OK; } int DiffusionRouting::send(handle publication_handle, NRAttrVec *send_attrs) { NRSimpleAttribute *nr_algorithm = NULL; NRSimpleAttribute *rmst_id_attr = NULL; int8_t send_message_type = DATA; struct timeval current_time; HandleEntry *my_handle; Message *my_message; // Get the lock first GetLock(dr_mtx_); // Get attributes associated with handle my_handle = 
findHandle(publication_handle, &pub_list_); if (!my_handle){ ReleaseLock(dr_mtx_); return FAIL; } // Check the send attributes if (!checkSend(send_attrs)){ DiffPrint(DEBUG_ALWAYS, "Error : Invalid class/scope attributes in send attributes !\n"); ReleaseLock(dr_mtx_); return FAIL; } // Check if it is time to send another exploratory data message GetTime(¤t_time); // Check algorithms nr_algorithm = NRAlgorithmAttr.find(my_handle->attrs_); rmst_id_attr = RmstIdAttr.find(send_attrs); if (!nr_algorithm && !rmst_id_attr || nr_algorithm && nr_algorithm->getVal() != NRAttribute::ONE_PHASE_PULL_ALGORITHM){ // In One-Phase Pull, there are no exploratory messages if (TimevalCmp(¤t_time, &(my_handle->exploratory_time_)) >= 0){ // Check if it is a push data message or a regular data message if (isPushData(my_handle->attrs_)){ // Push data message // Update time for the next push exploratory message GetTime(&(my_handle->exploratory_time_)); my_handle->exploratory_time_.tv_sec += PUSH_EXPLORATORY_DELAY; send_message_type = PUSH_EXPLORATORY_DATA; } else{ // Regular data message // Update time for the next exploratory message GetTime(&(my_handle->exploratory_time_)); my_handle->exploratory_time_.tv_sec += EXPLORATORY_DATA_DELAY; send_message_type = EXPLORATORY_DATA; } } } // Initialize message structure my_message = new Message(DIFFUSION_VERSION, send_message_type, agent_id_, 0, 0, pkt_count_, random_id_, LOCALHOST_ADDR, LOCALHOST_ADDR); // Increment pkt_counter pkt_count_++; // First, we duplicate the 'publish' attributes my_message->msg_attr_vec_ = CopyAttrs(my_handle->attrs_); // Now, we add the send attributes AddAttrs(my_message->msg_attr_vec_, send_attrs); // Compute the total number and size of the joined attribute sets my_message->num_attr_ = my_message->msg_attr_vec_->size(); my_message->data_len_ = CalculateSize(my_message->msg_attr_vec_); // Release the lock ReleaseLock(dr_mtx_); // Send Packet sendMessageToDiffusion(my_message); delete my_message; return OK; } int 
DiffusionRouting::sendRmst(handle publication_handle, NRAttrVec *send_attrs, int fragment_size) { NRSimpleAttribute *rmst_data_attr; NRSimpleAttribute *frag_number_attr; NRSimpleAttribute *max_frag_attr; void *frag_ptr, *blob_ptr; char *blob; timeval send_interval; int retval; int id = GetRand() % 500; int size; int num_frag; int max_frag_len; // Find RMST blob to send rmst_data_attr = RmstDataAttr.find(send_attrs); // We must have a RMST data attribute to send if(!rmst_data_attr){ DiffPrint(DEBUG_ALWAYS, "sendRMST - can't find blob to send !\n"); return FAIL; } // Copy RMST blob and calculate number of fragments blob_ptr = rmst_data_attr->getVal(); size = rmst_data_attr->getLen(); blob = new char[size]; memcpy((void *)blob, blob_ptr, size); num_frag = (size + fragment_size - 1) / fragment_size; // We index starting at zero num_frag--; max_frag_len = size - (num_frag * fragment_size); DiffPrint(DEBUG_DETAILS, "sendRMST: rmst num_frag = %d, fragment_size = %d, max_frag_len = %d\n", num_frag, fragment_size, max_frag_len); // Prepare attribute vector with RMST attributes max_frag_attr = RmstMaxFragAttr.make(NRAttribute::IS, num_frag); send_attrs->push_back(max_frag_attr); send_attrs->push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, RMST_RESP)); frag_number_attr = RmstFragAttr.make(NRAttribute::IS, 0); send_attrs->push_back(frag_number_attr); send_attrs->push_back(RmstIdAttr.make(NRAttribute::IS, id)); // Replace the large blob with a blob fragment frag_ptr = (void *)&blob[0]; // The call to setVal will delete the original blob!! 
if (num_frag == 0) rmst_data_attr->setVal(frag_ptr, max_frag_len); else rmst_data_attr->setVal(frag_ptr, fragment_size); // Send 1st fragment retval = send(publication_handle, send_attrs); // Send other fragments for (int i = 1; i <= num_frag; i++){ // Small delay between sending fragments send_interval.tv_sec = 0; send_interval.tv_usec = 25000; select(0, NULL, NULL, NULL, &send_interval); // Send next fragment frag_number_attr->setVal(i); frag_ptr = (void *)&blob[i * fragment_size]; if (num_frag == i) rmst_data_attr->setVal(frag_ptr, max_frag_len); else rmst_data_attr->setVal(frag_ptr, fragment_size); retval = send(publication_handle, send_attrs); } ClearAttrs(send_attrs); delete blob; return OK; } int DiffusionRouting::addToBlacklist(int32_t node) { ControlMessage *control_blob; NRAttribute *ctrl_msg_attr; Message *my_message; NRAttrVec *attrs; control_blob = new ControlMessage(ADD_TO_BLACKLIST, node, 0); ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS, (void *)control_blob, sizeof(ControlMessage)); attrs = new NRAttrVec; attrs->push_back(ctrl_msg_attr); my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0, 0, pkt_count_, random_id_, LOCALHOST_ADDR, LOCALHOST_ADDR); // Increment pkt_counter pkt_count_++; // Add attributes to the message my_message->msg_attr_vec_ = attrs; my_message->num_attr_ = attrs->size(); my_message->data_len_ = CalculateSize(attrs); // Send Packet sendMessageToDiffusion(my_message); // Delete message delete my_message; delete control_blob; return OK; } int DiffusionRouting::clearBlacklist() { ControlMessage *control_blob; NRAttribute *ctrl_msg_attr; Message *my_message; NRAttrVec *attrs; control_blob = new ControlMessage(CLEAR_BLACKLIST, 0, 0); ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS, (void *)control_blob, sizeof(ControlMessage)); attrs = new NRAttrVec; attrs->push_back(ctrl_msg_attr); my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0, 0, pkt_count_, random_id_, LOCALHOST_ADDR, LOCALHOST_ADDR); // 
Increment pkt_counter pkt_count_++; // Add attributes to the message my_message->msg_attr_vec_ = attrs; my_message->num_attr_ = attrs->size(); my_message->data_len_ = CalculateSize(attrs); // Send Packet sendMessageToDiffusion(my_message); // Delete message delete my_message; delete control_blob; return OK; } handle DiffusionRouting::addFilter(NRAttrVec *filter_attrs, u_int16_t priority, FilterCallback *cb) { FilterEntry *filter_entry; NRAttrVec *attrs; NRAttribute *ctrl_msg_attr; ControlMessage *control_blob; Message *my_message; TimerCallback *timer_callback; // Check parameters if (!filter_attrs || !cb || priority < FILTER_MIN_PRIORITY || priority > FILTER_MAX_PRIORITY){ DiffPrint(DEBUG_ALWAYS, "Received invalid parameters when adding filter !\n"); return FAIL; } // Get lock first GetLock(dr_mtx_); // Create and Initialize the handle_entry structute filter_entry = new FilterEntry(next_handle_, priority, agent_id_); next_handle_++; filter_entry->cb_ = (FilterCallback *) cb; filter_list_.push_back(filter_entry); // Copy attributes (keep them for matching later) filter_entry->filter_attrs_ = CopyAttrs(filter_attrs); // Copy the attributes (and add the control attr) attrs = CopyAttrs(filter_attrs); control_blob = new ControlMessage(ADD_UPDATE_FILTER, priority, filter_entry->handle_); ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS, (void *)control_blob, sizeof(ControlMessage)); attrs->push_back(ctrl_msg_attr); // Initialize message structure my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0, 0, pkt_count_, random_id_, LOCALHOST_ADDR, LOCALHOST_ADDR); // Increment pkt_counter pkt_count_++; // Add attributes to the message my_message->msg_attr_vec_ = attrs; my_message->num_attr_ = attrs->size(); my_message->data_len_ = CalculateSize(attrs); // Release the lock ReleaseLock(dr_mtx_); // Send Packet sendMessageToDiffusion(my_message); // Add keepalive timer to the event queue timer_callback = new FilterKeepaliveCallback(this, filter_entry); 
timers_manager_->addTimer(FILTER_KEEPALIVE_DELAY, timer_callback); // Delete message, attribute set and controlblob delete my_message; delete control_blob; return filter_entry->handle_; } int DiffusionRouting::removeFilter(handle filter_handle) { FilterEntry *filter_entry = NULL; ControlMessage *control_blob; NRAttribute *ctrl_msg_attr; NRAttrVec *attrs; Message *my_message; // Get lock first GetLock(dr_mtx_); filter_entry = findFilter(filter_handle); if (!filter_entry){ // Handle doesn't exist, return FAIL ReleaseLock(dr_mtx_); return FAIL; } control_blob = new ControlMessage(REMOVE_FILTER, filter_entry->handle_, 0); ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS, (void *)control_blob, sizeof(ControlMessage)); attrs = new NRAttrVec; attrs->push_back(ctrl_msg_attr); my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0, 0, pkt_count_, random_id_, LOCALHOST_ADDR, LOCALHOST_ADDR); // Increment pkt_counter pkt_count_++; // Add attributes to the message my_message->msg_attr_vec_ = attrs; my_message->num_attr_ = attrs->size(); my_message->data_len_ = CalculateSize(attrs); // Handle will be destroyed when next keepalive timer happens filter_entry->valid_ = false; // Send Packet sendMessageToDiffusion(my_message); // Release the lock ReleaseLock(dr_mtx_); // Delete message delete my_message; delete control_blob; return OK; } handle DiffusionRouting::addTimer(int timeout, TimerCallback *callback) { return (timers_manager_->addTimer(timeout, callback)); } handle DiffusionRouting::addTimer(int timeout, void *p, TimerCallbacks *cb) { TimerCallback *callback; callback = new OldAPITimer(cb, p); return (addTimer(timeout, callback)); } bool DiffusionRouting::removeTimer(handle hdl) { return (timers_manager_->removeTimer(hdl)); } int DiffusionRouting::filterKeepaliveTimeout(FilterEntry *filter_entry) { FilterEntry *my_entry = NULL; ControlMessage *control_blob; NRAttribute *ctrl_msg_attr; NRAttrVec *attrs; Message *my_message; // Acquire lock first 
GetLock(dr_mtx_); if (filter_entry->valid_){ // Send keepalive control_blob = new ControlMessage(ADD_UPDATE_FILTER, filter_entry->priority_, filter_entry->handle_); ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS, (void *)control_blob, sizeof(ControlMessage)); attrs = CopyAttrs(filter_entry->filter_attrs_); attrs->push_back(ctrl_msg_attr); my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0, 0, pkt_count_, random_id_, LOCALHOST_ADDR, LOCALHOST_ADDR); // Increment pkt_counter pkt_count_++; // Add attributes to the message my_message->msg_attr_vec_ = attrs; my_message->num_attr_ = attrs->size(); my_message->data_len_ = CalculateSize(attrs); // Send Message sendMessageToDiffusion(my_message); delete my_message; delete control_blob; // Release lock ReleaseLock(dr_mtx_); // Reschedule another filter keepalive timer in event queue return (FILTER_KEEPALIVE_DELAY); } else{ // Filter was removed my_entry = deleteFilter(filter_entry->handle_); // We should have removed the correct handle if (my_entry != filter_entry){ DiffPrint(DEBUG_ALWAYS, "DiffusionRouting::KeepaliveTimeout: Handles should match !\n"); exit(-1); } delete my_entry; // Release lock ReleaseLock(dr_mtx_); return -1; } } int DiffusionRouting::interestTimeout(HandleEntry *handle_entry) { HandleEntry *my_handle = NULL; Message *my_message; // Acquire lock first GetLock(dr_mtx_); if (handle_entry->valid_){ // Send the interest message if entry is still valid my_message = new Message(DIFFUSION_VERSION, INTEREST, agent_id_, 0, 0, pkt_count_, random_id_, LOCALHOST_ADDR, LOCALHOST_ADDR); // Increment pkt_counter pkt_count_++; // Add attributes to the message my_message->msg_attr_vec_ = CopyAttrs(handle_entry->attrs_); my_message->num_attr_ = handle_entry->attrs_->size(); my_message->data_len_ = CalculateSize(handle_entry->attrs_); // Send Packet sendMessageToDiffusion(my_message); delete my_message; // Release lock ReleaseLock(dr_mtx_); // Reschedule this timer in the queue return (int) (floor(-1 * 
(log(1 - (GetRand() * 1.0 / RAND_MAX))) / INTEREST_LAMBDA)); } else{ // Interest was canceled. Just delete it from the handle_list my_handle = removeHandle(handle_entry->hdl_, &sub_list_); // We should have removed the correct handle if (my_handle != handle_entry){ DiffPrint(DEBUG_ALWAYS, "Error: interestTimeout: Handles should match !\n"); exit(-1); } delete my_handle; // Release lock ReleaseLock(dr_mtx_); // Delete timer from the queue return -1; } } int DiffusionRouting::sendMessage(Message *msg, handle h, u_int16_t priority) { RedirectMessage *original_hdr; NRAttribute *original_attr, *ctrl_msg_attr; ControlMessage *control_blob; NRAttrVec *attrs; Message *my_message; if ((priority < FILTER_MIN_PRIORITY) || (priority > FILTER_KEEP_PRIORITY)) return FAIL; // Create an attribute with the original header original_hdr = new RedirectMessage(msg->new_message_, msg->msg_type_, msg->source_port_, msg->data_len_, msg->num_attr_, msg->rdm_id_, msg->pkt_num_, msg->next_hop_, msg->last_hop_, 0, msg->next_port_); original_attr = OriginalHdrAttr.make(NRAttribute::IS, (void *)original_hdr, sizeof(RedirectMessage)); // Create the attribute with the control message control_blob = new ControlMessage(SEND_MESSAGE, h, priority); ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS, (void *)control_blob, sizeof(ControlMessage)); // Copy Attributes and add originalAttr and controlAttr attrs = CopyAttrs(msg->msg_attr_vec_); attrs->push_back(original_attr); attrs->push_back(ctrl_msg_attr); my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0, 0, pkt_count_, random_id_, LOCALHOST_ADDR, LOCALHOST_ADDR); // Increment pkt_counter pkt_count_++; // Add attributes to the message my_message->msg_attr_vec_ = attrs; my_message->num_attr_ = attrs->size(); my_message->data_len_ = CalculateSize(attrs); // Send Packet sendMessageToDiffusion(my_message); delete my_message; delete control_blob; delete original_hdr; return OK; } #ifndef NS_DIFFUSION void DiffusionRouting::doIt() { 
run(true, WAIT_FOREVER); } void DiffusionRouting::doOne(long timeout) { run(false, timeout); } void DiffusionRouting::run(bool wait_condition, long max_timeout) { DeviceList::iterator itr; int status, max_sock, fd; bool flag; DiffPacket in_pkt; fd_set fds; struct timeval tv; struct timeval max_tv; do{ FD_ZERO(&fds); max_sock = 0; // Set the maximum timeout value max_tv.tv_sec = (int) (max_timeout / 1000); max_tv.tv_usec = (int) ((max_timeout % 1000) * 1000); for (itr = in_devices_.begin(); itr != in_devices_.end(); ++itr){ (*itr)->addInFDS(&fds, &max_sock); } // Check for the next timer timers_manager_->nextTimerTime(&tv); if (tv.tv_sec == MAXVALUE){ // If we don't have any timers, we wait for POLLING_INTERVAL if (max_timeout == WAIT_FOREVER){ tv.tv_sec = POLLING_INTERVAL; tv.tv_usec = 0; } else{ tv = max_tv; } } else{ if ((max_timeout != WAIT_FOREVER) && (TimevalCmp(&tv, &max_tv) > 0)){ // max_timeout value is smaller than next timer's time, so we // use themax_timeout value instead tv = max_tv; } } status = select(max_sock+1, &fds, NULL, NULL, &tv); if (status == 0){ // Process all timers that have expired timers_manager_->executeAllExpiredTimers(); } if (status > 0){ do{ flag = false; for (itr = in_devices_.begin(); itr != in_devices_.end(); ++itr){ fd = (*itr)->checkInFDS(&fds); if (fd != -1){ // Message waiting in_pkt = (*itr)->recvPacket(fd); recvPacket(in_pkt); // Clear this fd FD_CLR(fd, &fds); status--; flag = true; } } } while ((status > 0) && (flag == true)); } else if (status < 0){ DiffPrint(DEBUG_IMPORTANT, "Select returned %d\n", status); } } while (wait_condition); } #endif // NS_DIFFUSION #ifndef NS_DIFFUSION void DiffusionRouting::sendMessageToDiffusion(Message *msg) { DiffPacket out_pkt = NULL; struct hdr_diff *dfh; char *pos; int len; out_pkt = AllocateBuffer(msg->msg_attr_vec_); dfh = HDR_DIFF(out_pkt); pos = (char *) out_pkt; pos = pos + sizeof(struct hdr_diff); len = PackAttrs(msg->msg_attr_vec_, pos); LAST_HOP(dfh) = htonl(msg->last_hop_); 
NEXT_HOP(dfh) = htonl(msg->next_hop_); DIFF_VER(dfh) = msg->version_; MSG_TYPE(dfh) = msg->msg_type_; DATA_LEN(dfh) = htons(len); PKT_NUM(dfh) = htonl(msg->pkt_num_); RDM_ID(dfh) = htonl(msg->rdm_id_); NUM_ATTR(dfh) = htons(msg->num_attr_); SRC_PORT(dfh) = htons(msg->source_port_); sendPacketToDiffusion(out_pkt, sizeof(struct hdr_diff) + len, diffusion_port_); delete [] out_pkt; } #else void DiffusionRouting::sendMessageToDiffusion(Message *msg) { Message *my_msg; DeviceList::iterator itr; int len; my_msg = CopyMessage(msg); len = CalculateSize(my_msg->msg_attr_vec_); len = len + sizeof(struct hdr_diff); for (itr = local_out_devices_.begin(); itr != local_out_devices_.end(); ++itr){ (*itr)->sendPacket((DiffPacket) my_msg, len, diffusion_port_); } } #endif // !NS_DIFFUSION void DiffusionRouting::sendPacketToDiffusion(DiffPacket pkt, int len, int dst) { DeviceList::iterator itr; for (itr = local_out_devices_.begin(); itr != local_out_devices_.end(); ++itr){ (*itr)->sendPacket(pkt, len, dst); } } #ifndef NS_DIFFUSION void DiffusionRouting::recvPacket(DiffPacket pkt) { struct hdr_diff *dfh = HDR_DIFF(pkt); Message *rcv_message = NULL; int8_t version, msg_type; u_int16_t data_len, num_attr, source_port; int32_t pkt_num, rdm_id, next_hop, last_hop; // Read header version = DIFF_VER(dfh); msg_type = MSG_TYPE(dfh); source_port = ntohs(SRC_PORT(dfh)); pkt_num = ntohl(PKT_NUM(dfh)); rdm_id = ntohl(RDM_ID(dfh)); num_attr = ntohs(NUM_ATTR(dfh)); next_hop = ntohl(NEXT_HOP(dfh)); last_hop = ntohl(LAST_HOP(dfh)); data_len = ntohs(DATA_LEN(dfh)); // Create a message structure from the incoming packet rcv_message = new Message(version, msg_type, source_port, data_len, num_attr, pkt_num, rdm_id, next_hop, last_hop); // Read all attributes into the Message structure rcv_message->msg_attr_vec_ = UnpackAttrs(pkt, num_attr); // Process the incoming message recvMessage(rcv_message); // We are done delete rcv_message; delete [] pkt; } #endif // !NS_DIFFUSION void 
DiffusionRouting::recvMessage(Message *msg) { // Check version if (msg->version_ != DIFFUSION_VERSION) return; // Check destination if (msg->next_hop_ != LOCALHOST_ADDR) return; // Process the incoming message if (msg->msg_type_ == REDIRECT) processControlMessage(msg); else processMessage(msg); } void DiffusionRouting::processControlMessage(Message *msg) { NRSimpleAttribute *original_header_attr = NULL; NRAttrVec::iterator place = msg->msg_attr_vec_->begin(); RedirectMessage *original_header; FilterEntry *entry; handle my_handle; // Find the attribute containing the original packet header original_header_attr = OriginalHdrAttr.find_from(msg->msg_attr_vec_, place, &place); if (!original_header_attr){ DiffPrint(DEBUG_ALWAYS, "Error: Received an invalid REDIRECT message !\n"); return; } // Restore original message header original_header = (RedirectMessage *) original_header_attr->getVal(); my_handle = original_header->handle_; msg->msg_type_ = original_header->msg_type_; msg->source_port_ = original_header->source_port_; msg->pkt_num_ = original_header->pkt_num_; msg->rdm_id_ = original_header->rdm_id_; msg->next_hop_ = original_header->next_hop_; msg->last_hop_ = original_header->last_hop_; msg->num_attr_ = original_header->num_attr_; msg->new_message_ = original_header->new_message_; msg->next_port_ = original_header->next_port_; // Delete attribute from the original set msg->msg_attr_vec_->erase(place); delete original_header_attr; // Find the right callback GetLock(dr_mtx_); entry = findFilter(my_handle); if (entry && entry->valid_){ // Just to confirm if (OneWayMatch(entry->filter_attrs_, msg->msg_attr_vec_)){ ReleaseLock(dr_mtx_); entry->cb_->recv(msg, my_handle); return; } else{ DiffPrint(DEBUG_ALWAYS, "Warning: Filter doesn't match incoming message's attributes !\n"); } } else{ DiffPrint(DEBUG_IMPORTANT, "Report: Cannot find filter (possibly deleted ?)\n"); } ReleaseLock(dr_mtx_); } void DiffusionRouting::processMessage(Message *msg) { NRSimpleAttribute 
*rmst_id_attr = NULL; CallbackList::iterator cbl_itr; HandleList::iterator sub_itr; NRAttrVec *callback_attrs; HandleEntry *entry; CallbackEntry *aux; CallbackList cbl; // First, acquire the lock GetLock(dr_mtx_); for (sub_itr = sub_list_.begin(); sub_itr != sub_list_.end(); ++sub_itr){ entry = *sub_itr; if ((entry->valid_) && (MatchAttrs(msg->msg_attr_vec_, entry->attrs_))) if (entry->cb_){ aux = new CallbackEntry(entry->cb_, entry->hdl_); cbl.push_back(aux); } } // We can release the lock now ReleaseLock(dr_mtx_); // Check for RMST id attribute rmst_id_attr = RmstIdAttr.find(msg->msg_attr_vec_); cbl_itr = cbl.begin(); // Process RMST fragment if we have callbacks and this message has an RmstId if (rmst_id_attr && (cbl_itr != cbl.end())){ if (!processRmst(msg)){ cbl.clear(); return; } } // Now we just call all callback functions for (cbl_itr = cbl.begin(); cbl_itr != cbl.end(); ++cbl_itr){ // Copy attributes callback_attrs = CopyAttrs(msg->msg_attr_vec_); // Call app-specific callback function aux = *cbl_itr; aux->cb_->recv(callback_attrs, aux->subscription_handle_); delete aux; // Clean up callback attributes ClearAttrs(callback_attrs); delete callback_attrs; } // We are done cbl.clear(); } bool DiffusionRouting::processRmst(Message *msg) { NRSimpleAttribute *data_buf_attr = NULL; NRSimpleAttribute *max_frag_attr = NULL; NRSimpleAttribute *rmst_id_attr = NULL; NRSimpleAttribute *frag_attr = NULL; int rmst_no, frag_no, data_buf_len, count; void *blob_ptr, *tmp_frag_ptr; Int2RecRmst::iterator rmst_iterator; Int2Frag::iterator frag_iterator; char *dstPtr; int dstSize; RecRmst *rmst_ptr; // Read Rmst attributes data_buf_attr = RmstDataAttr.find(msg->msg_attr_vec_); rmst_id_attr = RmstIdAttr.find(msg->msg_attr_vec_); frag_attr = RmstFragAttr.find(msg->msg_attr_vec_); rmst_no = rmst_id_attr->getVal(); frag_no = frag_attr->getVal(); blob_ptr = data_buf_attr->getVal(); data_buf_len = data_buf_attr->getLen(); // See if we are receiving this blob, if not start a new 
RecRmst rmst_iterator = rec_rmst_map_.find(rmst_no); if (rmst_iterator == rec_rmst_map_.end()){ rmst_ptr = new RecRmst(rmst_no); rec_rmst_map_.insert(Int2RecRmst::value_type(rmst_no, rmst_ptr)); } else rmst_ptr = (*rmst_iterator).second; if (frag_no == 0){ max_frag_attr = RmstMaxFragAttr.find(msg->msg_attr_vec_); rmst_ptr->max_frag_ = max_frag_attr->getVal(); rmst_ptr->mtu_len_ = data_buf_len; } // Copy fragment to map tmp_frag_ptr = new char[data_buf_len]; memcpy(tmp_frag_ptr, blob_ptr, data_buf_len); rmst_ptr->frag_map_.insert(Int2Frag::value_type(frag_no, tmp_frag_ptr)); if (frag_no == rmst_ptr->max_frag_) rmst_ptr->max_frag_len_ = data_buf_len; count = rmst_ptr->frag_map_.size(); // If this is the last rmst fragment, create the entire rmst if (count == (rmst_ptr->max_frag_ + 1)){ DiffPrint(DEBUG_DETAILS, "RMST #%d is complete, creating big blob !\n", rmst_no); // Allocate memory for the big blob dstSize = rmst_ptr->max_frag_ * rmst_ptr->mtu_len_ + rmst_ptr->max_frag_len_; dstPtr = new char[dstSize]; // Copy all but last fragment to a buffer for (int i = 0; i < rmst_ptr->max_frag_; i++){ frag_iterator = rmst_ptr->frag_map_.find(i); tmp_frag_ptr = (*frag_iterator).second; memcpy((void *)&dstPtr[i * rmst_ptr->mtu_len_], (void *)tmp_frag_ptr, rmst_ptr->mtu_len_); } // Now, copy the last fragment to the buffer frag_iterator = rmst_ptr->frag_map_.find(rmst_ptr->max_frag_); tmp_frag_ptr = (*frag_iterator).second; memcpy((void *)&dstPtr[rmst_ptr->max_frag_ * rmst_ptr->mtu_len_], (void *)tmp_frag_ptr, rmst_ptr->max_frag_len_); // Since we copied everything from the map - clean it up rec_rmst_map_.erase(rmst_iterator); delete rmst_ptr; // Now we substitute the last fragment with the reconstructed blob data_buf_attr->setVal(dstPtr, dstSize); // Deliver this to the application return true; } // We don't have the entire blob return false; }

// Unlinks from hl the first entry whose handle equals my_handle and
// hands that entry back to the caller. Yields NULL when the list
// holds no such entry. The entry object is not destroyed here.
HandleEntry * DiffusionRouting::removeHandle(handle my_handle, HandleList *hl)
{
  HandleList::iterator pos = hl->begin();

  while (pos != hl->end()){
    HandleEntry *candidate = *pos;

    if (candidate->hdl_ == my_handle){
      // We return right away, so the erased iterator is never reused
      hl->erase(pos);
      return candidate;
    }

    ++pos;
  }

  return NULL;
}

// Looks up, without modifying hl, the first entry whose handle equals
// my_handle. Yields NULL when there is no match.
HandleEntry * DiffusionRouting::findHandle(handle my_handle, HandleList *hl)
{
  HandleList::iterator pos = hl->begin();

  while (pos != hl->end()){
    HandleEntry *candidate = *pos;

    if (candidate->hdl_ == my_handle)
      return candidate;

    ++pos;
  }

  return NULL;
}

// Unlinks from filter_list_ the first filter whose handle equals
// my_handle and hands it back to the caller. Yields NULL when no
// filter matches. The filter object is not destroyed here.
FilterEntry * DiffusionRouting::deleteFilter(handle my_handle)
{
  FilterList::iterator pos = filter_list_.begin();

  while (pos != filter_list_.end()){
    FilterEntry *candidate = *pos;

    if (candidate->handle_ == my_handle){
      // We return right away, so the erased iterator is never reused
      filter_list_.erase(pos);
      return candidate;
    }

    ++pos;
  }

  return NULL;
}

// Looks up, without modifying filter_list_, the first filter whose
// handle equals my_handle. Yields NULL when there is no match.
FilterEntry * DiffusionRouting::findFilter(handle my_handle)
{
  FilterList::iterator pos = filter_list_.begin();

  while (pos != filter_list_.end()){
    FilterEntry *candidate = *pos;

    if (candidate->handle_ == my_handle)
      return candidate;

    ++pos;
  }

  return NULL;
}

// Tells whether the attribute vector carries a scope attribute.
bool DiffusionRouting::hasScope(NRAttrVec *attrs)
{
  NRAttribute *scope_attr = NRScopeAttr.find(attrs);

  return (scope_attr != NULL);
}

// Validates the class/scope attributes of a subscription. Accepted
// combinations are:
//   - scope "IS GLOBAL_SCOPE" together with class "IS INTEREST_CLASS"
//   - scope "IS NODE_LOCAL_SCOPE" with any class attribute
//   - no scope at all, with class "IS INTEREST_CLASS"
// Anything else (including a missing class attribute) is rejected.
bool DiffusionRouting::checkSubscription(NRAttrVec *attrs)
{
  NRSimpleAttribute *class_attr = NRClassAttr.find(attrs);
  NRSimpleAttribute *scope_attr = NRScopeAttr.find(attrs);

  // A subscription without a class attribute is never valid
  if (class_attr == NULL)
    return false;

  if (scope_attr == NULL){
    // No scope given: only something that looks like a global
    // subscription is accepted (a scope attribute is added later)
    return ((class_attr->getVal() == NRAttribute::INTEREST_CLASS) &&
	    (class_attr->getOp() == NRAttribute::IS));
  }

  // From here on both attributes are present. The scope operator
  // has to be "IS"
  if (scope_attr->getOp() != NRAttribute::IS)
    return false;

  // Global subscription
  if ((scope_attr->getVal() == NRAttribute::GLOBAL_SCOPE) &&
      (class_attr->getVal() == NRAttribute::INTEREST_CLASS) &&
      (class_attr->getOp() == NRAttribute::IS))
    return true;

  // Otherwise only a local subscription is acceptable
  return (scope_attr->getVal() == NRAttribute::NODE_LOCAL_SCOPE);
}

// Validates the class/scope attributes of a publication. The class
// attribute is mandatory and must read "IS DATA_CLASS". The scope
// attribute is optional; when present it must be "IS GLOBAL_SCOPE" or
// "IS NODE_LOCAL_SCOPE".
bool DiffusionRouting::checkPublication(NRAttrVec *attrs)
{
  NRSimpleAttribute *class_attr = NRClassAttr.find(attrs);
  NRSimpleAttribute *scope_attr = NRScopeAttr.find(attrs);

  // A publication without a class attribute is never valid
  if (class_attr == NULL)
    return false;

  // The class attribute has to be exactly "IS DATA_CLASS"
  if (!((class_attr->getVal() == NRAttribute::DATA_CLASS) &&
	(class_attr->getOp() == NRAttribute::IS)))
    return false;

  // Publishing without a scope is fine, a default NODE_LOCAL_SCOPE
  // attribute gets included later
  if (scope_attr == NULL)
    return true;

  // The scope operator has to be "IS"
  if (scope_attr->getOp() != NRAttribute::IS)
    return false;

  // Both global and local scope data messages are accepted
  return ((scope_attr->getVal() == NRAttribute::GLOBAL_SCOPE) ||
	  (scope_attr->getVal() == NRAttribute::NODE_LOCAL_SCOPE));
}

// Validates the attributes handed to a send: they are accepted only
// if they contain neither a class nor a scope attribute (these are
// the only attributes currently checked).
bool DiffusionRouting::checkSend(NRAttrVec *attrs)
{
  NRSimpleAttribute *class_attr = NRClassAttr.find(attrs);
  NRSimpleAttribute *scope_attr = NRScopeAttr.find(attrs);

  return ((class_attr == NULL) && (scope_attr == NULL));
}

// Tells whether the attributes describe push data, i.e. both class
// and scope attributes are present and the scope is something other
// than NODE_LOCAL_SCOPE. A missing class or scope attribute is
// reported through DiffPrint and treated as "not push data".
bool DiffusionRouting::isPushData(NRAttrVec *attrs)
{
  NRSimpleAttribute *class_attr = NRClassAttr.find(attrs);
  NRSimpleAttribute *scope_attr = NRScopeAttr.find(attrs);

  // Both attributes are required to make a decision
  if ((class_attr == NULL) || (scope_attr == NULL)){
    DiffPrint(DEBUG_ALWAYS, "Error: Cannot find class/scope attributes !\n");
    return false;
  }

  return (scope_attr->getVal() != NRAttribute::NODE_LOCAL_SCOPE);
}