/* $Id: mcl_network.cpp,v 1.7 2003/10/27 09:55:48 roca Exp $ */
/*
 *  Copyright (c) 1999-2003 INRIA - Universite Paris 6 - All rights reserved
 *  (main author: Vincent Roca - vincent.roca@inrialpes.fr)
 *
 *  This program is free software; you can redistribute it and/or
 *  modify it under the terms of the GNU General Public License
 *  as published by the Free Software Foundation; either version 2
 *  of the License, or (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307,
 *  USA.
 */

/*
 * Network functions (lower socket interface).
 */

#include "mcl_includes.h"


#ifdef SIMUL_LOSSES
/*
 * Simulate packets losses randomly, using a two-state model kept in
 * mclcb->simul_losses_state (0: last packet sent OK, 1: last packet lost).
 * Unless CONSTANT_LOSS_RATIO is defined, the loss threshold is multiplied
 * by the layer number.
 * Also updates the tx_simul_loss_lost / tx_simul_loss_sent counters.
 * Returns 0 if OK, 1 if packet should be lost...
 */
static int
RandomLoss (mclcb_t	*mclcb,
	    int		layer)
{
	int	IsLost = 0;

	TRACELVL(5, (mcl_stdout, "-> RandomLoss\n"))
	switch ( mclcb->simul_losses_state ) {
	case 0: /* last packet was sent OK. */
#ifdef CONSTANT_LOSS_RATIO
		if ((float)(random()%100) < (float)P_LOSS_WHEN_OK)
#else
		if ((float)(random()%100) < (P_LOSS_WHEN_OK*(float)layer))
#endif
		{
			IsLost = 1;
			mclcb->simul_losses_state = 1;
		}
		break;
	case 1: /* last packet was lost */
#ifdef CONSTANT_LOSS_RATIO
		if ((float)(random()%100) < (float)P_LOSS_WHEN_LOSSES)
#else
		if ((float)(random()%100) < (P_LOSS_WHEN_LOSSES*(float)layer))
#endif
		{
			IsLost = 1;
		} else
			mclcb->simul_losses_state = 0;
		break;
	default:
		perror("RandomLoss: unknown state");
		mcl_exit(1);
		break;
	}
	TRACELVL(5, (mcl_stdout, "<- RandomLoss (layer:%d, IsLost:%d)\n", layer, IsLost))
	if (IsLost)
		mclcb->stats.tx_simul_loss_lost++;
	else
		mclcb->stats.tx_simul_loss_sent++;
	return IsLost;
}
#endif /* SIMUL_LOSSES */


/*
 * Create and send a packet (with/without data and/or SIG) immediately
 *
 * mclcb: session control block.
 * layer: index of the transmission level; selects the mcgroup (and thus
 *	  the socket and default destination) and the per-level statistics.
 * du:	  data unit to send, or NULL to send a header-only packet.
 * adu:	  ADU the du belongs to; required (ASSERTed) when du is not NULL.
 *	  When adu->addr_valid is set, adu->addr overrides the default
 *	  destination address of the mcgroup.
 *
 * Returns 0 if OK, < 0 otherwise.
 * A failure of the send syscall itself is fatal (mcl_exit(1)).
 */
int
mcl_send_pkt (mclcb_t	*mclcb,
	      int	layer,
	      du_t	*du,
	      adu_t	*adu)
{
	block_t		*blk;
	mcgroup_t	*mg;
//#define PBUF_LEN	(mclcb->payload_size + MAX_ALC_HEADER_SIZE)
#define PBUF_LEN	(MAX_DATAGRAM_SIZE)	// faster than previous version
				/* DU (hdr+data) cant be larger than PBUF_LEN */
	char		buf[PBUF_LEN];
	fixed_lct_hdr_t	*lct_hdr;	/* fixed size LCT header */
	hdr_infos_t	hdr_infos;	/* struct given to LCT creation funcs */
	int		hlen;		/* total (RLC/FLIDS+LCT) header length*/
	int		len;
	int		sig_nb;		/* SIG # sent in this packet */
	MCL_IOVEC	iov[2];		/* to describe header + data */
#ifdef WIN32
	struct sockaddr	*msg_name;
	u_int32_t	msg_namelen;
	DWORD		nb_sent=0;
#else
	struct msghdr	msg;		/* for the sendmsg() Socket syscall */
#endif

	TRACELVL(5, (mcl_stdout, "-> mcl_send_pkt: layer=%d, du=x%x, adu=x%x\n", layer, (int)du, (int)adu))
	/* prepare for new sig copies */
	mg = mclcb->txlvl_tab[layer].mcgroup;
	memset(&hdr_infos, 0, sizeof(hdr_infos));
	hdr_infos.NONEWADU_present = false;	/* by default */
	if (du) {
		ASSERT(adu);		/* also required in that case */
		blk = du->block;
		ASSERT(blk);
		hdr_infos.idf_adu = adu->seq;
		hdr_infos.idf_block = blk->seq;
		hdr_infos.idf_du = du->seq;
#ifdef FEC
		hdr_infos.is_fec = du->is_fec;
#endif /* FEC */
		if (adu->seq == 0) {
			/* it's the file delivery table FDT */
			hdr_infos.FDT_present = true;
			hdr_infos.FDT_instanceid = adu->FDTinstanceID;
		}
		hdr_infos.FPI_present = true; /* add FPI (required if data) */
		hdr_infos.demux_label = mclcb->demux_label;
		hdr_infos.fec_encoding_id = adu->fec_encoding_id;
		hdr_infos.fec_instance_id = adu->fec_instance_id;
#if 0
		/* don't add any FTI HE with RSE for packets sent on layer 3
		 * and above */
		if (adu->fec_instance_id == FEC_INSTANCE_RSE && layer >= 3) {
			hdr_infos.FTI_present = false;
		} else {
#endif
			hdr_infos.FTI_present = true;
			hdr_infos.adu_len = adu->len;
			/* k and symbol_len are that of the FIRST block */
			hdr_infos.k = adu->block_head->k;
			hdr_infos.symbol_len = adu->symbol_len;
#ifdef LDPC_FEC
			hdr_infos.fec_key = blk->fec_key;
			hdr_infos.n = blk->k + blk->fec_du_nb_in_list;
#endif
		/*}*/
		len = du->len;
	} else {
		hdr_infos.FDT_present = hdr_infos.FPI_present =
					hdr_infos.FTI_present = false;
		/* add some required flags, even if there is no data */
		hdr_infos.demux_label= mclcb->demux_label;
		hdr_infos.fec_encoding_id = FEC_ENCODING_ID_NO_FEC;/* no FEC */
		hdr_infos.fec_instance_id = FEC_INSTANCE_ID_NULL; /* no FEC */
		len = 0;
	}
	/*
	 * Prepare signaling header extensions to be added to the LCT header.
	 * There is enough room for 16 bytes of additional signaling header
	 * extensions (in particular for NONEWADU).
	 */
	CopySigReset(mclcb);
	sig_nb = CopySigToLCTinfos(mclcb, layer, &hdr_infos, 16);
	CleanupSigTab(mclcb);
	/*
	 * create the ALC/LCT headers now
	 * NB: hlen includes both ALC, LCT and RLC/FLIDS headers!
	 */
	lct_hdr = (fixed_lct_hdr_t*)buf;
	hlen = alc_hdr_create(mclcb, lct_hdr, (hdr_infos_t*)&hdr_infos);
	if (hlen < 0)
		goto bad;
	ASSERT(hlen <= MAX_ALC_HEADER_SIZE);	/* make sure... */
	/*
	 * create the appropriate congestion control header
	 */
	if (mclcb->no_congestion_control) {
		/*
		 * In no congestion control mode there is no CC header.
		 * Required for FLUTE interoperability tests, but a limitation
		 * is that no loss statistics are possible
		 */
		lct_hdr->cci = 0;
	} else {
#ifdef RLC
		if (rlc_tx_fill_header(mclcb, (rlc_hdr_t*)&(lct_hdr->cci),
					(u_int8_t) layer) != MCL_OK)
			goto bad;
#elif defined(FLIDS)
		if (FLIDs_tx_FillHeader(mclcb, (flids_hdr_t*)&(lct_hdr->cci),
					(u_int8_t)layer ) != MCL_OK)
			goto bad;
#else
		/* Error, RLC or FLIDs (or another CC scheme) must be defined */
		exit(-1); /* fatal */
#endif /* RLC,FILDS */
	}
	/*
	 * update traces and statistics
	 */
	if (mclcb->verbose >= 2) {
		PrintSentDU(mclcb, 0, layer, &hdr_infos);
		if (mclcb->verbose >= 5) {
			/* header and data are in two diff buffers */
			mcl_dump_buffer(buf, hlen, (hlen>> 2));
			/*mcl_dump_buffer(du->data, len, (len >> 2));*/
		}
	}
	if (len > 0) {
		if (hdr_infos.is_fec) {
			mclcb->stats.tx_fec_pkts++;
			mclcb->stats.tx_fec_bytes += len;
		} else {
			mclcb->stats.tx_pkts++;
			mclcb->stats.tx_bytes += len;
		}
		mclcb->stats.tx_pkts_per_lvl[layer]++;
		mclcb->stats.tx_bytes_per_lvl[layer] += len;
	}
	mclcb->stats.tx_totbytes += hlen + len;
	/*
	 * and now send the packet
	 * (header and payload stay in two separate buffers: scatter/gather)
	 */
	MCL_IOV_BUFF(iov[0]) = buf;
	MCL_IOV_LEN(iov[0]) = hlen;
	if (len > 0) {
#ifdef VIRTUAL_TX_MEM
		/* make sure data is in phy mem (perhaps in VTM cache) first */
		mcl_vtm_get_data(mclcb, du);
		ASSERT(du->data);
#endif /* VIRTUAL_TX_MEM */
		MCL_IOV_BUFF(iov[1]) = du->data;
		MCL_IOV_LEN(iov[1]) = len;
	}
#ifdef WIN32
	if (adu && adu->addr_valid) {
		/* use this dest addr */
		msg_name = (struct sockaddr*)
				adu->addr.get_internal_struct_addr();
		msg_namelen = adu->addr.get_addr_struct_len();
	} else {
		/* use the default dest addr */
		msg_name = (struct sockaddr*)
				mg->addr.get_internal_struct_addr();
		msg_namelen = mg->addr.get_addr_struct_len();
	}
#else  /* UNIX */
	memset(&msg, 0, sizeof(msg));
	if (adu && adu->addr_valid) {
		/* use this dest addr */
		msg.msg_name = (char*)(adu->addr.get_internal_struct_addr());
		msg.msg_namelen = adu->addr.get_addr_struct_len();
	} else {
		/* use the default dest addr */
		msg.msg_name = (char*)(mg->addr.get_internal_struct_addr());
		msg.msg_namelen = mg->addr.get_addr_struct_len();
	}
	msg.msg_iov = iov;
	msg.msg_iovlen = (len > 0 ? 2 : 1);
#endif /* OS_DEP */
#ifdef SIMUL_LOSSES
	if (!RandomLoss(mclcb,layer)) {
#endif /* SIMUL_LOSSES */
#ifdef WIN32
		if (WSASendTo(mg->priv_sock, iov, (len > 0 ? 2 : 1), &nb_sent,
			      0, msg_name, msg_namelen, NULL, NULL)
		    == SOCKET_ERROR) {
			/*
			 * if (mclcb->verbose == 2)
			 *	malloc_stats();
			 */
			PRINT_ERR((mcl_stderr,"mcl_send_pkt: WSASendTo (code:%d - sock:%d)\n", WSAGetLastError(), mg->priv_sock))
			mcl_exit(1);
		}
#else  /* UNIX */
		/* a short write is treated as an error too */
		if (sendmsg(mg->priv_sock, &msg, 0) != hlen + len) {
			/*
			 * if (mclcb->verbose == 2)
			 *	malloc_stats();
			 */
			perror("mcl_send_pkt: sendmsg");
			PRINT_ERR((mcl_stderr, "mcl_send_pkt: ERROR, sendmsg failed; priv_sock=%d, family=%d, dst=%s/%d, hlen=%d, len=%d\n",
				mg->priv_sock,
				((struct sockaddr_in*)(msg.msg_name))->sin_family,
				inet_ntoa(((struct sockaddr_in*)(msg.msg_name))->sin_addr),
				ntohs(((struct sockaddr_in*)(msg.msg_name))->sin_port),
				hlen, len))
			mcl_exit(1);
		}
#endif /* OS_DEP */
		if (mclcb->verbose >= 4) {
#ifdef WIN32
			PRINT_OUT((mcl_stdout, "sendmsg: priv_sock=%d, family=%d, dst=%s/%d, hlen=%d, len=%d\n",
				(int)mg->priv_sock,
				((struct sockaddr_in*)msg_name)->sin_family,
				inet_ntoa(((struct sockaddr_in*)msg_name)->sin_addr),
				ntohs(((struct sockaddr_in*)msg_name)->sin_port),
				hlen, len))
#else  /* UNIX */
			PRINT_OUT((mcl_stdout, "sendmsg: priv_sock=%d, family=%d, dst=%s/%d, hlen=%d, len=%d\n",
				mg->priv_sock,
				((struct sockaddr_in*)(msg.msg_name))->sin_family,
				inet_ntoa(((struct sockaddr_in*)(msg.msg_name))->sin_addr),
				ntohs(((struct sockaddr_in*)(msg.msg_name))->sin_port),
				hlen, len))
#endif /* OS_DEP */
		}
#ifdef SIMUL_LOSSES
	} else {
		TRACELVL(3, (mcl_stdout, "=> Random Loss on layer %d\n", layer))
	}
#endif /* SIMUL_LOSSES */
	/*mcl_usleep(10);*/	/* 0.01 ms to be sure will be received*/
	len = 0;	/* no data to send next time */
#ifdef NEVERDEF
	do {
		TO DO.....
	} while (CanCopyMoreSig(layer));
#endif
	TRACELVL(5, (mcl_stdout, "<- mcl_send_pkt:\n"))
	return 0;

bad:
	TRACELVL(5, (mcl_stdout, "<- mcl_send_pkt: ERROR\n"))
	return -1;
#undef PBUF_LEN
}


/* FIXME : more work to do with this function - do it later */
/*		-> select on all fds of all ids */
/*		-> update all mclcb (of all ids) */
/*
 * Wait (at most 1 second) for incoming packets on the sockets registered
 * in mclcb->rxlvl.fds, read them and hand each packet received on a
 * session socket to mcl_process_pkt(). Packets received on a private
 * socket are read and discarded.
 *
 * Returns nothing: the function returns silently if there is no socket
 * to select on or if the select() timeout expires. A failing recvfrom()
 * (other than "no more packet" in non-blocking mode) is fatal
 * (mcl_exit(1)).
 *
 * The global mcl_mutex_lock is released around select() and around the
 * cancellation points, then re-acquired.
 * NOTE(review): this implies the caller holds mcl_mutex_lock on entry
 * - confirm against the callers.
 */
void
mcl_recv_pkt (mclcb_t	*mclcb)
{
//#define PBUF_LEN	(mclcb->payload_size + MAX_ALC_HEADER_SIZE)
#define PBUF_LEN	(MAX_DATAGRAM_SIZE)  //if sender has different tx profile..
	mcl_rx_pkt	*pkt;		// recv'd packet buffer
	struct sockaddr	saddr;		/* buffer for src addr or NULL */
	int		saddr_len;	/* src addr len */
	class mcl_addr	addr;
	mcgroup_t	*mg;
	int		n, i;
	fd_set		tmp_fds;
	struct timeval	tv;	/* don't wait indefinitely in select */

	TRACELVL(5, (mcl_stdout, "-> mcl_recv_pkt:\n"))
again:
	/*
	 * wait to receive packets... a select on several fd is required!
	 */
	if (mclcb->rxlvl.n_fd == 0) {
		/* nothing to select, return! */
		return;
	}
	/* work on a copy: select() overwrites the set it is given */
	tmp_fds = mclcb->rxlvl.fds;
	tv.tv_sec = 1;	/* wait at most 1s to have opportunity to update fds */
	tv.tv_usec = 0;
	mcl_unlock(&mcl_mutex_lock);
#ifdef WIN32
	if (mclcb->test_cancel) {
		ExitThread(0);
	}
#else
	pthread_testcancel();
#endif
	if ((n = select(mclcb->rxlvl.nfds, &tmp_fds, NULL, NULL, &tv)) < 0) {
#if 0
#ifdef SOLARIS
		/* cannot check errno reliably on Solaris! don't know why!!! */
		mcl_lock(&mcl_mutex_lock);
		goto again;	/* always try again in practice */
#elif defined(WIN32)
		int	err = WSAGetLastError(); /* save Error Value value first */
		mcl_lock(&mcl_mutex_lock);	/* in case we check again */
		if (err == WSAEINTR ||	/* interrupted */
		    err == WSAENOTSOCK) { /*fdset changed, eg after layer drop*/
			goto again;
		}
		perror("mcl_recv_pkt: select");
		mcl_exit(1);
#else  /* LINUX */
		int	err = errno;	/* save errno value first */
		mcl_lock(&mcl_mutex_lock);	/* in case we check again */
		if (err == EINTR ||	/* interrupted */
		    err == EBADF) { /* fdset changed (eg after layer drop) */
			goto again;
		}
		perror("mcl_recv_pkt: select");
		mcl_exit(1);
#endif /* OSDEP */
#endif /* 0 */
		mcl_lock(&mcl_mutex_lock);
		goto again;	/* always try again in practice */
	}
#ifdef WIN32
	if (mclcb->test_cancel) {
		ExitThread(0);
	}
#else
	pthread_testcancel();
#endif
	mcl_lock(&mcl_mutex_lock);
	if (n == 0) {
		/* nothing received */
		TRACELVL(5, (mcl_stdout, "<- mcl_recv_pkt: timeout\n"))
		return;
	}
	/*
	 * read the packet(s) received (there may be more than one!)
	 */
	/* if (n>0) printf("select returned n=%d\n", n);*/
	for (i = 0, mg = mclcb->mcgroup_tab ; i < mclcb->mcl_max_group ;
	     i++, mg++) {
		if (mcl_is_valid_sock(mg->ses_sock) &&
		    FD_ISSET((int)mg->ses_sock, &tmp_fds)) {
			/*
			 * the socket validity is re-checked at each
			 * iteration: processing a packet may invalidate it.
			 */
			while (mcl_is_valid_sock(mg->ses_sock)) {
				pkt = new mcl_rx_pkt(PBUF_LEN);
				ASSERT(pkt);
				saddr_len = sizeof(saddr);
				memset(&saddr, 0, sizeof(saddr));
				/*
				 * the length argument of recvfrom() is a
				 * socklen_t*, which is not a size_t* on
				 * platforms where size_t is wider.
				 */
				pkt->pkt_len = recvfrom(mg->ses_sock,
						pkt->get_buf(),
						pkt->get_buf_len(), 0,
						&saddr,
#ifdef LINUX
						(socklen_t*)
#endif
						&saddr_len);
#ifdef WIN32
				if(pkt->pkt_len == SOCKET_ERROR ||
				   pkt->pkt_len == 0) {
					/* we are in non-blocking mode! */
					if (WSAGetLastError() == WSAEWOULDBLOCK
					    || WSAGetLastError() == WSAENOTSOCK) {
						/* no ready packet anymore */
						delete pkt;
						break;
					}
					/* else exit... */
					TRACELVL(5, (mcl_stdout, "mcl_recv_pkt: ERROR, ses_sock=%d, error=%d\n", mg->ses_sock, WSAGetLastError()))
					PRINT_ERR((mcl_stderr, "mcl_recv_pkt: ERROR, ses_sock=%d, error=%d\n", mg->ses_sock, WSAGetLastError()))
					mcl_exit(1);
				}
#elif defined(SOLARIS)
				if (pkt->pkt_len < 0) {
					/* we are in non-blocking mode! */
					/* I cannot check errno reliably on
					 * Solaris but I don't know why! */
					/* XXX: assume no ready packet anymore*/
					delete pkt;
					break;
				}
#else  /* LINUX */
				if (pkt->pkt_len < 0) {
					/* we are in non-blocking mode! */
					if (errno == EAGAIN ||
					    errno == EWOULDBLOCK) {
						/* no ready packet anymore */
						delete pkt;
						break;
					}
					/* else exit... */
					TRACELVL(5, (mcl_stdout, "mcl_recv_pkt: ERROR, ses_sock=%d, error=%d\n", mg->ses_sock, errno))
					perror("mcl_recv_pkt: recvfrom");
					mcl_exit(1);
				}
#endif /* OSDEP */
				addr.set_addr_struct((struct sockaddr_in*)&saddr);
				/* and finally process the packet */
				/* (pkt is not freed here after this call) */
				mcl_process_pkt (mclcb, pkt, &addr);
				/*
				 * try again only on layers >= 2 where we know
				 * we can receive more than 2 pkts per cycle
				 */
				if (i <= 1)
					break;
			}
		}
		/* Julien's HACK 01/10/02 (maybe wrong) */
		mcl_unlock(&mcl_mutex_lock);
#ifdef WIN32
		if(mclcb->test_cancel) {
			ExitThread(0);
		}
#else
		pthread_testcancel();
#endif
		mcl_lock(&mcl_mutex_lock);
		/* EndOfHack */
		if (mcl_is_valid_sock(mg->priv_sock) &&
		    FD_ISSET(mg->priv_sock, &tmp_fds)) {
			pkt = new mcl_rx_pkt(PBUF_LEN);
			saddr_len = sizeof(saddr);
			memset(&saddr, 0, sizeof(saddr));
			pkt->pkt_len = recvfrom(mg->priv_sock,
					pkt->get_buf(),
					pkt->get_buf_len(), 0,
					&saddr,
#ifdef LINUX
					(socklen_t*)
#endif
					&saddr_len);
#ifdef WIN32
			if(pkt->pkt_len == SOCKET_ERROR || pkt->pkt_len == 0) {
				PRINT_ERR((mcl_stderr, "mcl_recv_pkt: recvfrom failed (%d)\n", WSAGetLastError()))
				mcl_exit(1);
			}
#else  /* UNIX */
			if (pkt->pkt_len < 0) {
				perror("mcl_recv_pkt: recvfrom");
				mcl_exit(1);
			}
#endif /* OSDEP */
			addr.set_addr_struct((struct sockaddr_in*)&saddr);
			/*
			 * useless since there is no feedback for the present!
			 */
			delete pkt;
			PRINT_ERR((mcl_stderr, "mcl_recv_pkt: WARNING, rx a packet on priv_sock... useless, ignore\n"))
		}
	}
	TRACELVL(5, (mcl_stdout, "<- mcl_recv_pkt: \n"))
#undef PBUF_LEN
}


#ifdef NO_LONGER_NEEDED	/* { */
/*
 * replace id by all the socket fds of the session in set fds
 * (id is cleared from fds, then every valid ses_sock and priv_sock of
 * the session is added)
 * return the highest fd + 1
 * used by mcl_select() only
 */
int
mcl_add_in_our_fds (int		id,
		    fd_set	*fds)
{
	int		max_n = 0;
	mcgroup_t	*mg;
	int		i;
	mclcb_t		*mclcb = mclcbs[id];

	ASSERT((fds))
	TRACELVL(5, (mcl_stdout, "-> mcl_add_in_our_fds: id=%d, fds=x%x\n", id, (int)fds))
	/* remove our session idf */
	FD_CLR((u_int)id, fds);
	for (i = 0, mg = mclcb->mcgroup_tab; i < mclcb->mcl_max_group;
	     i++, mg++) {
		/*
		 * consider only the groups initialized and add
		 * the associated fd (unicast or mcast socket)
		 */
		if (mcl_is_valid_sock(mg->ses_sock)) {
			FD_SET((u_int)mg->ses_sock, fds);
			max_n = max(max_n, (int)(mg->ses_sock)+1);
		}
		if (mcl_is_valid_sock(mg->priv_sock)) {
			FD_SET((u_int)mg->priv_sock, fds);
			max_n = max(max_n, (int)(mg->priv_sock)+1);
		}
	}
#ifdef DEBUG
	if (mclcb->verbose >= 5)
		mcl_print_fds("after add_in_our_fds: ", max_n, fds);
#endif /* DEBUG */
	TRACELVL(5, (mcl_stdout, "<- mcl_add_in_our_fds: max_n=%d\n", max_n))
	return max_n;
}


/*
 * Reverse mapping of mcl_add_in_our_fds(): if fd is one of the sockets
 * of an initialized session, set the id of that session in fds,
 * otherwise set fd itself in fds.
 * used by mcl_select() only
 */
void
mcl_set_in_fds (int	fd,	/* the fd that is set */
		fd_set	*fds,	/* fd_set where to copy if fd is ours */
		int	sender)	/* consider writefds or readfds ? */
{
	mcgroup_t	*mg;
	int		i;
	int		found = 0;
	int		j;
	mclcb_t		*mclcb;

	ASSERT((fds))
	/* TRACELVL(5, (mcl_stdout, "-> mcl_set_in_fds: fd=%d, fds=x%x, sender=%d\n", fd, (int)fds, sender)) */
	/*
	 * is fd an MCL session idf ?
	 */
	for (j = 0; j < MCLCB_MAX_ID; j++) {
		mclcb = mclcbs[j];
		if (mclcb == NULL || !mclcb->initialized)
			continue;
		mcl_lock(&mcl_mutex_lock);
		for (i = 0, mg = mclcb->mcgroup_tab; i < mclcb->mcl_max_group;
		     i++, mg++) {
			if (fd && ((int)mg->ses_sock == fd ||
				   (int)mg->priv_sock == fd)) {
				/* yes so set the associated id in the fds */
				/* XXX: assume only one session !!! <- NO MORE*/
				found++;
				/* XXX: assume not rx and tx at same time */
				/* NOTE(review): both branches are identical,
				 * sender currently has no effect */
				if (sender)
					FD_SET((u_int)mclcb->id, fds);
				else
					FD_SET((u_int)mclcb->id, fds);
				break;
			}
		}
		mcl_unlock(&mcl_mutex_lock);
	}
	if (!found)
		FD_SET((u_int)fd,fds);
#ifdef DEBUG
#ifdef NEVERDEF
	if (mclcb->verbose >= 5) {
		mcl_print_fds("after set_in_fds: ", max_n, fds);
	}
#endif /* NEVERDEF */
#endif /* DEBUG */
	/* TRACELVL(5, (mcl_stdout, "<- mcl_set_in_fds: %s\n", found ? "found" : "not ours"))*/
}


/*
 * for debug essentially
 * Prints msg followed by the list of the fds (in [0; nfds[) that are
 * set in fds, or "-" if fds is NULL.
 */
void
mcl_print_fds (char	*msg,
	       int	nfds,
	       fd_set	*fds)
{
	int	i;

	/* ASSERT((fds && nfds)) */
#ifdef NEVERDEF
	if (mclcb->verbose <= 4)
	TRACELVL(5, (mcl_stdout, "-> mcl_print_fds: nfds=%d, fds=x%x\n", nfds, (int)fds))
		return;
#endif
	PRINT_OUT((mcl_stdout, "%s fd_set: ", msg))
	if (fds) {
		for (i = 0; i < nfds; i++) {
			if (FD_ISSET(i, fds))
				PRINT_OUT((mcl_stdout, "%d ", i))
		}
	} else {
		PRINT_OUT((mcl_stdout,"-"))
	}
	PRINT_OUT((mcl_stdout, " - n=%d\n",nfds))
#ifdef NEVERDEF
	TRACELVL(5, (mcl_stdout, "<- mcl_print_fds:\n"))
#endif
}
#endif /* } NO_LONGER_NEEDED */


/*
 * Set or read the Recv/Send socket buffer size.
 * With SET_SOCKBUF, val is passed to setsockopt() and returned unchanged
 * (the size actually applied by the system is not read back).
 * With any other op, val is ignored on input and the current size is
 * read with getsockopt().
 * Returns the value set or read. A syscall failure is fatal (mcl_exit(1)).
 */
int
mcl_sock_buf_mgmt (
#ifdef WIN32
		SOCKET	fd,
#else
		int	fd,
#endif
		int	buf,	/* which buffer ? SO_RCVBUF or SO_SNDBUF */
		int	op,	/* set or read ? SET_SOCKBUF or GET_SOCKBUF */
		int	val)
{
	int	err = 0;
	int	len = sizeof(val);

	if (op == SET_SOCKBUF)
		err = setsockopt(fd, SOL_SOCKET, buf, (char*)&val, sizeof(val));
	else
		err = getsockopt(fd, SOL_SOCKET, buf, (char*)&val,
#ifdef LINUX
				/* getsockopt() expects a socklen_t* (not a
				 * size_t*, which may be wider than len) */
				(socklen_t*)
#endif
				&len);
	if (err < 0) {
		perror("mcl_sock_buf_mgmt");
		mcl_exit(1);
	}
	return val;
}


/*
 * Create all the required sockets and initialize them.
 * Used both for sender and receiver, and unicast or multicast.
 * With a multicast session address, layer i uses (address + i) and
 * (port + i); a session that is above all a receiver only initializes
 * the first mclcb->nb_level layers. With a unicast address only layer 0
 * is initialized.
 * Return 0 if ok, < 0 if error.
 * (errors detected by mcl_layer_sock_init() are fatal, so in practice
 * 0 is always returned)
 */
int
mcl_init_sockets (mclcb_t	*mclcb)
{
	int		layer;
	mcgroup_t	*mg;
	//struct sockaddr_in	saddr;
	mcl_addr	addr;

	TRACELVL(5, (mcl_stdout, "-> mcl_init_sockets:\n"))
	for (layer = 0, mg = mclcb->mcgroup_tab; layer < mclcb->mcl_max_group;
	     layer++, mg++) {
		mg->group = layer;
		if (mclcb->addr.is_multicast_addr()) {
			/*
			 * in case we are a receiver, subscribe only to the
			 * groups of interest and skip the others...
			 */
			if ((mclcb->receiver > mclcb->sender) &&
			    layer >= mclcb->nb_level) {
				mg->can_rx = 0;
				continue;
			}
			/*
			 * use a block of mcast addr...
			 */
#if 0
			saddr.sin_addr.s_addr = htonl(mclcb->addr + layer);
			/* changing port is required by Solaris, not by Linux!*/
			saddr.sin_port = htons((u_short)(mclcb->myport + layer));
#endif
			addr = mclcb->addr;
			addr.set_addr(addr.get_addr() + layer);
			/* changing the port number is in fact only required
			 * by Solaris, not by Linux!*/
			addr.set_port(addr.get_port() + layer);
		} else {
			/* this is unicast, so a single socket is sufficient */
			if (layer >= 1) {
				mg->can_rx = 0;
				continue;
			}
			addr = mclcb->addr;
		}
		/*
		 * now init everything
		 */
		mcl_layer_sock_init(mclcb, layer, mg, &addr);
	}
	TRACELVL(5, (mcl_stdout, "<- mcl_init_sockets:\n"))
	return 0;
}


/*
 * do the socket init work now
 *
 * mclcb: session control block (mode, ttl, mcast_if, rx_sock_size and
 *	  the rxlvl select() set are read/updated).
 * layer: layer number, stored in mg->group.
 * mg:	  mcgroup entry to initialize; it is entirely reset first.
 * addr:  address/port of this layer.
 *
 * Depending on mclcb->mode:
 * - any TX mode (or SIG_UNI_RX): create the private socket priv_sock;
 * - MCAST_TX: set multicast interface, TTL and loopback on priv_sock;
 * - UNI_RX or MCAST_RX: create the non-blocking session socket ses_sock,
 *   register it for select() and bind it to the port of addr;
 * - MCAST_RX: join the multicast group on ses_sock.
 *
 * Returns 0. Any syscall failure is fatal (mcl_exit(1)).
 */
int
mcl_layer_sock_init (mclcb_t	*mclcb,
		     int	layer,
		     mcgroup_t	*mg,
		     mcl_addr	*addr)
#if 0
		     struct sockaddr_in	*saddr
#endif
{
	/* solaris seems to care about the int/char type, not linux! */
#ifdef WIN32
	u_int32_t	mcast_ttl;	/* for setsockopt */
	// u_int8_t	mcast_loop;
	u_long		set_non_blocking = 1;	/* non blocking read */
#else
	u_int8_t	mcast_ttl;	/* for setsockopt */
	u_int8_t	mcast_loop;
#endif /* OS_DEP */
	u_int32_t	reuse_addr;	/* for setsockopt */
	struct ip_mreq	imr;		/* ... to join mcast group */
	struct in_addr	if_addr;	/* interface IP address */
	struct sockaddr_in tmp_addr;	/* temporary */
	int		tmp_len;	/* temporary */
	int		mode = mclcb->mode;	/* sender and/or receiver? */

	TRACELVL(5, (mcl_stdout, "-> mcl_layer_sock_init: mode=x%x, layer=%d\n", mode, layer))
	memset((char*)mg, 0, sizeof(*mg));
	mg->group = layer;
	/*memcpy(&(mg->addr), saddr, sizeof(*saddr));*/
	mg->addr = *addr;
	/*if_addr.s_addr = htonl(mclcb->mcast_if);*/
	if_addr.s_addr = htonl(mclcb->mcast_if.get_addr());

	if ((mode & MODE_UNI_TX) || (mode & MODE_MCAST_TX) ||
	    (mode & MODE_SIG_UNI_TX) || (mode & MODE_SIG_UNI_RX)) {
		mg->can_tx = 1;
		/* private socket is always used for tx */
		/* NOTE(review): the UNIX test rejects descriptor 0 here
		 * whereas the ses_sock test below accepts it - confirm
		 * which one matches mcl_is_valid_sock() */
#ifdef WIN32
		if ((mg->priv_sock = socket(AF_INET, SOCK_DGRAM, 0))
		    == INVALID_SOCKET)
#else
		if ((mg->priv_sock = socket(AF_INET, SOCK_DGRAM, 0)) <= 0)
#endif
		{
			perror("mcl_layer_sock_init (sender): socket");
			mcl_exit(1);
		}
		if (mode & MODE_SIG_UNI_RX) {
			/* priv_sock also used for rx of signalization */
			mg->can_rx = 1;
			/* for the future select() */
			FD_SET((u_int)mg->priv_sock, &(mclcb->rxlvl.fds));
			mclcb->rxlvl.nfds = max(mclcb->rxlvl.nfds,
						(int)mg->priv_sock + 1);
			mclcb->rxlvl.n_fd++;
		}
		/* do not bind on this socket to get a locally unique port # */
	}
	if (mode & MODE_MCAST_TX) {
#ifdef WIN32
		SOCKADDR_IN	source_sin;

		source_sin.sin_family = AF_INET;
		source_sin.sin_port = htons(0);
		source_sin.sin_addr.s_addr = htonl(INADDR_ANY);
		if (bind(mg->priv_sock, (struct sockaddr FAR *)&source_sin,
			 sizeof (source_sin)) == SOCKET_ERROR) {
			perror("mcl_layer_sock_init (sender): bind() error");
			mcl_exit(1);
		}
#endif /* WIN32 */
		/* specify multicast interface */
		if (setsockopt(mg->priv_sock, IPPROTO_IP, IP_MULTICAST_IF,
			       (char*)&if_addr.s_addr,
			       sizeof(if_addr.s_addr)) < 0 ) {
			perror("mcl_layer_sock_init (sender): IP_MULTICAST_IF");
			mcl_exit(1);
		}
		/* specify ttl */
		mcast_ttl = min(mclcb->ttl, 255);
		if (setsockopt(mg->priv_sock, IPPROTO_IP, IP_MULTICAST_TTL,
			       (char *)&mcast_ttl, sizeof(mcast_ttl)) < 0 ) {
			perror("mcl_layer_sock_init (sender): IP_MULTICAST_TTL");
			mcl_exit(1);
		}
#ifndef WIN32
		/* turn on the loop back of multicast packets */
		mcast_loop = 1;
		if (setsockopt(mg->priv_sock, IPPROTO_IP, IP_MULTICAST_LOOP,
			       (char *)&mcast_loop, sizeof(mcast_loop)) != 0) {
			perror("mcl_layer_sock_init (sender): IP_MULTICAST_LOOP");
			mcl_exit(1);
		}
#endif /* NOT WIN32 */
	}
	if ((mode & MODE_UNI_RX) || (mode & MODE_MCAST_RX)) {
		mg->can_rx = 1;
		/* session socket is always used for mcast rx */
#ifdef WIN32
		if ((mg->ses_sock = socket(AF_INET, SOCK_DGRAM, 0))
		    == INVALID_SOCKET)
#else
		if ((mg->ses_sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0)
#endif
		{
			perror("mcl_layer_sock_init (recv): socket");
			mcl_exit(1);
		}
		/*
		 * set non-blocking mode for future read on the session sock.
		 * we noticed that it largely accelerates packet reception
		 * (see mcl_network.cpp:mcl_recv_pkt()).
		 * Using the default blocking mode requires to always call
		 * select() before trying to read packets. In non-blocking
		 * mode we can read() at any time.
		 */
#ifdef WIN32
		if (ioctlsocket(mg->ses_sock, FIONBIO, (&set_non_blocking))
		    == SOCKET_ERROR ) {
			perror("mcl_layer_sock_init (recv): fcntl");
			mcl_exit(1);
		}
#else
		if (fcntl(mg->ses_sock, F_SETFL, O_NONBLOCK) < 0) {
			perror("mcl_layer_sock_init (recv): fcntl");
			mcl_exit(1);
		}
#endif
		/*
		 * increase the default socket size.
		 * ses_sock is used for reception and the size comes from
		 * rx_sock_size, so this is the receive buffer (SO_RCVBUF)
		 * that must be resized, not the send buffer.
		 */
		if (mclcb->rx_sock_size > 0) {
			mcl_sock_buf_mgmt(mg->ses_sock, SO_RCVBUF, SET_SOCKBUF,
					  mclcb->rx_sock_size);
		}
		/* for the future select() */
		FD_SET((u_int)mg->ses_sock, &(mclcb->rxlvl.fds));
		mclcb->rxlvl.nfds = max(mclcb->rxlvl.nfds,
					(int)mg->ses_sock + 1);
		mclcb->rxlvl.n_fd++;
		/* allow for reuse of the couple addr/port */
		reuse_addr = 1;
		if (mclcb->addr.is_multicast_addr() &&
		    setsockopt(mg->ses_sock, SOL_SOCKET, SO_REUSEADDR,
			       (char *)&reuse_addr, sizeof(reuse_addr)) < 0) {
			perror("mcl_layer_sock_init (recv): REUSEADDR");
			mcl_exit(1);
		}
		/* now bind the session port number (with INADDR_ANY addr) */
#if 0
		memcpy(&tmp_addr, saddr, sizeof(*saddr));
#endif
		addr->get_addr_struct(&tmp_addr);
		tmp_addr.sin_addr.s_addr = htonl(INADDR_ANY);
		if ((int)bind(mg->ses_sock, (struct sockaddr*)&tmp_addr,
			      sizeof(tmp_addr)) < 0) {
			perror("mcl_layer_sock_init (recv): bind");
			PRINT_ERR((mcl_stderr, "mcl_layer_sock_init (recv): ERROR; bind to %s/%d failed\n",
				inet_ntoa(tmp_addr.sin_addr),
				ntohs(tmp_addr.sin_port)))
			mcl_exit(1);
		}
	}
	if (mode & MODE_MCAST_RX) {
		/* specify multicast interface */
		if (setsockopt(mg->ses_sock, IPPROTO_IP, IP_MULTICAST_IF,
			       (char*)&if_addr.s_addr,
			       sizeof(if_addr.s_addr)) < 0 ) {
			perror("mcl_layer_sock_init (recv): IP_MULTICAST_IF");
			mcl_exit(1);
		}
#ifndef WIN32
		/* loop back multicast packets */
		/* XXX: is it really required ? */
		mcast_loop = 1;
		if (setsockopt(mg->ses_sock, IPPROTO_IP, IP_MULTICAST_LOOP,
			       (char *)&mcast_loop, sizeof(mcast_loop)) != 0) {
			perror("mcl_layer_sock_init (recv): IP_MULTICAST_LOOP");
			mcl_exit(1);
		}
#endif /* NOT WIN32 */
		/* multicast join */
		/*imr.imr_multiaddr.s_addr = saddr->sin_addr.s_addr;*/
		imr.imr_multiaddr.s_addr = htonl(addr->get_addr());
		imr.imr_interface.s_addr = if_addr.s_addr;
		if (setsockopt(mg->ses_sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,
			       (char *)&imr, sizeof(imr)) < 0) {
			perror("mcl_layer_sock_init (recv): IP_ADD_MEMBERSHIP");
			mcl_exit(1);
		}
	}
	if (mclcb->verbose >= 4) {
		/* retrieve local port nb assigned to us */
		/*
		 * start from a zeroed struct: when there is no valid
		 * priv_sock, getsockname() is not called and the trace
		 * below must not show a stale or uninitialized port.
		 */
		memset(&tmp_addr, 0, sizeof(tmp_addr));
		tmp_len = sizeof(tmp_addr);
		if (mcl_is_valid_sock(mg->priv_sock) &&
		    getsockname(mg->priv_sock, (struct sockaddr *)&tmp_addr,
#ifdef LINUX
				(socklen_t*)
#endif
				&tmp_len) < 0) {
#ifndef WIN32
			perror("mcl_layer_sock_init: getsockname");
#endif
			PRINT_ERR((mcl_stderr, "mcl_layer_sock_init: ERROR; getsockname() failed\n"))
			mcl_exit(1);
		}
		/* NOTE(review): ntohs() is applied to get_port() here while
		 * mcl_init_sockets() does plain arithmetic on the same
		 * value - confirm the byte order returned by get_port() */
		PRINT_OUT((mcl_stdout, "mcl_layer_sock_init: layer %d <=> ses_sock=%d: %s/%d priv_sock=%d: 0/%d\n",
			layer, mg->ses_sock,
			mg->addr.get_addr_string(),
			ntohs(mg->addr.get_port()),
			(int)mg->priv_sock, ntohs(tmp_addr.sin_port)))
	}
	TRACELVL(5, (mcl_stdout, "<- mcl_layer_sock_init:\n"))
	return 0;
}


/*
 * Close all the sockets.
 * Used both for sender and receiver, and unicast or multicast.
 * A session that is above all a receiver drops all its layers with
 * mcl_drop_layer(); otherwise each mcgroup entry in use (only entry 0
 * with a unicast address) is closed with mcl_drop_this_layer().
 * Return 0 if ok, < 0 if error.
 * (in the present code 0 is always returned)
 */
int
mcl_close_sockets (mclcb_t	*mclcb)
{
	int		i;
	mcgroup_t	*mg;

	TRACELVL(5, (mcl_stdout, "-> mcl_close_sockets:\n"))
	if (mclcb->receiver > mclcb->sender) {
		/*
		 * we are above all a receiver. Use the dedicated function...
		 */
		mcl_drop_layer(mclcb, MCL_ALL_LAYERS, MCL_DO_IT);
	} else {
		/*
		 * we are above all a sender. Go through all mg entries...
		 */
		for (i = 0, mg = mclcb->mcgroup_tab; i < mclcb->mcl_max_group;
		     i++, mg++) {
			if (!mclcb->addr.is_multicast_addr() && i > 0) {
				/*
				 * there are several mcgroup entries but send
				 * only on one socket !
				 */
				continue;
			}
			/* now close it */
			mcl_drop_this_layer(mclcb, i);
		}
	}
	TRACELVL(5, (mcl_stdout, "<- mcl_close_sockets:\n"))
	return 0;
}