/*****************************************************************************/
/* "NetPIPE" -- Network Protocol Independent Performance Evaluator. */
/* Copyright 1997, 1998 Iowa State University Research Foundation, Inc. */
/* */
/* This program is free software; you can redistribute it and/or modify */
/* it under the terms of the GNU General Public License as published by */
/* the Free Software Foundation. You should have received a copy of the */
/* GNU General Public License along with this program; if not, write to the */
/* Free Software Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */
/* */
/* ib.c ---- Infiniband module for the Mellanox VAPI */
/*****************************************************************************/
#define USE_VOLATILE_RPTR /* needed for polling on last byte of recv buffer */
#include "netpipe.h"
#include <stdio.h>
#include <getopt.h>
/* Debugging output macro */
FILE* logfile;
#if 0
#define LOGPRINTF(_format, _aa...) fprintf(logfile, __FUNCTION__": " _format, ##_aa); fflush(logfile)
#else
#define LOGPRINTF(_format, _aa...)
#endif
/* Header files needed for Infiniband */
#include "vapi.h" /* Mellanox Verbs API */
#include "evapi.h" /* Mellanox Verbs API extension */
#include "vapi_common.h" /* Mellanox VIP layer of HCA Verbs */
/* Global vars */
static VAPI_hca_hndl_t hca_hndl=VAPI_INVAL_HNDL;
static VAPI_hca_port_t hca_port;
static int port_num;
static IB_lid_t lid;
static IB_lid_t d_lid;
static VAPI_pd_hndl_t pd_hndl=VAPI_INVAL_HNDL;
static VAPI_cqe_num_t num_cqe;
static VAPI_cqe_num_t act_num_cqe;
static VAPI_cq_hndl_t s_cq_hndl=VAPI_INVAL_HNDL;
static VAPI_cq_hndl_t r_cq_hndl=VAPI_INVAL_HNDL;
static EVAPI_compl_handler_hndl_t ceh_hndl=VAPI_INVAL_HNDL;
static VAPI_mrw_t mr_in;
static VAPI_mrw_t s_mr_out;
static VAPI_mrw_t r_mr_out;
static VAPI_mr_hndl_t s_mr_hndl=VAPI_INVAL_HNDL;
static VAPI_mr_hndl_t r_mr_hndl=VAPI_INVAL_HNDL;
static VAPI_qp_init_attr_t qp_init_attr;
static VAPI_qp_prop_t qp_prop;
static VAPI_qp_hndl_t qp_hndl=VAPI_INVAL_HNDL;
static VAPI_qp_num_t d_qp_num;
static VAPI_qp_attr_mask_t qp_attr_mask;
static VAPI_qp_attr_t qp_attr;
static VAPI_qp_cap_t qp_cap;
static VAPI_wc_desc_t wc;
static int max_wq=50000;
static void* remote_address;
static VAPI_rkey_t remote_key;
static volatile int receive_complete;
/* Local prototypes */
void event_handler(VAPI_hca_hndl_t, VAPI_cq_hndl_t, void*);
/* Function definitions */
void Init(ArgStruct *p, int* pargc, char*** pargv)
{
/* Set defaults
*/
p->prot.ib_mtu = MTU1024; /* 1024 Byte MTU */
p->prot.commtype = NP_COMM_SENDRECV; /* Use Send/Receive communications */
p->prot.comptype = NP_COMP_LOCALPOLL; /* Use local polling for completion */
p->tr = 0; /* I am not the transmitter */
p->rcv = 1; /* I am the receiver */
}
void Setup(ArgStruct *p)
{
int one = 1;
int sockfd;
struct sockaddr_in *lsin1, *lsin2; /* ptr to sockaddr_in in ArgStruct */
char *host;
struct hostent *addr;
struct protoent *proto;
int send_size, recv_size, sizeofint = sizeof(int);
struct sigaction sigact1;
char logfilename[80];
/* Sanity check */
if( p->prot.commtype == NP_COMM_RDMAWRITE &&
p->prot.comptype != NP_COMP_LOCALPOLL ) {
fprintf(stderr, "Error, RDMA Write may only be used with local polling.\n");
fprintf(stderr, "Try using RDMA Write With Immediate Data with vapi polling\n");
fprintf(stderr, "or event completion\n");
exit(-1);
}
/* Open log file */
sprintf(logfilename, ".iblog%d", 1 - p->tr);
logfile = fopen(logfilename, "w");
host = p->host; /* copy ptr to hostname */
lsin1 = &(p->prot.sin1);
lsin2 = &(p->prot.sin2);
bzero((char *) lsin1, sizeof(*lsin1));
bzero((char *) lsin2, sizeof(*lsin2));
if ( (sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
printf("NetPIPE: can't open stream socket! errno=%d\n", errno);
exit(-4);
}
if(!(proto = getprotobyname("tcp"))){
printf("NetPIPE: protocol 'tcp' unknown!\n");
exit(555);
}
if (p->tr){ /* if client i.e., Sender */
if (atoi(host) > 0) { /* Numerical IP address */
lsin1->sin_family = AF_INET;
lsin1->sin_addr.s_addr = inet_addr(host);
} else {
if ((addr = gethostbyname(host)) == NULL){
printf("NetPIPE: invalid hostname '%s'\n", host);
exit(-5);
}
lsin1->sin_family = addr->h_addrtype;
bcopy(addr->h_addr, (char*) &(lsin1->sin_addr.s_addr), addr->h_length);
}
lsin1->sin_port = htons(p->port);
} else { /* we are the receiver (server) */
bzero((char *) lsin1, sizeof(*lsin1));
lsin1->sin_family = AF_INET;
lsin1->sin_addr.s_addr = htonl(INADDR_ANY);
lsin1->sin_port = htons(p->port);
if (bind(sockfd, (struct sockaddr *) lsin1, sizeof(*lsin1)) < 0){
printf("NetPIPE: server: bind on local address failed! errno=%d", errno);
exit(-6);
}
}
if(p->tr)
p->commfd = sockfd;
else
p->servicefd = sockfd;
/* Establish tcp connections */
establish(p);
/* Initialize Mellanox Infiniband */
if(initIB(p) == -1) {
CleanUp(p);
exit(-1);
}
}
int initIB(ArgStruct *p)
{
VAPI_ret_t ret;
/* Open HCA */
/* open hca just in case it was not opened by system earlier */
ret = VAPI_open_hca("InfiniHost0", &hca_hndl);
ret = EVAPI_get_hca_hndl("InfiniHost0", &hca_hndl);
if(ret != VAPI_OK) {
fprintf(stderr, "Error opening Infiniband HCA: %s\n", VAPI_strerror(ret));
return -1;
} else {
LOGPRINTF("Opened Infiniband HCA\n");
}
/* Get HCA properties */
port_num=1;
ret = VAPI_query_hca_port_prop(hca_hndl, (IB_port_t)port_num,
(VAPI_hca_port_t *)&hca_port);
if(ret != VAPI_OK) {
fprintf(stderr, "Error querying Infiniband HCA: %s\n", VAPI_strerror(ret));
return -1;
} else {
LOGPRINTF("Queried Infiniband HCA\n");
}
lid = hca_port.lid;
LOGPRINTF(" lid = %d\n", lid);
/* Allocate Protection Domain */
ret = VAPI_alloc_pd(hca_hndl, &pd_hndl);
if(ret != VAPI_OK) {
fprintf(stderr, "Error allocating PD: %s\n", VAPI_strerror(ret));
return -1;
} else {
LOGPRINTF("Allocated Protection Domain\n");
}
/* Create send completion queue */
num_cqe = 30000; /* Requested number of completion q elements */
ret = VAPI_create_cq(hca_hndl, num_cqe, &s_cq_hndl, &act_num_cqe);
if(ret != VAPI_OK) {
fprintf(stderr, "Error creating send CQ: %s\n", VAPI_strerror(ret));
return -1;
} else {
LOGPRINTF("Created Send Completion Queue with %d elements\n", act_num_cqe);
}
/* Create recv completion queue */
num_cqe = 20000; /* Requested number of completion q elements */
ret = VAPI_create_cq(hca_hndl, num_cqe, &r_cq_hndl, &act_num_cqe);
if(ret != VAPI_OK) {
fprintf(stderr, "Error creating recv CQ: %s\n", VAPI_strerror(ret));
return -1;
} else {
LOGPRINTF("Created Recv Completion Queue with %d elements\n", act_num_cqe);
}
/* Placeholder for MR */
/* Create Queue Pair */
qp_init_attr.cap.max_oust_wr_rq = max_wq; /* Max outstanding WR on RQ */
qp_init_attr.cap.max_oust_wr_sq = max_wq; /* Max outstanding WR on SQ */
qp_init_attr.cap.max_sg_size_rq = 1; /* Max scatter/gather entries on RQ */
qp_init_attr.cap.max_sg_size_sq = 1; /* Max scatter/gather entries on SQ */
qp_init_attr.pd_hndl = pd_hndl; /* Protection domain handle */
qp_init_attr.rdd_hndl = 0; /* Reliable datagram domain handle */
qp_init_attr.rq_cq_hndl = r_cq_hndl; /* CQ handle for RQ */
qp_init_attr.rq_sig_type = VAPI_SIGNAL_REQ_WR; /* Signalling type */
qp_init_attr.sq_cq_hndl = s_cq_hndl; /* CQ handle for RQ */
qp_init_attr.sq_sig_type = VAPI_SIGNAL_REQ_WR; /* Signalling type */
qp_init_attr.ts_type = IB_TS_RC; /* Transmission type */
ret = VAPI_create_qp(hca_hndl, &qp_init_attr, &qp_hndl, &qp_prop);
if(ret != VAPI_OK) {
fprintf(stderr, "Error creating Queue Pair: %s\n", VAPI_strerror(ret));
return -1;
} else {
LOGPRINTF("Created Queue Pair, max outstanding WR on RQ: %d, on SQ: %d\n",
qp_prop.cap.max_oust_wr_rq, qp_prop.cap.max_oust_wr_sq);
}
/* Exchange lid and qp_num with other node */
if( write(p->commfd, &lid, sizeof(lid) ) != sizeof(lid) ) {
fprintf(stderr, "Failed to send lid over socket\n");
return -1;
}
if( write(p->commfd, &qp_prop.qp_num, sizeof(qp_prop.qp_num) ) != sizeof(qp_prop.qp_num) ) {
fprintf(stderr, "Failed to send qpnum over socket\n");
return -1;
}
if( read(p->commfd, &d_lid, sizeof(d_lid) ) != sizeof(d_lid) ) {
fprintf(stderr, "Failed to read lid from socket\n");
return -1;
}
if( read(p->commfd, &d_qp_num, sizeof(d_qp_num) ) != sizeof(d_qp_num) ) {
fprintf(stderr, "Failed to read qpnum from socket\n");
return -1;
}
LOGPRINTF("Local: lid=%d qp_num=%d Remote: lid=%d qp_num=%d\n",
lid, qp_prop.qp_num, d_lid, d_qp_num);
/* Bring up Queue Pair */
/******* INIT state ******/
QP_ATTR_MASK_CLR_ALL(qp_attr_mask);
qp_attr.qp_state = VAPI_INIT;
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_QP_STATE);
qp_attr.pkey_ix = 0;
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_PKEY_IX);
qp_attr.port = port_num;
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_PORT);
qp_attr.remote_atomic_flags = VAPI_EN_REM_WRITE | VAPI_EN_REM_READ;
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_REMOTE_ATOMIC_FLAGS);
ret = VAPI_modify_qp(hca_hndl, qp_hndl, &qp_attr, &qp_attr_mask, &qp_cap);
if(ret != VAPI_OK) {
fprintf(stderr, "Error modifying QP to INIT: %s\n", VAPI_strerror(ret));
return -1;
}
LOGPRINTF("Modified QP to INIT\n");
/******* RTR (Ready-To-Receive) state *******/
QP_ATTR_MASK_CLR_ALL(qp_attr_mask);
qp_attr.qp_state = VAPI_RTR;
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_QP_STATE);
qp_attr.qp_ous_rd_atom = 1;
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_QP_OUS_RD_ATOM);
qp_attr.dest_qp_num = d_qp_num;
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_DEST_QP_NUM);
qp_attr.av.sl = 0;
qp_attr.av.grh_flag = FALSE;
qp_attr.av.dlid = d_lid;
qp_attr.av.static_rate = 0;
qp_attr.av.src_path_bits = 0;
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_AV);
qp_attr.path_mtu = p->prot.ib_mtu;
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_PATH_MTU);
qp_attr.rq_psn = 0;
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_RQ_PSN);
qp_attr.pkey_ix = 0;
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_PKEY_IX);
qp_attr.min_rnr_timer = 5;
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_MIN_RNR_TIMER);
ret = VAPI_modify_qp(hca_hndl, qp_hndl, &qp_attr, &qp_attr_mask, &qp_cap);
if(ret != VAPI_OK) {
fprintf(stderr, "Error modifying QP to RTR: %s\n", VAPI_strerror(ret));
return -1;
}
LOGPRINTF("Modified QP to RTR\n");
/* Sync before going to RTS state */
Sync(p);
/******* RTS (Ready-to-Send) state *******/
QP_ATTR_MASK_CLR_ALL(qp_attr_mask);
qp_attr.qp_state = VAPI_RTS;
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_QP_STATE);
qp_attr.sq_psn = 0;
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_SQ_PSN);
qp_attr.timeout = 31;
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_TIMEOUT);
qp_attr.retry_count = 1;
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_RETRY_COUNT);
qp_attr.rnr_retry = 1;
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_RNR_RETRY);
qp_attr.ous_dst_rd_atom = 1;
QP_ATTR_MASK_SET(qp_attr_mask, QP_ATTR_OUS_DST_RD_ATOM);
ret = VAPI_modify_qp(hca_hndl, qp_hndl, &qp_attr, &qp_attr_mask, &qp_cap);
if(ret != VAPI_OK) {
fprintf(stderr, "Error modifying QP to RTS: %s\n", VAPI_strerror(ret));
return -1;
}
LOGPRINTF("Modified QP to RTS\n");
/* If using event completion, register event completion handler and request
* the initial notification
*/
if( p->prot.comptype == NP_COMP_EVENT ) {
EVAPI_set_comp_eventh(hca_hndl, r_cq_hndl, event_handler, p, &ceh_hndl);
VAPI_req_comp_notif(hca_hndl, r_cq_hndl, VAPI_NEXT_COMP);
}
return 0;
}
int finalizeIB(ArgStruct *p)
{
VAPI_ret_t ret;
LOGPRINTF("Finalizing IB stuff\n");
/* Clear completion event handler */
if(p->prot.comptype == NP_COMP_EVENT ) {
LOGPRINTF("Clearing comp handler\n");
ret = EVAPI_clear_comp_eventh(hca_hndl, ceh_hndl);
if(ret != VAPI_OK) {
fprintf(stderr, "Error clearing event handler: %s\n",
VAPI_strerror(ret));
}
}
if(qp_hndl != VAPI_INVAL_HNDL) {
LOGPRINTF("Destroying QP\n");
ret = VAPI_destroy_qp(hca_hndl, qp_hndl);
if(ret != VAPI_OK) {
fprintf(stderr, "Error destroying Queue Pair: %s\n", VAPI_strerror(ret));
}
}
if(r_cq_hndl != VAPI_INVAL_HNDL) {
LOGPRINTF("Destroying Recv CQ\n");
ret = VAPI_destroy_cq(hca_hndl, r_cq_hndl);
if(ret != VAPI_OK) {
fprintf(stderr, "Error destroying recv CQ: %s\n", VAPI_strerror(ret));
}
}
if(s_cq_hndl != VAPI_INVAL_HNDL) {
LOGPRINTF("Destroying Send CQ\n");
ret = VAPI_destroy_cq(hca_hndl, s_cq_hndl);
if(ret != VAPI_OK) {
fprintf(stderr, "Error destroying send CQ: %s\n", VAPI_strerror(ret));
}
}
/* Check memory registrations just in case user bailed out */
if(s_mr_hndl != VAPI_INVAL_HNDL) {
LOGPRINTF("Deregistering send buffer\n");
ret = VAPI_deregister_mr(hca_hndl, s_mr_hndl);
if(ret != VAPI_OK) {
fprintf(stderr, "Error deregistering send mr: %s\n", VAPI_strerror(ret));
}
}
if(r_mr_hndl != VAPI_INVAL_HNDL) {
LOGPRINTF("Deregistering recv buffer\n");
ret = VAPI_deregister_mr(hca_hndl, r_mr_hndl);
if(ret != VAPI_OK) {
fprintf(stderr, "Error deregistering recv mr: %s\n", VAPI_strerror(ret));
}
}
if(pd_hndl != VAPI_INVAL_HNDL) {
LOGPRINTF("Deallocating PD\n");
ret = VAPI_dealloc_pd(hca_hndl, pd_hndl);
if(ret != VAPI_OK) {
fprintf(stderr, "Error deallocating PD: %s\n", VAPI_strerror(ret));
}
}
/* Application code should not close HCA, just release handle */
if(hca_hndl != VAPI_INVAL_HNDL) {
LOGPRINTF("Releasing HCA\n");
ret = EVAPI_release_hca_hndl(hca_hndl);
if(ret != VAPI_OK) {
fprintf(stderr, "Error releasing HCA: %s\n", VAPI_strerror(ret));
}
}
return 0;
}
void event_handler(VAPI_hca_hndl_t hca, VAPI_cq_hndl_t cq, void* data)
{
VAPI_ret_t ret;
while(1) {
ret = VAPI_poll_cq(hca, cq, &wc);
if(ret == VAPI_CQ_EMPTY) {
LOGPRINTF("Empty completion queue, requesting next notification\n");
VAPI_req_comp_notif(hca_hndl, r_cq_hndl, VAPI_NEXT_COMP);
return;
} else if(ret != VAPI_OK) {
fprintf(stderr, "Error in event_handler, polling cq: %s\n",
VAPI_strerror(ret));
exit(-1);
} else if(wc.status != VAPI_SUCCESS) {
fprintf(stderr, "Error in event_handler, on returned work completion "
"status: %s\n", VAPI_wc_status_sym(wc.status));
exit(-1);
}
LOGPRINTF("Retrieved work completion\n");
/* For ping-pong mode at least, this check shouldn't be needed for
* normal operation, but it will help catch any bugs with multiple
* sends coming through when we're only expecting one.
*/
if(receive_complete == 1) {
while(receive_complete != 0) sched_yield();
}
receive_complete = 1;
}
}
static int
readFully(int fd, void *obuf, int len)
{
int bytesLeft = len;
char *buf = (char *) obuf;
int bytesRead = 0;
while (bytesLeft > 0 &&
(bytesRead = read(fd, (void *) buf, bytesLeft)) > 0)
{
bytesLeft -= bytesRead;
buf += bytesRead;
}
if (bytesRead <= 0)
return bytesRead;
return len;
}
void Sync(ArgStruct *p)
{
char s[] = "SyncMe";
char response[7];
if (write(p->commfd, s, strlen(s)) < 0 ||
readFully(p->commfd, response, strlen(s)) < 0)
{
perror("NetPIPE: error writing or reading synchronization string");
exit(3);
}
if (strncmp(s, response, strlen(s)))
{
fprintf(stderr, "NetPIPE: Synchronization string incorrect!\n");
exit(3);
}
}
void PrepareToReceive(ArgStruct *p)
{
VAPI_ret_t ret; /* Return code */
VAPI_rr_desc_t rr; /* Receive request */
VAPI_sg_lst_entry_t sg_entry; /* Scatter/Gather list - holds buff addr */
/* We don't need to post a receive if doing RDMA write with local polling */
if( p->prot.commtype == NP_COMM_RDMAWRITE &&
p->prot.comptype == NP_COMP_LOCALPOLL )
return;
rr.opcode = VAPI_RECEIVE;
/* We only need signaled completions if using VAPI
* completion methods.
*/
if( p->prot.comptype == NP_COMP_LOCALPOLL )
rr.comp_type = VAPI_UNSIGNALED;
else
rr.comp_type = VAPI_SIGNALED;
rr.sg_lst_len = 1;
rr.sg_lst_p = &sg_entry;
sg_entry.lkey = r_mr_out.l_key;
sg_entry.len = p->bufflen;
sg_entry.addr = (VAPI_virt_addr_t)(MT_virt_addr_t)p->r_ptr;
ret = VAPI_post_rr(hca_hndl, qp_hndl, &rr);
if(ret != VAPI_OK) {
fprintf(stderr, "Error posting recv request: %s\n", VAPI_strerror(ret));
CleanUp(p);
exit(-1);
} else {
LOGPRINTF("Posted recv request\n");
}
/* Set receive flag to zero and request event completion
* notification for this receive so the event handler will
* be triggered when the receive completes.
*/
if( p->prot.comptype == NP_COMP_EVENT ) {
receive_complete = 0;
}
}
void SendData(ArgStruct *p)
{
VAPI_ret_t ret; /* Return code */
VAPI_sr_desc_t sr; /* Send request */
VAPI_sg_lst_entry_t sg_entry; /* Scatter/Gather list - holds buff addr */
/* Fill in send request struct */
if(p->prot.commtype == NP_COMM_SENDRECV) {
sr.opcode = VAPI_SEND;
LOGPRINTF("Doing regular send\n");
} else if(p->prot.commtype == NP_COMM_SENDRECV_WITH_IMM) {
sr.opcode = VAPI_SEND_WITH_IMM;
LOGPRINTF("Doing regular send with imm\n");
} else if(p->prot.commtype == NP_COMM_RDMAWRITE) {
sr.opcode = VAPI_RDMA_WRITE;
sr.remote_addr = (VAPI_virt_addr_t)(MT_virt_addr_t)(remote_address + (p->s_ptr - p->s_buff));
sr.r_key = remote_key;
LOGPRINTF("Doing RDMA write (raddr=%p)\n", sr.remote_addr);
} else if(p->prot.commtype == NP_COMM_RDMAWRITE_WITH_IMM) {
sr.opcode = VAPI_RDMA_WRITE_WITH_IMM;
sr.remote_addr = (VAPI_virt_addr_t)(MT_virt_addr_t)(remote_address + (p->s_ptr - p->s_buff));
sr.r_key = remote_key;
LOGPRINTF("Doing RDMA write with imm (raddr=%p)\n", sr.remote_addr);
} else {
fprintf(stderr, "Error, invalid communication type in SendData\n");
exit(-1);
}
sr.comp_type = VAPI_UNSIGNALED;
sr.set_se = FALSE; /* This needed due to a bug in Mellanox HW rel a-0 */
sr.sg_lst_len = 1;
sr.sg_lst_p = &sg_entry;
sg_entry.lkey = s_mr_out.l_key; /* Local memory region key */
sg_entry.len = p->bufflen;
sg_entry.addr = (VAPI_virt_addr_t)(MT_virt_addr_t)p->s_ptr;
ret = VAPI_post_sr(hca_hndl, qp_hndl, &sr);
if(ret != VAPI_OK) {
fprintf(stderr, "Error posting send request: %s\n", VAPI_strerror(ret));
} else {
LOGPRINTF("Posted send request\n");
}
}
void RecvData(ArgStruct *p)
{
VAPI_ret_t ret;
/* Busy wait for incoming data */
LOGPRINTF("Receiving at buffer address %p\n", p->r_ptr);
if( p->prot.comptype == NP_COMP_LOCALPOLL ) {
/* Poll for receive completion locally on the receive data */
LOGPRINTF("Waiting for last byte of data to arrive\n");
while(p->r_ptr[p->bufflen-1] != 'a' + (p->cache ? 1 - p->tr : 1) )
{
/* BUSY WAIT -- this should be fine since we
* declared r_ptr with volatile qualifier */
}
/* Reset last byte */
p->r_ptr[p->bufflen-1] = 'a' + (p->cache ? p->tr : 0);
LOGPRINTF("Received all of data\n");
} else if( p->prot.comptype == NP_COMP_VAPIPOLL ) {
/* Poll for receive completion using VAPI poll function */
LOGPRINTF("Polling completion queue for VAPI work completion\n");
ret = VAPI_CQ_EMPTY;
while(ret == VAPI_CQ_EMPTY)
ret = VAPI_poll_cq(hca_hndl, r_cq_hndl, &wc);
if(ret != VAPI_OK) {
fprintf(stderr, "Error in RecvData, polling for completion: %s\n",
VAPI_strerror(ret));
exit(-1);
}
if(wc.status != VAPI_SUCCESS) {
fprintf(stderr, "Error in status of returned completion: %s\n",
VAPI_wc_status_sym(wc.status));
exit(-1);
}
LOGPRINTF("Retrieved successful completion\n");
} else if( p->prot.comptype == NP_COMP_EVENT ) {
/* Instead of polling directly on data or VAPI completion queue,
* let the VAPI event completion handler set a flag when the receive
* completes, and poll on that instead. Could try using semaphore here
* as well to eliminate busy polling
*/
LOGPRINTF("Polling receive flag\n");
while( receive_complete == 0 )
{
/* BUSY WAIT */
}
/* If in prepost-burst mode, we won't be calling PrepareToReceive
* between ping-pongs, so we need to reset the receive_complete
* flag here.
*/
if( p->preburst ) receive_complete = 0;
LOGPRINTF("Receive completed\n");
}
}
/* Reset is used after a trial to empty the work request queues so we
have enough room for the next trial to run */
void Reset(ArgStruct *p)
{
VAPI_ret_t ret; /* Return code */
VAPI_sr_desc_t sr; /* Send request */
VAPI_rr_desc_t rr; /* Recv request */
/* If comptype is event, then we'll use event handler to detect receive,
* so initialize receive_complete flag
*/
if(p->prot.comptype == NP_COMP_EVENT) receive_complete = 0;
/* Prepost receive */
rr.opcode = VAPI_RECEIVE;
rr.comp_type = VAPI_SIGNALED;
rr.sg_lst_len = 0;
LOGPRINTF("Posting recv request in Reset\n");
ret = VAPI_post_rr(hca_hndl, qp_hndl, &rr);
if(ret != VAPI_OK) {
fprintf(stderr, " Error posting recv request: %s\n", VAPI_strerror(ret));
CleanUp(p);
exit(-1);
}
/* Make sure both nodes have preposted receives */
Sync(p);
/* Post Send */
sr.opcode = VAPI_SEND;
sr.comp_type = VAPI_SIGNALED;
sr.set_se = FALSE; /* This needed due to a bug in Mellanox HW rel a-0 */
sr.sg_lst_len = 0;
LOGPRINTF("Posting send request \n");
ret = VAPI_post_sr(hca_hndl, qp_hndl, &sr);
if(ret != VAPI_OK) {
fprintf(stderr, " Error posting send request in Reset: %s\n",
VAPI_strerror(ret));
exit(-1);
}
if(wc.status != VAPI_SUCCESS) {
fprintf(stderr, " Error in completion status: %s\n",
VAPI_wc_status_sym(wc.status));
exit(-1);
}
LOGPRINTF("Polling for completion of send request\n");
ret = VAPI_CQ_EMPTY;
while(ret == VAPI_CQ_EMPTY)
ret = VAPI_poll_cq(hca_hndl, s_cq_hndl, &wc);
if(ret != VAPI_OK) {
fprintf(stderr, "Error polling CQ for send in Reset: %s\n",
VAPI_strerror(ret));
exit(-1);
}
if(wc.status != VAPI_SUCCESS) {
fprintf(stderr, " Error in completion status: %s\n",
VAPI_wc_status_sym(wc.status));
exit(-1);
}
LOGPRINTF("Status of send completion: %s\n", VAPI_wc_status_sym(wc.status));
if(p->prot.comptype == NP_COMP_EVENT) {
/* If using event completion, the event handler will set receive_complete
* when it gets the completion event.
*/
LOGPRINTF("Waiting for receive_complete flag\n");
while(receive_complete == 0) { /* BUSY WAIT */ }
} else {
LOGPRINTF("Polling for completion of recv request\n");
ret = VAPI_CQ_EMPTY;
while(ret == VAPI_CQ_EMPTY)
ret = VAPI_poll_cq(hca_hndl, r_cq_hndl, &wc);
if(ret != VAPI_OK) {
fprintf(stderr, "Error polling CQ for recv in Reset: %s\n",
VAPI_strerror(ret));
exit(-1);
}
if(wc.status != VAPI_SUCCESS) {
fprintf(stderr, " Error in completion status: %s\n",
VAPI_wc_status_sym(wc.status));
exit(-1);
}
LOGPRINTF("Status of recv completion: %s\n", VAPI_wc_status_sym(wc.status));
}
LOGPRINTF("Done with reset\n");
}
void SendTime(ArgStruct *p, double *t)
{
uint32_t ltime, ntime;
/*
Multiply the number of seconds by 1e6 to get time in microseconds
and convert value to an unsigned 32-bit integer.
*/
ltime = (uint32_t)(*t * 1.e6);
/* Send time in network order */
ntime = htonl(ltime);
if (write(p->commfd, (char *)&ntime, sizeof(uint32_t)) < 0)
{
printf("NetPIPE: write failed in SendTime: errno=%d\n", errno);
exit(301);
}
}
void RecvTime(ArgStruct *p, double *t)
{
uint32_t ltime, ntime;
int bytesRead;
bytesRead = readFully(p->commfd, (void *)&ntime, sizeof(uint32_t));
if (bytesRead < 0)
{
printf("NetPIPE: read failed in RecvTime: errno=%d\n", errno);
exit(302);
}
else if (bytesRead != sizeof(uint32_t))
{
fprintf(stderr, "NetPIPE: partial read in RecvTime of %d bytes\n",
bytesRead);
exit(303);
}
ltime = ntohl(ntime);
/* Result is ltime (in microseconds) divided by 1.0e6 to get seconds */
*t = (double)ltime / 1.0e6;
}
void SendRepeat(ArgStruct *p, int rpt)
{
uint32_t lrpt, nrpt;
lrpt = rpt;
/* Send repeat count as a long in network order */
nrpt = htonl(lrpt);
if (write(p->commfd, (void *) &nrpt, sizeof(uint32_t)) < 0)
{
printf("NetPIPE: write failed in SendRepeat: errno=%d\n", errno);
exit(304);
}
}
void RecvRepeat(ArgStruct *p, int *rpt)
{
uint32_t lrpt, nrpt;
int bytesRead;
bytesRead = readFully(p->commfd, (void *)&nrpt, sizeof(uint32_t));
if (bytesRead < 0)
{
printf("NetPIPE: read failed in RecvRepeat: errno=%d\n", errno);
exit(305);
}
else if (bytesRead != sizeof(uint32_t))
{
fprintf(stderr, "NetPIPE: partial read in RecvRepeat of %d bytes\n",
bytesRead);
exit(306);
}
lrpt = ntohl(nrpt);
*rpt = lrpt;
}
void establish(ArgStruct *p)
{
int clen;
int one = 1;
struct protoent;
clen = sizeof(p->prot.sin2);
if(p->tr){
if(connect(p->commfd, (struct sockaddr *) &(p->prot.sin1),
sizeof(p->prot.sin1)) < 0){
printf("Client: Cannot Connect! errno=%d\n",errno);
exit(-10);
}
}
else {
/* SERVER */
listen(p->servicefd, 5);
p->commfd = accept(p->servicefd, (struct sockaddr *) &(p->prot.sin2),
&clen);
if(p->commfd < 0){
printf("Server: Accept Failed! errno=%d\n",errno);
exit(-12);
}
}
}
void CleanUp(ArgStruct *p)
{
char *quit="QUIT";
if (p->tr)
{
write(p->commfd,quit, 5);
read(p->commfd, quit, 5);
close(p->commfd);
}
else
{
read(p->commfd,quit, 5);
write(p->commfd,quit,5);
close(p->commfd);
close(p->servicefd);
}
finalizeIB(p);
}
void AfterAlignmentInit(ArgStruct *p)
{
int bytesRead;
/* Exchange buffer pointers and remote infiniband keys if doing rdma. Do
* the exchange in this function because this will happen after any
* memory alignment is done, which is important for getting the
* correct remote address.
*/
if( p->prot.commtype == NP_COMM_RDMAWRITE ||
p->prot.commtype == NP_COMM_RDMAWRITE_WITH_IMM ) {
/* Send my receive buffer address
*/
if(write(p->commfd, (void *)&p->r_buff, sizeof(void*)) < 0) {
perror("NetPIPE: write of buffer address failed in AfterAlignmentInit");
exit(-1);
}
LOGPRINTF("Sent buffer address: %p\n", p->r_buff);
/* Send my remote key for accessing
* my remote buffer via IB RDMA
*/
if(write(p->commfd, (void *)&r_mr_out.r_key, sizeof(VAPI_rkey_t)) < 0) {
perror("NetPIPE: write of remote key failed in AfterAlignmentInit");
exit(-1);
}
LOGPRINTF("Sent remote key: %d\n", r_mr_out.r_key);
/* Read the sent data
*/
bytesRead = readFully(p->commfd, (void *)&remote_address, sizeof(void*));
if (bytesRead < 0) {
perror("NetPIPE: read of buffer address failed in AfterAlignmentInit");
exit(-1);
} else if (bytesRead != sizeof(void*)) {
perror("NetPIPE: partial read of buffer address in AfterAlignmentInit");
exit(-1);
}
LOGPRINTF("Received remote address from other node: %p\n", remote_address);
bytesRead = readFully(p->commfd, (void *)&remote_key, sizeof(VAPI_rkey_t));
if (bytesRead < 0) {
perror("NetPIPE: read of remote key failed in AfterAlignmentInit");
exit(-1);
} else if (bytesRead != sizeof(VAPI_rkey_t)) {
perror("NetPIPE: partial read of remote key in AfterAlignmentInit");
exit(-1);
}
LOGPRINTF("Received remote key from other node: %d\n", remote_key);
}
}
void MyMalloc(ArgStruct *p, int bufflen, int soffset, int roffset)
{
VAPI_ret_t ret;
/* Allocate buffers */
p->r_buff = malloc(bufflen+MAX(soffset,roffset));
if(p->r_buff == NULL) {
fprintf(stderr, "Error malloc'ing buffer\n");
exit(-1);
}
if(p->cache) {
/* Infiniband spec says we can register same memory region
* more than once, so just copy buffer address. We will register
* the same buffer twice with Infiniband.
*/
p->s_buff = p->r_buff;
} else {
p->s_buff = malloc(bufflen+soffset);
if(p->s_buff == NULL) {
fprintf(stderr, "Error malloc'ing buffer\n");
exit(-1);
}
}
/* Register buffers with Infiniband */
mr_in.acl = VAPI_EN_LOCAL_WRITE | VAPI_EN_REMOTE_WRITE;
mr_in.l_key = 0;
mr_in.pd_hndl = pd_hndl;
mr_in.r_key = 0;
mr_in.size = bufflen+MAX(soffset,roffset);
mr_in.start = (VAPI_virt_addr_t)(MT_virt_addr_t)p->r_buff;
mr_in.type = VAPI_MR;
ret = VAPI_register_mr(hca_hndl, &mr_in, &r_mr_hndl, &r_mr_out);
if(ret != VAPI_OK)
{
fprintf(stderr, "Error registering recv buffer: %s\n", VAPI_strerror(ret));
exit(-1);
}
else
{
LOGPRINTF("Registered Recv Buffer\n");
}
mr_in.acl = VAPI_EN_LOCAL_WRITE;
mr_in.l_key = 0;
mr_in.pd_hndl = pd_hndl;
mr_in.r_key = 0;
mr_in.size = bufflen+soffset;
mr_in.start = (VAPI_virt_addr_t)(MT_virt_addr_t)p->s_buff;
mr_in.type = VAPI_MR;
ret = VAPI_register_mr(hca_hndl, &mr_in, &s_mr_hndl, &s_mr_out);
if(ret != VAPI_OK) {
fprintf(stderr, "Error registering send buffer: %s\n", VAPI_strerror(ret));
exit(-1);
} else {
LOGPRINTF("Registered Send Buffer\n");
}
}
void FreeBuff(char *buff1, char *buff2)
{
VAPI_ret_t ret;
if(s_mr_hndl != VAPI_INVAL_HNDL) {
LOGPRINTF("Deregistering send buffer\n");
ret = VAPI_deregister_mr(hca_hndl, s_mr_hndl);
if(ret != VAPI_OK) {
fprintf(stderr, "Error deregistering send mr: %s\n", VAPI_strerror(ret));
} else {
s_mr_hndl = VAPI_INVAL_HNDL;
}
}
if(r_mr_hndl != VAPI_INVAL_HNDL) {
LOGPRINTF("Deregistering recv buffer\n");
ret = VAPI_deregister_mr(hca_hndl, r_mr_hndl);
if(ret != VAPI_OK) {
fprintf(stderr, "Error deregistering recv mr: %s\n", VAPI_strerror(ret));
} else {
r_mr_hndl = VAPI_INVAL_HNDL;
}
}
if(buff1 != NULL)
free(buff1);
if(buff2 != NULL)
free(buff2);
}
syntax highlighted by Code2HTML, v. 0.9.1