/* $Id: mcl_rx.cpp,v 1.17 2003/11/12 09:24:36 roca Exp $ */ /* * Copyright (c) 1999-2003 INRIA - Universite Paris 6 - All rights reserved * (main author: Vincent Roca - vincent.roca@inrialpes.fr) * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License * as published by the Free Software Foundation; either version 2 * of the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, * USA. */ #include "mcl_includes.h" /* * private functions */ static int mcl_process_sig (mclcb_t *mclcb, int type, hdr_infos_t *hdr_infos, mcl_addr *saddr); #ifdef RSE_FEC static void mcl_decode_all_adu (mclcb_t *mclcb); static void mcl_decode_this_adu (mclcb_t *mclcb, adu_t *adu); #endif /* RSE_FEC */ /* * Reception thread polling data regularly on the various mcast groups */ void* mcl_rx_thread (void *arg) { mclcb_t *mclcb = (mclcb_t*)arg; /* * we don't sleep here but in the mcl_recv_pkt func. * the unlock will be done there... */ ASSERT(mclcb != NULL); TRACELVL(5, (mcl_stdout, "-> mcl_rx_thread:\n")) #ifdef WIN32 if(mclcb->test_cancel) { ExitThread(0); } #else pthread_setcancelstate(PTHREAD_CANCEL_ENABLE,NULL); /* * cancellation is deferred till next check point , i.e. points * in code where we know everything is in a stable state */ pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED,NULL); pthread_testcancel(); #endif mcl_lock(&mcl_mutex_lock); while (1) { if (mcl_fsm_closed(mclcb, RECEIVER)) { /* everything is finished */ break; } if (mclcb->verbose >= 3) { PRINT_OUT((mcl_stdout, "Rx Thread %ld, ses_id=%d, time %d, state %s\n", (ulong)mclcb->rx_thread, mclcb->id, mcl_time_count, mcl_fsm_print_state(mclcb, RECEIVER))) } /*if (mcl_fsm_close_already_rx(mclcb))*/ if (mcl_fsm_no_new_undup_du(mclcb)) { /* * we received all DUs, we now wait the application * finishes to read all data and issue a mcl_close. */ mcl_unlock(&mcl_mutex_lock); #ifdef WIN32 if(mclcb->test_cancel) { ExitThread(0); } #else /* NOT WIN32 */ pthread_testcancel(); #endif mcl_usleep(DFLT_POLLING_PERIOD); #ifdef WIN32 if(mclcb->test_cancel) { ExitThread(0); } #else /* NOT WIN32 */ pthread_testcancel(); #endif mcl_lock(&mcl_mutex_lock); } else { /* get new packets; mcl_process_pkt called for each one */ mcl_recv_pkt(mclcb); } } TRACELVL(5, (mcl_stdout, "<- mcl_rx_thread:\n")) mcl_unlock(&mcl_mutex_lock); arg = 0; #ifdef WIN32 ExitThread(0); #else pthread_exit(arg); #endif return arg; /* unused */ } /* * Get the packet received... * This can be a pure signaling packet or a pure data packet, or * a data packet plus a signaling header (usual case). */ void mcl_process_pkt (mclcb_t* mclcb, mcl_rx_pkt *pkt, /* recv'd pkt */ mcl_addr *saddr) /* src addr */ { hdr_infos_t hdr_infos; /* info extracted from the LCT header */ du_t *du; /* DU descriptor of the data buf */ block_t *blk; /* block to which the DU belongs */ int layer; int len; /* data len */ adu_t *adu; /* ADU to which the DU belongs */ int hlen = 0; /* total header length */ int ret = 0; TRACELVL(5, (mcl_stdout, "-> mcl_process_pkt:\n")) ASSERT(pkt && pkt->pkt_len > 0); ASSERT(saddr && saddr->get_port() > 0); TRACELVL(4, (mcl_stdout, "mcl_process_pkt: ses_id=%d, from %s/%d, len=%d\n", mclcb->id, saddr->get_addr_string(), saddr->get_port(), pkt->pkt_len)) if (mcl_fsm_closed(mclcb, RECEIVER)) { /* * the session has been closed in the meanwhile... ignore */ TRACELVL(5, (mcl_stdout, "<- mcl_process_pkt: ignored (session closed)\n")) delete pkt; return; } if (mclcb->verbose >= 4) { /* dump at most 16 32-bit words */ mcl_dump_buffer(pkt->get_buf(), pkt->pkt_len, 16); } /* * process the LCT header (extension then fixed header) first */ memset(&hdr_infos, 0, sizeof(hdr_infos)); hdr_infos.FPI_present = hdr_infos.FTI_present = hdr_infos.NONEWADU_present = false; if (pkt->pkt_len < (int)(sizeof(fixed_lct_hdr_t))) { PRINT_ERR((mcl_stderr, "mcl_process_pkt: ERROR too short %d\n", pkt->pkt_len)) goto bad_hdr; } if ((hlen = alc_hdr_parse(mclcb, (fixed_lct_hdr_t*)pkt->get_buf(), &hdr_infos, pkt->pkt_len)) < 0) { PRINT_ERR((mcl_stderr, "mcl_process_pkt: ERROR, bad LCT header\n")) goto bad_hdr; } /* * right session ? check the {source_addr; TSI} */ if ((int)hdr_infos.demux_label != mclcb->demux_label || (mclcb->check_src_addr && mclcb->src_addr.addr_is_equal(*saddr) == false)) { TRACELVL(4, (mcl_stdout, " mcl_process_pkt: pkt for bad session (expected %s/%d, got %s/%d)\n", mclcb->src_addr.get_addr_string(), mclcb->demux_label, saddr->get_addr_string(), hdr_infos.demux_label)) mclcb->stats.bad_demux_label++; goto bad; } /* * process the congestion control header then */ if (mclcb->no_congestion_control) { /* * In no_congestion_control mode there is no CC header. * Required for FLUTE interoperability tests, but a limitation * is that no loss statistics are possible. */ layer = 0; } else { /* process the congestion header */ #ifdef RLC layer = rlc_rx_analyze_packet(mclcb, (rlc_hdr_t*)(pkt->get_buf() + 4)); #elif defined(FLIDS) layer = FLIDs_rx_AnalyzePacket(mclcb, (flids_hdr_t*)(pkt->get_buf() + 4)); #else /* Error, RLC or FLIDs (or another CC scheme) must be defined */ exit(-1); /* fatal */ #endif /* RLC */ if (layer == ERR_CORRUPT_HDR) { PRINT_ERR((mcl_stderr, "mcl_process_pkt: ERROR, bad RLC/FLIDS header\n")) goto bad_hdr; } if (layer == ERR_BAD_LAYER) { if (!mclcb->addr.is_multicast_addr()) { /* * With unicast and DSS simulate a subscription to * layer nb_level * With unicast and LCT simulate a subscription * to layer [0; nb_level[ */ TRACELVL(5, (mcl_stdout, "<- mcl_process_pkt: ignored (simulate layering in unicast)\n")) delete pkt; return; } PRINT_ERR((mcl_stderr, "mcl_process_pkt: ERROR, bad layer\n")) goto bad_hdr; } } /* * process the multiple signaling headers if any */ if (hdr_infos.FTI_present) { /* FEC Object Transmission Information (FTI) HE present */ ret = mcl_process_sig (mclcb, EXT_FTI, &hdr_infos, saddr); } if (hdr_infos.FDT_present) { /* FLUTE's File Delivery Table (FDT) HE present */ ret = mcl_process_sig (mclcb, EXT_FDT, &hdr_infos, saddr); } if (ret >= 0 && hdr_infos.NONEWADU_present) { /* LCT equivalent to NONEWADU */ if (!mcl_fsm_no_new_adu(mclcb, RECEIVER)) { ret = mcl_process_sig (mclcb, SIG_NONEWADU, &hdr_infos, saddr); } /* else ignore (this is a duplicated announce) */ } if (ret >= 0 && hdr_infos.close > 0) { /* LCT equivalent to CLOSE */ ret = mcl_process_sig (mclcb, SIG_CLOSE, &hdr_infos, saddr); } if (ret < 0) { PRINT_ERR((mcl_stderr, "mcl_process_pkt: ERROR in hdr processing\n")) goto bad_hdr; } if ((ret == 1) && !(hdr_infos.close > 0) && !(hdr_infos.NONEWADU_present)) {/* FLUTE_DELIVERY do not store adu & paket*/ TRACELVL(5, (mcl_stdout,"<- mcl_process_pkt: ignored (FLUTE_DELIVERY)\n")) delete pkt; return; } mclcb->stats.rx_totbytes += hlen; /* data len incr later; see below...*/ /* * check what we got... */ len = pkt->pkt_len - hlen; /* data len */ ASSERT(len >= 0); if (len == 0) { /* not a data packet... nothing else to do */ delete pkt; /* no more necessary */ TRACELVL(5, (mcl_stdout, "<- mcl_process_pkt: only SIG\n")) return; } if (hdr_infos.FPI_present == false) { /* error, data packet without any payload info */ PRINT_ERR((mcl_stderr, "mcl_process_pkt: ERROR, data packet without any payload info\n")) goto bad_hdr; } if (mclcb->verbose >= 1) PrintRcvdDU(mclcb, 0, layer, &hdr_infos); /* * create its descriptor */ du = CreateDU(mclcb); du->seq = hdr_infos.idf_du; du->len = len; du->data = pkt->get_buf() + hlen; du->pkt = pkt; #ifdef FEC du->is_fec = hdr_infos.is_fec; #endif /* * update stats now (includes duplicated packets) */ mclcb->stats.rx_pkts_per_lvl[layer]++; mclcb->stats.rx_bytes_per_lvl[layer] += len; #ifdef FEC if (du->is_fec) { mclcb->stats.rx_fec_pkts++; mclcb->stats.rx_fec_bytes += len; } else #endif /* FEC */ { mclcb->stats.rx_pkts++; mclcb->stats.rx_bytes += len; } mclcb->stats.rx_totbytes += len;/* sig hlen already incr; see above...*/ /* * find the right ADU first... */ if (!(adu = FindADU(mclcb, hdr_infos.idf_adu, hdr_infos.FDT_instanceid, mclcb->rxlvl.adu_head))) { /* * orphan du (we received a DU before the announcement * of its ADU). Should put it in mclcb->rxlvl.du_head... TO DO. */ PRINT_ERR((mcl_stderr, "mcl_process_pkt: ERROR1, orphan DU [%d/%d/%d] (no adu)\n", hdr_infos.idf_adu, hdr_infos.idf_block, du->seq)) free(du); goto bad; } if (adu->rx_status >= ADU_STATUS_DECODED) { /* useless du as the ADU is already decoded and/or delivered */ TRACELVL(3, (mcl_stdout, "mcl_process_pkt: duplicated0!!!\n")) goto duplicated; } /* * ... and then the right block... */ if (!(blk = FindBlock(mclcb, hdr_infos.idf_block, adu->block_head, adu))) { /* * orphan du (we received a DU before the announcement * of its ADU). Should put it in mclcb->rxlvl.du_head... TO DO. */ PRINT_ERR((mcl_stderr, "mcl_process_pkt: ERROR2, orphan DU [%d/%d/%d] (no block)\n", hdr_infos.idf_adu, hdr_infos.idf_block, du->seq)) free(du); goto bad; } if (blk->rx_status == BLK_STATUS_DECODED) { /* useless du as the block is already decoded */ TRACELVL(3, (mcl_stdout, "mcl_process_pkt: duplicated1!!!\n")) goto duplicated; } #ifdef FEC if (blk->rx_status == BLK_STATUS_COMPLETED) { if (du->is_fec) { /* can't do anything interesting with it */ /* useless du as block already completed */ TRACELVL(3, (mcl_stdout, "mcl_process_pkt: duplicated2!!!\n")) goto duplicated; } else { /* it is possible that this data DU can replace a * FEC du. Depends if DU has already been received * or not! * If possible, do it as FEC decoding is simplified. */ } } #endif /* FEC */ du->block = blk; ASSERT(du->block->adu == adu); /* * ... and insert it */ if (du->is_fec) { if (InsertDU(mclcb, du, &(blk->fec_du_head)) <= 0) { /* duplicated FEC du */ TRACELVL(3, (mcl_stdout, "mcl_process_pkt: duplicated3 fec!!!\n")) goto duplicated; } blk->fec_du_nb_in_list++; } else { if (InsertDU(mclcb, du, &(blk->du_head)) <= 0) { /* duplicated du */ TRACELVL(3, (mcl_stdout, "mcl_process_pkt: duplicated3 data!!!\n")) goto duplicated; } blk->du_rx++; #ifdef RSE_FEC if (adu->fec_encoding_id == FEC_ENCODING_ID_SMALL_LARGE_EXP_FEC && adu->fec_instance_id == (fec_instance_id)FEC_INSTANCE_ID_RSE && blk->rx_status == BLK_STATUS_COMPLETED) { /* * this data DU can indeed replace a FEC du * in case of RSE */ #ifdef VIRTUAL_RX_MEM if (mcl_vrm_can_store_in_vrm(mclcb, du->len)) { /* use the VRMEM service to register data */ if (mcl_vrm_store_data(mclcb, du, du->data, du->len)) { PRINT_ERR((mcl_stderr, "mcl_process_pkt: ERROR: Virtual Rx Memory service failed\n")) goto bad; } /* pkt no longer required... free it! */ delete pkt; } else if (mclcb->vrm_used) { /* remember it kept in physical memory */ mcl_vrm_register_in_prm(mclcb, du, du->len); } #endif /* VIRTUAL_RX_MEM */ /* TODO: free a fec du immediately! */ TRACELVL(3, (mcl_stdout, "mcl_process_pkt: duplicated4!!!\n")) goto duplicated_nofree; /* to update dupl stats */ } #endif /* RSE_FEC */ } #ifdef VIRTUAL_RX_MEM if (mcl_vrm_can_store_in_vrm(mclcb, du->len)) { /* * use the VRMEM service to register data */ if (mcl_vrm_store_data(mclcb, du, du->data, du->len)) { PRINT_ERR((mcl_stderr, "mcl_process_pkt: ERROR: Virtual Rx Memory service failed\n")) goto bad; } /* pkt no longer required... free it! */ delete pkt; } else if (mclcb->vrm_used) { /* remember it kept in physical memory */ mcl_vrm_register_in_prm(mclcb, du, du->len); } #endif /* VIRTUAL_RX_MEM */ /* * update stats now (non-duplicated packets only) */ mclcb->stats.rx_undup_pkts_per_lvl[layer]++; mclcb->stats.buf_space += len; if (mclcb->stats.buf_space > mclcb->stats.max_buf_space) mclcb->stats.max_buf_space = mclcb->stats.buf_space; /* * perform FEC decoding now. * The codec used is specified by the * {fec_encoding_id; fec_instance_id} fields. */ switch (adu->fec_encoding_id) { case FEC_ENCODING_ID_SMALL_LARGE_EXP_FEC: switch (adu->fec_instance_id) { #ifdef RSE_FEC case FEC_INSTANCE_ID_RSE: /* * was du the last DU of a block/ADU ? */ if (mcl_rx_enough_du(mclcb, blk)) { if (mclcb->postpone_fec_decoding) { /* ok, we rx enough data or FEC DUs, * just mark it */ blk->rx_status = BLK_STATUS_COMPLETED; } else { /* ok, we rx enough data or FEC DUs, * so decode */ mclcb->fec.decode(mclcb, blk); } } break; #endif /* RSE_FEC */ default: PRINT_ERR((mcl_stderr, "mcl_process_pkt: ERROR: unsupported FEC Instance ID %d for FEC Encoding ID %d\n", adu->fec_instance_id, adu->fec_encoding_id)) mcl_exit(-1); //goto bad; } break; #ifdef LDPC_FEC case FEC_ENCODING_ID_LDPC_FEC: switch (adu->fec_instance_id) { case FEC_INSTANCE_ID_LDGM: case FEC_INSTANCE_ID_LDPC: /* * iterative decoding * takes place for all incoming packet... */ mclcb->fec.decode(mclcb, du); break; default: PRINT_ERR((mcl_stderr, "mcl_process_pkt: ERROR: unsupported FEC Instance ID %d for FEC Encoding ID %d\n", adu->fec_instance_id, adu->fec_encoding_id)) mcl_exit(-1); //goto bad; } break; #endif case FEC_ENCODING_ID_NO_FEC: /* * was du the last DU of a block/ADU ? */ if (mcl_rx_enough_du(mclcb, blk)) { blk->rx_status = BLK_STATUS_DECODED; } break; default: PRINT_ERR((mcl_stderr, "mcl_process_pkt: ERROR: unsupported fec_encoding_id %d\n", hdr_infos.fec_encoding_id)) mcl_exit(-1); //goto bad; } /* * see if an ADU has been completely received, and in that * case if some data can be delivered to the application. */ if (mcl_rx_new_completed_adu(mclcb, adu)) { if (mclcb->verbose >= 1) PRINT_OUT((mcl_stdout, "End of ADU %d\n", adu->seq)) if (mclcb->statistics == 2) mcl_print_rx_stats(mclcb); #ifdef RSE_FEC if (!mclcb->postpone_fec_decoding && adu->rx_status == ADU_STATUS_COMPLETED) { /* do not wait, decode every block of this adu */ mcl_decode_this_adu (mclcb, adu); } #endif /* RSE_FEC */ mclcb->ready_data++; /* remember it */ if (mcl_fsm_no_new_adu(mclcb, RECEIVER) && mcl_rx_all_adu_completed(mclcb,mclcb->rxlvl.adu_head) > 0) { /* * we know we won't receive any new ADU and * it was the last packet we were waiting for... */ if (mclcb->verbose >= 1) PRINT_OUT((mcl_stdout, "All ADUs received\n")) mcl_fsm_update_state(mclcb, RECEIVER, TEVENT_NIL, REVENT_ALL_DU_RECV); if (mclcb->statistics >= 1) { mcl_print_rx_stats(mclcb); mcl_print_final_stats(mclcb); } /* * unsubscribe to all layers (incl. layer 0) to avoid * receiving useless packets */ mcl_drop_layer(mclcb, MCL_ALL_LAYERS, MCL_DO_IT); #ifdef RSE_FEC /* * we can now decode all ADUs if in postpone mode */ if (mclcb->postpone_fec_decoding) { mcl_decode_all_adu(mclcb); } #endif /* RSE_FEC */ } } TRACELVL(5, (mcl_stdout, "<- mcl_process_pkt:\n")) return; duplicated: mclcb->stats.rx_dupl_pkts ++; mclcb->stats.rx_dupl_bytes += du->len; /* do it before the free(du)! */ delete pkt; free(du); TRACELVL(5, (mcl_stdout, "<- mcl_process_pkt: duplicated1\n")) return; #ifdef RSE_FEC duplicated_nofree: mclcb->stats.rx_dupl_pkts ++; mclcb->stats.rx_dupl_bytes += du->len; TRACELVL(5, (mcl_stdout, "<- mcl_process_pkt: duplicated 2\n")) return; #endif /* RSE_FEC */ bad_hdr: mclcb->stats.bad_hdr++; bad: mclcb->stats.errors++; delete pkt; TRACELVL(5, (mcl_stdout, "<- mcl_process_pkt: ERROR\n")) return; } /* * Process the various types of signalling */ static int mcl_process_sig (mclcb_t *mclcb, int type, /* sig type */ hdr_infos_t *hdr_infos, mcl_addr *saddr) /* src addr or NULL */ { adu_t *adu; u_int seq; #ifdef LDPC_FEC double fec_ratio; /* n/k value, constant for all blocks, even */ /* if the last blk may be shorter (k smaller) */ #endif // ASSERT(mcl_iss > 0); /* 0 means nothing so iss must be > 0 */ ASSERT(!mcl_fsm_closed(mclcb, RECEIVER)); switch (type) { case EXT_FDT: break; case EXT_FTI: { block_t *blk; u_int rem; TRACELVL(5, (mcl_stdout, "-> mcl_process_sig: EXT_FTI (%d)\n", type)) adu = FindADU(mclcb, hdr_infos->idf_adu,hdr_infos->FDT_instanceid, mclcb->rxlvl.adu_head); if (adu && !(adu->ia_adu)) { /* * ADU already announced and completed! * NB: to counter losses, new ADUs are * announced in each data packet (EXT_FTI) */ break; } if (mclcb->verbose >= 1) { PRINT_OUT((mcl_stdout, "New ADU: seq=%d, len=%d, max_k=%d, symbol_size=%d\n", hdr_infos->idf_adu, hdr_infos->adu_len, hdr_infos->k, hdr_infos->symbol_len)) } if (!adu) { /* * completely new ADU, update state and * allocate the struct... */ mcl_fsm_update_state(mclcb, RECEIVER, TEVENT_NIL, REVENT_NEW_ADU); /* * for FLUTE */ if ((mclcb->immediate_delivery != 3) || ((mclcb->immediate_delivery == 3) && (requestedtoi_flute(mclcb, hdr_infos->idf_adu) || (mclcb->deliverallADU == 1) || (hdr_infos->idf_adu == 0)))) { adu = CreateADU(mclcb); adu->seq = hdr_infos->idf_adu; adu->FDTinstanceID = hdr_infos->FDT_instanceid; InsertADU(mclcb, adu, &(mclcb->rxlvl.adu_head)); mclcb->stats.adus_announced++; } else { /* do not store the adu */ return 1; } } else { /* no longer implicite */ adu->ia_adu = 0; } /* FEC codec used for this ADU */ adu->fec_encoding_id = hdr_infos->fec_encoding_id; adu->fec_instance_id = hdr_infos->fec_instance_id; adu->len = hdr_infos->adu_len; /* store src addr for mcl_recvfrom() */ adu->addr = *saddr; adu->symbol_len = hdr_infos->symbol_len; #if 0 if (hdr_infos->symbol_len <= mclcb->payload_size) // ????? mclcb->payload_size = hdr_infos->symbol_len; // ????? #endif /* * calculate the number of DUs for each block and * allocate the block structs... * do it simply: allocate a tab of block_t structs * rather than a linked list! */ adu->block_nb = (int)ceil((double)adu->len / ((double)hdr_infos->k * (double)adu->symbol_len)); if (!(blk = (block_t*)calloc(adu->block_nb, sizeof(block_t)))) { PRINT_ERR((mcl_stderr, "mcl_process_sig: ERROR, no memory\n")) mcl_exit(-1); } adu->block_head = blk; rem = adu->len; for (seq = 0; seq < adu->block_nb; seq++, blk++) { blk->adu = adu; blk->seq = seq; blk->len = min(rem, hdr_infos->k * adu->symbol_len); blk->k = (int)ceil((double)blk->len / (double)adu->symbol_len); #ifdef LDPC_FEC if (adu->fec_encoding_id == FEC_ENCODING_ID_LDPC_FEC) { /* * WARNING: due to the way fec_key is passed * to rx, the same value is used for all blocks * of an ADU */ ASSERT(adu->fec_instance_id == FEC_INSTANCE_ID_LDGM || adu->fec_instance_id == FEC_INSTANCE_ID_LDPC); blk->fec_key = hdr_infos->fec_key; if (seq == 0) { /* full size block */ blk->n = hdr_infos->n; fec_ratio = (double)blk->n / (double)blk->k; TRACELVL(4, (mcl_stdout, " New ADU: LDPC/LDGM key=%d, max_n=%d, fec_ratio=%.3f\n", blk->fec_key, blk->n, (float)fec_ratio)) } else { /* the last block may be shorter */ blk->n = (u_int32_t)((double)blk->k * fec_ratio); } } #endif /* LDPC_FEC */ /* * blk->du_head, fec_du_head, fec_du_nb_in_list, du_rx * already set to NULL/0 */ rem -= blk->len; } TRACELVL(4, (mcl_stdout, " New ADU: %d blocks, max_block_len=%d (%d DUs), last_block_len=%d (%d DUs)\n", adu->block_nb, adu->block_head->len, adu->block_head->k, (blk-1)->len, (blk-1)->k)) break; } case SIG_NONEWADU: TRACELVL(5, (mcl_stdout, "-> mcl_process_sig: SIG_NONEWADU (%d)\n", type)) /* this sig header must not be a duplicate */ ASSERT(!mcl_fsm_no_new_adu(mclcb, RECEIVER)); ASSERT(hdr_infos->max_idf_adu >= mcl_iss); if (mclcb->verbose >= 1) PrintRcvdDU(mclcb, EH_SIG, type, hdr_infos); mcl_fsm_update_state(mclcb, RECEIVER, TEVENT_NIL, REVENT_NO_NEW_ADU); /* * check if there are implicit declarations of NEWADU, * i.e. ADUs that we know exist thanks to the max_adu_idf * info of NONEWADU, but for which we did not receive any * packet yet... */ /*for (seq = hdr_infos->max_idf_adu; seq >= mcl_iss ; seq--) */ /* more efficient in ascending ADU seq number order */ for (seq = mcl_iss; seq <= hdr_infos->max_idf_adu; seq++) { adu = FindADU(mclcb, seq,0 , mclcb->rxlvl.adu_head); if (!adu) { adu = CreateADU(mclcb); mclcb->stats.adus_announced++; adu->seq = seq; adu->FDTinstanceID = 0; adu->ia_adu = 1; /* implicitely announced */ InsertADU(mclcb, adu, &(mclcb->rxlvl.adu_head)); } } /* * did we receive everything ? */ if (mcl_rx_all_adu_completed(mclcb,mclcb->rxlvl.adu_head) > 0) { if (mclcb->verbose >= 1) PRINT_OUT((mcl_stdout, "\nAll ADUs received\n")) /* * the receiver has finished */ mcl_fsm_update_state(mclcb, RECEIVER, TEVENT_NIL, REVENT_ALL_DU_RECV); if (mclcb->statistics >= 1) { mcl_print_rx_stats(mclcb); mcl_print_final_stats(mclcb); } /* * unsubscribe to all layers (incl. layer 0) to avoid * receiving useless packets */ mcl_drop_layer(mclcb, MCL_ALL_LAYERS, MCL_DO_IT); #ifdef RSE_FEC /* * we can now decode all ADUs if in postpone mode */ if (mclcb->postpone_fec_decoding) { mcl_decode_all_adu(mclcb); } #endif /* RSE_FEC */ } /* otherwise wait... */ break; case SIG_CLOSE: TRACELVL(5, (mcl_stdout, "-> mcl_process_sig: SIG_CLOSE (%d)\n", type)) if (mcl_fsm_close_already_rx(mclcb)) { /* this is a duplicated announce; ignore */ break; } if (mclcb->verbose >= 2) PrintRcvdDU(mclcb, EH_SIG, type, hdr_infos); mcl_fsm_update_state(mclcb, RECEIVER, TEVENT_NIL, REVENT_CLOSE_RECV); ASSERT(mcl_fsm_no_new_adu(mclcb, RECEIVER)); if (mclcb->statistics >= 1) { mcl_print_rx_stats(mclcb); mcl_print_final_stats(mclcb); } break; default: PRINT_ERR((mcl_stderr, "mcl_process_sig: ERROR, unknown SIG type: %d\n", type)) return -1; } /* update stats */ //mclcb->stats.rx_sig_pkts++; /* there is a SIG header */ TRACELVL(5, (mcl_stdout, "<- mcl_process_sig: ok\n")) return 0; } #ifdef RSE_FEC /* { */ /* * we have just finished to receive everything. Now make sure that * all blocks of all ADUs are actually decoded. * Used in postpone_fec_decoding mode after having received everything. * Specific to RSE ! */ static void mcl_decode_all_adu (mclcb_t *mclcb) { adu_t *adu; TRACELVL(5, (mcl_stdout, "-> mcl_decode_all_adu:\n")) ASSERT(mclcb->postpone_fec_decoding); adu = mclcb->rxlvl.adu_head; do { ASSERT(adu); adu = adu->prev; if (adu->fec_encoding_id == FEC_ENCODING_ID_SMALL_LARGE_EXP_FEC && adu->fec_instance_id == FEC_INSTANCE_ID_RSE) { /* postponed decoding only applies to RSE */ if (adu->rx_status < ADU_STATUS_DECODED) { ASSERT(adu->rx_status == ADU_STATUS_COMPLETED); mcl_decode_this_adu(mclcb, adu); } /* else nothing to do */ } /* nothing to do with other FEC codes */ } while (adu != mclcb->rxlvl.adu_head); TRACELVL(5, (mcl_stdout, "<- mcl_decode_all_adu: ok\n")) } /* * Specific to RSE ! */ static void mcl_decode_this_adu (mclcb_t *mclcb, adu_t *adu) { block_t *blk; int i; ASSERT(adu->rx_status == ADU_STATUS_COMPLETED); ASSERT(adu->fec_encoding_id == FEC_ENCODING_ID_SMALL_LARGE_EXP_FEC); ASSERT(adu->fec_instance_id == FEC_INSTANCE_ID_RSE); for (i = adu->block_nb, blk = adu->block_head; i > 0; i--, blk++) { if (blk->rx_status == BLK_STATUS_DECODED) continue; if (mclcb->fec.decode(mclcb, blk) < 0) { PRINT_ERR((mcl_stderr, "mcl_decode_this_adu: ERROR, decode() failed\n")) mcl_exit(-1); } } #ifdef GET_SYSINFO mcl_print_sysinfo(mclcb); #endif adu->rx_status = ADU_STATUS_DECODED; } #endif /* } RSE_FEC */ /* * Try to return an ADU to the appli. * Maybe there is a gap and this is not possible... * * Return the amount of data copied to userbuf, 0 if some * data is available but not copied to userbuf, and < 0 if * no data is available. */ int mcl_return_adu_to_appli (mclcb_t *mclcb, char *userbuf, /* user buffer */ u_int userlen, /* user buffer length, or 0 to check */ /* if an ADU is available */ struct sockaddr *saddr, /* buf for src addr or NULL */ int *saddr_len) /* buf for src addr len or 0 */ { u_int seq; /* expected adu seq # in ORDERED_DEL. */ adu_t *adu; /* expected adu to give to appli */ block_t *blk; du_t *du; du_t *ndu; /* next du */ char *dst; /* where to do data copy in userbuf */ int i, j; u_int len, rem; /* remaining data for copy to userbuf */ TRACELVL(5, (mcl_stdout, "-> mcl_return_adu_to_appli: userbuf=x%x, len=%d\n", (int)userbuf, userlen)) #if 0 { static int m = 0; /* just for profiling !!! */ if (m++ == 1000) mcl_exit(2); } #endif ASSERT(mclcb->immediate_delivery == 0 || mclcb->immediate_delivery == 1); seq = mclcb->rxlvl.next_adu2give_seq; if ((seq == 0 && mclcb->immediate_delivery==0) || mclcb->rxlvl.adu_head == NULL) { /* nothing ready */ TRACELVL(5, (mcl_stdout, "<- mcl_return_adu_to_appli: no adu received\n")) return -1; } ASSERT(mclcb->rxlvl.adu_head && mclcb->rxlvl.adu_head->prev); /* * find a ready adu first... * search the list in ADU order (preferable for disk access reasons) * * distinguish IMMEDIATE_DELIVERY and ORDERED_DELIVERY cases */ if (mclcb->immediate_delivery==1) { /* IMMEDIATE_DELIVERY */ for (adu = mclcb->rxlvl.adu_head->next; ; adu = adu->next) { if (adu->rx_status == ADU_STATUS_DECODED) { /* found a ready adu */ break; } if (adu == mclcb->rxlvl.adu_head) { /* we have cycled => no adu ready */ TRACELVL(5, (mcl_stdout, "<- mcl_return_adu_to_appli: nothing ready\n")) return -1; } } } else { /* ORDERED_DELIVERY */ if (mclcb->rxlvl.next_adu2give != NULL) { /* the next adu to return to appli is already known */ adu = mclcb->rxlvl.next_adu2give; ASSERT(adu->seq == seq); if (adu->rx_status != ADU_STATUS_DECODED) { TRACELVL(5, (mcl_stdout, "<- mcl_return_adu_to_appli: next adu not ready (state=%d)\n", adu->rx_status)) return -1; /* not ready */ } /* ready... continues after the else... */ } else { /* search for a ready adu in the list */ for (adu = mclcb->rxlvl.adu_head->next; ; adu = adu->next) { if (adu->seq == seq) { /* found the next adu */ if (adu->rx_status != ADU_STATUS_DECODED) { TRACELVL(5, (mcl_stdout, "<- mcl_return_adu_to_appli: next adu not ready 2 (state=%d)\n", adu->rx_status)) mclcb->rxlvl.next_adu2give = adu; return -1; /* not ready */ } else { break; /* ready */ /* continues after the else...*/ } } if (adu == mclcb->rxlvl.adu_head) { /* we have cycled => no adu ready */ TRACELVL(5, (mcl_stdout, "<- mcl_return_adu_to_appli: next adu not found\n")) return -1; } } } } if (userlen <= 0) { /* it was just to check if an ADU was available... */ ASSERT(userbuf == NULL); return 0; } /* * then copy data... * it requires to go through all the DUs of all the blocks of the ADU */ ASSERT(userbuf && userlen > 0); rem = userlen; dst = userbuf; for (i = adu->block_nb, blk = adu->block_head; i > 0; i--, blk++) { ASSERT(blk->fec_du_head == NULL); /* FEC already free'ed */ ASSERT(blk->fec_du_nb_in_list == 0); for (j = blk->k, du = blk->du_head; j > 0 && rem > 0; j--, du = ndu) { ASSERT(du && du->len); len = min(rem, du->len); if (len < du->len) { PRINT_ERR((mcl_stderr, "ERROR: user buffer too short (%d), %d required. Truncated\n", userlen, adu->len)) mclcb->stats.other_errors++; } #ifdef VIRTUAL_RX_MEM if (mcl_vrm_in_vrm(mclcb, du)) { mcl_vrm_get_data(mclcb, du, dst, len); } else { #endif /* VIRTUAL_RX_MEM */ memcpy(dst, du->data, len); delete du->pkt; du->pkt = NULL; /* security */ du->data = NULL; /* security */ #ifdef VIRTUAL_RX_MEM if (mclcb->vrm_used) { mcl_vrm_remove_from_prm(mclcb, du, du->len); } } #endif /* VIRTUAL_RX_MEM */ dst += len; rem -= len; ndu = du->next; mclcb->stats.buf_space -= du->len; free(du); /* XXX: write/call RemoveDU instead */ } blk->du_head = NULL; /* security */ } if (mclcb->immediate_delivery==0) { adu_t *n_adu = adu->next; /* potential next adu */ mclcb->rxlvl.next_adu2give_seq++; if (n_adu && n_adu->seq == mclcb->rxlvl.next_adu2give_seq) { mclcb->rxlvl.next_adu2give = n_adu; } else { mclcb->rxlvl.next_adu2give = NULL; } } free(adu->block_head); adu->block_head = NULL; /* security */ if (saddr) { ASSERT(saddr_len > 0); #if 0 *saddr = adu->saddr; *saddr_len = adu->saddr_len; #endif adu->addr.get_addr_struct((struct sockaddr_in*)saddr); *saddr_len = sizeof(struct sockaddr_in); /* XXX */ } #ifdef DEBUG if (mclcb->verbose >= 6) { /* just to check data received... will be in appli */ //mcl_dump_buffer(userbuf, adu->len, adu->len >> 2); mcl_dump_ascii_buffer(userbuf, adu->len); } #endif mclcb->ready_data--; adu->rx_status = ADU_STATUS_DELIVERED; ASSERT(mclcb->ready_data >= 0); if (mclcb->verbose == 2) { struct timeval time; time = mcl_get_tvtime(); PRINT_OUT((mcl_stdout, "\n%ld.%06ld\tadu_compl seq=%d len=%d\n", time.tv_sec, time.tv_usec, adu->seq, adu->len)) #ifdef GET_SYSINFO mcl_print_sysinfo(mclcb); #endif } TRACELVL(5, (mcl_stdout, "<- mcl_return_adu_to_appli: %d bytes returned\n", userlen - rem)) return (userlen - rem); } /* * Try to return an ADU to the appli with an appropriate TOI. * Maybe there is a gap and this is not possible... * Used for FLUTE_DELIVERY only. * * Return the amount of data copied to userbuf, 0 if some * data is available but not copied to userbuf, and < 0 if * no data is available. */ int mcl_return_adu_to_appli_flute (mclcb_t *mclcb, char *userbuf, /* user buffer */ u_int userlen, /* user buffer length, or 0 to check */ /* if an ADU is available */ struct sockaddr *saddr, /* buf for src addr or NULL */ int *saddr_len, /* buf for src addr len or 0 */ u_int32_t *toi) /* toi of the ADU (is seq.number) */ { adu_t *adu; /* expected adu to give to appli */ block_t *blk; du_t *du; du_t *ndu; /* next du */ char *dst; /* where to do data copy in userbuf */ int i, j; u_int len, rem; /* remaining data for copy to userbuf */ TRACELVL(5, (mcl_stdout, "-> mcl_return_adu_to_appli_flute: userbuf=x%x, len=%d\n", (int)userbuf, userlen)) #if 0 { static int m = 0; /* just for profiling !!! */ if (m++ == 1000) mcl_exit(2); } #endif ASSERT(mclcb->immediate_delivery == 3); /* use it with FLUTE only */ if (mclcb->rxlvl.adu_head == NULL) { /* nothing ready */ TRACELVL(5, (mcl_stdout, "<- mcl_return_adu_to_appli_flute: no adu received\n")) return -1; } ASSERT(mclcb->rxlvl.adu_head && mclcb->rxlvl.adu_head->prev); /* * find a ready adu first... * search the list in ADU order (preferable for disk access reasons) */ for (adu = mclcb->rxlvl.adu_head->next; ; adu = adu->next) { if (adu->rx_status == ADU_STATUS_DECODED && (requestedtoi_flute(mclcb, adu->seq) || ((mclcb->deliverallADU == 1) && requestedtoi_flute(mclcb, adu->seq)))) { /* found a ready adu */ break; } if (adu == mclcb->rxlvl.adu_head) { /* we have cycled => no adu ready */ TRACELVL(5, (mcl_stdout, "<- mcl_return_adu_to_appli_flute: nothing ready\n")) return -1; } } if (userlen <= 0) { /* it was just to check if an ADU was available... */ ASSERT(userbuf == NULL); return 0; } /* * then copy data... * it requires to go through all the DUs of all the blocks of the ADU */ ASSERT(userbuf && userlen > 0); rem = userlen; dst = userbuf; for (i = adu->block_nb, blk = adu->block_head; i > 0; i--, blk++) { ASSERT(blk->fec_du_head == NULL); /* FEC already free'ed */ ASSERT(blk->fec_du_nb_in_list == 0); for (j = blk->k, du = blk->du_head; j > 0 && rem > 0; j--, du = ndu) { ASSERT(du && du->len); len = min(rem, du->len); if (len < du->len) { PRINT_ERR((mcl_stderr, "ERROR: user buffer too short (%d), %d required. Truncated\n", userlen, adu->len)) mclcb->stats.other_errors++; } #ifdef VIRTUAL_RX_MEM if (mcl_vrm_in_vrm(mclcb, du)) { mcl_vrm_get_data(mclcb, du, dst, len); } else { #endif /* VIRTUAL_RX_MEM */ memcpy(dst, du->data, len); delete du->pkt; du->pkt = NULL; /* security */ du->data = NULL; /* security */ #ifdef VIRTUAL_RX_MEM if (mclcb->vrm_used) { mcl_vrm_remove_from_prm(mclcb, du, du->len); } } #endif /* VIRTUAL_RX_MEM */ dst += len; rem -= len; ndu = du->next; mclcb->stats.buf_space -= du->len; free(du); /* XXX: write/call RemoveDU instead */ } blk->du_head = NULL; /* security */ } if (mclcb->immediate_delivery==0) { adu_t *n_adu = adu->next; /* potential next adu */ mclcb->rxlvl.next_adu2give_seq++; if (n_adu && n_adu->seq == mclcb->rxlvl.next_adu2give_seq) { mclcb->rxlvl.next_adu2give = n_adu; } else { mclcb->rxlvl.next_adu2give = NULL; } } free(adu->block_head); adu->block_head = NULL; /* security */ if (saddr) { ASSERT(saddr_len > 0); #if 0 *saddr = adu->saddr; *saddr_len = adu->saddr_len; #endif adu->addr.get_addr_struct((struct sockaddr_in*)saddr); *saddr_len = sizeof(struct sockaddr_in); /* XXX */ } #ifdef DEBUG if (mclcb->verbose >= 6) { /* just to check data received... will be in appli */ mcl_dump_buffer(userbuf, adu->len, adu->len >> 2); } #endif mclcb->ready_data--; adu->rx_status = ADU_STATUS_DELIVERED; /*set toi*/ *toi=adu->seq; ASSERT(mclcb->ready_data >= 0); if (mclcb->verbose == 2) { struct timeval time; time = mcl_get_tvtime(); PRINT_OUT((mcl_stdout, "\n%ld.%06ld\tadu_compl seq=%d len=%d\n", time.tv_sec, time.tv_usec, adu->seq, adu->len)) #ifdef GET_SYSINFO mcl_print_sysinfo(mclcb); #endif } TRACELVL(5, (mcl_stdout, "<- mcl_return_adu_to_appli_flute: %d bytes returned\n", userlen - rem)) return (userlen - rem); }