in prov/gni/src/gnix_msg.c [1325:1782]
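/*
 * Build the GNI transaction descriptors needed to pull a rendezvous
 * IOV message from the sender: large, aligned chunks each get an RDMA
 * GET txd, while small chunks and unaligned head/tail bytes are folded
 * into FMA chained-transaction (CT) GETs.  On success the request is
 * re-queued with work_fn set to __gnix_rndzv_iov_req_post; descriptor
 * allocation failures free any partially built txds and return
 * -FI_ENOSPC so the request can be retried.
 */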
static int __gnix_rndzv_iov_req_build(void *arg)
{
int ret = FI_SUCCESS, send_idx, recv_idx, use_tx_cq_blk;
struct gnix_fab_req *req = (struct gnix_fab_req *)arg;
struct gnix_fid_ep *ep = req->gnix_ep;
struct gnix_nic *nic = ep->nic;
gni_ep_handle_t gni_ep = req->vc->gni_ep;
struct gnix_tx_descriptor *txd = NULL, *ct_txd = NULL;
size_t recv_len, send_len, get_len, send_cnt, recv_cnt, txd_cnt, ht_len;
uint64_t recv_ptr = 0UL, send_ptr = 0UL;
/* TODO: Should this be the sender's rndzv thresh instead? */
size_t rndzv_thresh = ep->domain->params.msg_rendezvous_thresh;
gni_ct_get_post_descriptor_t *cur_ct = NULL;
void **next_ct = NULL;
int head_off, head_len, tail_len;
GNIX_DBG_TRACE(FI_LOG_EP_DATA, "\n");
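/* A VC with XPMEM mappings can copy straight out of the sender's
 * cross-mapped memory, so no GET descriptors are needed. */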
if (req->vc->modes & GNIX_VC_MODE_XPMEM)
return __gnix_rndzv_req_xpmem(req);
txd_cnt = 0;
send_cnt = req->msg.send_iov_cnt;
recv_ptr = req->msg.recv_info[0].recv_addr;
recv_len = req->msg.recv_info[0].recv_len;
recv_cnt = req->msg.recv_iov_cnt;
send_ptr = req->msg.send_info[0].send_addr;
send_len = req->msg.send_info[0].send_len;
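/* Under FI_PROGRESS_AUTO, post to the blocking TX CQ so completions
 * can be reaped by the progress thread. */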
use_tx_cq_blk = (ep->domain->data_progress == FI_PROGRESS_AUTO);
GNIX_DEBUG(FI_LOG_EP_DATA, "send_cnt = %lu, recv_cnt = %lu\n",
send_cnt, recv_cnt);
/* Ensure the user's recv buffer is registered for recv/recvv */
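/* Buffers registered on the application's behalf are flagged
 * FI_LOCAL_MR below so the registration can be released when the
 * request completes. */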
if (!req->msg.recv_md[0]) {
struct fid_mr *auto_mr;
for (recv_idx = 0; recv_idx < recv_cnt; recv_idx++) {
auto_mr = NULL;
ret = _gnix_mr_reg(&ep->domain->domain_fid.fid,
(void *)
req->msg.recv_info[recv_idx].recv_addr,
req->msg.recv_info[recv_idx].recv_len,
FI_READ | FI_WRITE, 0, 0, 0,
&auto_mr, NULL, ep->auth_key, GNIX_PROV_REG);
if (ret != FI_SUCCESS) {
GNIX_DEBUG(FI_LOG_EP_DATA,
"Failed to auto-register"
" local buffer: %s\n",
fi_strerror(-ret));
for (recv_idx--; recv_idx >= 0; recv_idx--)
fi_close(&req->msg.recv_md[recv_idx]->mr_fid.fid);
return ret;
}
req->msg.recv_md[recv_idx] = container_of(
(void *) auto_mr,
struct gnix_fid_mem_desc,
mr_fid);
req->msg.recv_info[recv_idx].mem_hndl =
req->msg.recv_md[recv_idx]->mem_hndl;
GNIX_DEBUG(FI_LOG_EP_DATA, "auto-reg MR: %p\n",
req->msg.recv_md[recv_idx]);
}
req->msg.recv_flags |= FI_LOCAL_MR;
}
recv_idx = send_idx = 0;
/* Iterate through the buffers and build the Fma and Rdma requests! */
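/* Each pass transfers get_len = MIN(recv_len, send_len) bytes, then
 * advances whichever IOV (send, recv, or both) was drained. */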
while (send_idx < send_cnt) {
get_len = MIN(recv_len, send_len);
/* Begin alignment checks
*
* Each "mid-head" and "mid-tail" (resulting from the send pointer
* and length being adjusted to match the smaller posted recv buf in
* this loop) will be added to one or more chained transactions
* below.
*
* The original heads and tails (sent across in the control
* message) must be accounted for below in order to GET the
* correct, now four byte aligned, "body" section of the
* message.
*/
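/* Worked example, assuming GNI_READ_ALIGN == 4 (mask 0x3): a send_ptr
 * ending in ...2 gives head_off = 2 and head_len = 2; a
 * (send_ptr + get_len) ending in ...1 gives tail_len = 1.  The GET is
 * then shrunk to the aligned body, and the stray head/tail bytes are
 * fetched separately into req->int_tx_buf to be copied into the user
 * buffer when the request completes. */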
if (send_ptr & GNI_READ_ALIGN_MASK ||
(send_ptr + get_len) & GNI_READ_ALIGN_MASK) {
if (req->int_tx_buf_e == NULL) {
req->int_tx_buf_e = _gnix_ep_get_int_tx_buf(ep);
/* There are no available int_tx bufs */
if (req->int_tx_buf_e == NULL) {
ofi_atomic_set32(&req->msg.outstanding_txds, 0);
req->work_fn = __gnix_rndzv_iov_req_post;
return _gnix_vc_queue_work_req(req);
}
req->int_tx_buf = ((struct gnix_int_tx_buf *)
req->int_tx_buf_e)->buf;
req->int_tx_mdh = _gnix_ep_get_int_tx_mdh(
req->int_tx_buf_e);
GNIX_DEBUG(FI_LOG_EP_DATA,
"req->int_tx_buf = %p\n", req->int_tx_buf);
}
head_off = send_ptr & GNI_READ_ALIGN_MASK;
head_len = head_off ? GNI_READ_ALIGN - head_off : 0;
tail_len = (send_ptr + get_len) & GNI_READ_ALIGN_MASK;
ht_len = (size_t) (head_len + tail_len);
/* TODO: handle this. */
if (ht_len > recv_len) {
GNIX_FATAL(FI_LOG_EP_DATA, "The head/tail data "
"length exceeds the matching receive "
"buffer length.\n");
}
/* found a mid-head? (see "Begin alignment" comment block) */
req->msg.recv_info[recv_idx].head_len = (send_ptr != req->msg.send_info[send_idx].send_addr) ?
head_len : 0;
/* found a mid-tail? (see "Begin alignment" comment block) */
req->msg.recv_info[recv_idx].tail_len = (send_len > recv_len) ?
tail_len : 0;
/* Update the local and remote addresses */
get_len -= ht_len;
send_len -= ht_len;
recv_len -= ht_len;
send_ptr += head_len;
recv_ptr += head_len;
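/* Unaligned fragments ride on an FMA chained transaction: the first
 * fragment initializes ct_txd, and each subsequent fragment is a
 * malloc'd gni_ct_get_post_descriptor_t linked through next_descr. */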
/* Add to existing ct */
if (ct_txd) {
if (req->msg.recv_info[recv_idx].tail_len) {
cur_ct = *next_ct = malloc(sizeof(gni_ct_get_post_descriptor_t));
if (cur_ct == NULL) {
GNIX_DEBUG(FI_LOG_EP_DATA,
"Failed to allocate "
"gni FMA get chained "
"descriptor.");
/* +1 to ensure we free the
* current chained txd */
__gnix_msg_free_iov_txds(req, txd_cnt + 1);
return -FI_ENOSPC;
}
cur_ct->ep_hndl = gni_ep;
cur_ct->remote_mem_hndl = req->msg.send_info[send_idx].mem_hndl;
cur_ct->local_mem_hndl = req->int_tx_mdh;
cur_ct->length = GNI_READ_ALIGN;
cur_ct->remote_addr = (send_ptr + get_len + tail_len) & ~GNI_READ_ALIGN_MASK;
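/* Staging layout in int_tx_buf: tail bytes for recv buffer recv_idx
 * land in slot recv_idx, head bytes in slot
 * recv_idx + GNIX_MAX_MSG_IOV_LIMIT (one GNI_READ_ALIGN-sized slot
 * each). */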
cur_ct->local_addr = (uint64_t) (((uint8_t *) req->int_tx_buf) + (GNI_READ_ALIGN * recv_idx));
next_ct = &cur_ct->next_descr;
}
if (req->msg.recv_info[recv_idx].head_len) {
cur_ct = *next_ct = malloc(sizeof(gni_ct_get_post_descriptor_t));
if (cur_ct == NULL) {
GNIX_DEBUG(FI_LOG_EP_DATA,
"Failed to allocate "
"gni FMA get chained "
"descriptor.");
/* +1 to ensure we free the
* current chained txd */
__gnix_msg_free_iov_txds(req, txd_cnt + 1);
return -FI_ENOSPC;
}
cur_ct->ep_hndl = gni_ep;
cur_ct->remote_mem_hndl = req->msg.send_info[send_idx].mem_hndl;
cur_ct->local_mem_hndl = req->int_tx_mdh;
cur_ct->length = GNI_READ_ALIGN;
cur_ct->remote_addr = send_ptr - GNI_READ_ALIGN;
cur_ct->local_addr = (uint64_t) (((uint8_t *) req->int_tx_buf) +
(GNI_READ_ALIGN * (recv_idx + GNIX_MAX_MSG_IOV_LIMIT)));
next_ct = &cur_ct->next_descr;
}
} else { /* Start a new ct */
if (req->msg.recv_info[recv_idx].tail_len) {
GNIX_DEBUG(FI_LOG_EP_DATA, "New FMA CT\n");
ret = _gnix_nic_tx_alloc(nic, &ct_txd);
if (ret != FI_SUCCESS) {
/* We'll try again. */
GNIX_DEBUG(FI_LOG_EP_DATA,
"_gnix_nic_tx_alloc()"
" returned %s\n",
fi_strerror(-ret));
__gnix_msg_free_iov_txds(req, txd_cnt);
return -FI_ENOSPC;
}
ct_txd->completer_fn = __gnix_rndzv_iov_req_complete;
ct_txd->req = req;
ct_txd->gni_desc.type = GNI_POST_FMA_GET;
ct_txd->gni_desc.cq_mode = GNI_CQMODE_GLOBAL_EVENT;
ct_txd->gni_desc.dlvr_mode = GNI_DLVMODE_PERFORMANCE;
ct_txd->gni_desc.rdma_mode = 0;
ct_txd->gni_desc.src_cq_hndl = (use_tx_cq_blk) ? nic->tx_cq_blk : nic->tx_cq;
ct_txd->gni_desc.remote_addr = (send_ptr + get_len + tail_len) & ~GNI_READ_ALIGN_MASK;
ct_txd->gni_desc.remote_mem_hndl = req->msg.send_info[send_idx].mem_hndl;
ct_txd->gni_desc.local_addr = (uint64_t) ((uint8_t *) req->int_tx_buf + (GNI_READ_ALIGN * recv_idx));
ct_txd->gni_desc.local_mem_hndl = req->int_tx_mdh;
ct_txd->gni_desc.length = GNI_READ_ALIGN;
next_ct = &ct_txd->gni_desc.next_descr;
}
if (req->msg.recv_info[recv_idx].head_len) {
if (req->msg.recv_info[recv_idx].tail_len) { /* Existing FMA CT */
cur_ct = *next_ct = malloc(sizeof(gni_ct_get_post_descriptor_t));
if (cur_ct == NULL) {
GNIX_DEBUG(FI_LOG_EP_DATA,
"Failed to allocate "
"gni FMA get chained "
"descriptor.");
/* +1 to ensure we free the
* current chained txd */
__gnix_msg_free_iov_txds(req, txd_cnt + 1);
return -FI_ENOSPC;
}
cur_ct->ep_hndl = gni_ep;
cur_ct->remote_mem_hndl = req->msg.send_info[send_idx].mem_hndl;
cur_ct->local_mem_hndl = req->int_tx_mdh;
cur_ct->length = GNI_READ_ALIGN;
cur_ct->remote_addr = send_ptr - GNI_READ_ALIGN;
cur_ct->local_addr = (uint64_t) (((uint8_t *) req->int_tx_buf) +
(GNI_READ_ALIGN * (recv_idx + GNIX_MAX_MSG_IOV_LIMIT)));
next_ct = &cur_ct->next_descr;
} else { /* New FMA ct */
GNIX_DEBUG(FI_LOG_EP_DATA, "New FMA CT\n");
ret = _gnix_nic_tx_alloc(nic, &ct_txd);
if (ret != FI_SUCCESS) {
/* We'll try again. */
GNIX_DEBUG(FI_LOG_EP_DATA,
"_gnix_nic_tx_alloc()"
" returned %s\n",
fi_strerror(-ret));
__gnix_msg_free_iov_txds(req, txd_cnt);
return -FI_ENOSPC;
}
ct_txd->completer_fn = __gnix_rndzv_iov_req_complete;
ct_txd->req = req;
ct_txd->gni_desc.type = GNI_POST_FMA_GET;
ct_txd->gni_desc.cq_mode = GNI_CQMODE_GLOBAL_EVENT;
ct_txd->gni_desc.dlvr_mode = GNI_DLVMODE_PERFORMANCE;
ct_txd->gni_desc.rdma_mode = 0;
ct_txd->gni_desc.src_cq_hndl = (use_tx_cq_blk) ? nic->tx_cq_blk : nic->tx_cq;
ct_txd->gni_desc.remote_addr = send_ptr - GNI_READ_ALIGN;
ct_txd->gni_desc.remote_mem_hndl = req->msg.send_info[send_idx].mem_hndl;
ct_txd->gni_desc.local_addr = (uint64_t) ((uint8_t *) req->int_tx_buf +
(GNI_READ_ALIGN * (recv_idx + GNIX_MAX_MSG_IOV_LIMIT)));
ct_txd->gni_desc.local_mem_hndl = req->int_tx_mdh;
ct_txd->gni_desc.length = GNI_READ_ALIGN;
next_ct = &ct_txd->gni_desc.next_descr;
}
}
}
} else { /* no head/tail found */
head_len = tail_len = 0;
req->msg.recv_info[recv_idx].head_len = req->msg.recv_info[recv_idx].tail_len = 0;
}
/* End alignment checks */
GNIX_DEBUG(FI_LOG_EP_DATA, "send_info[%d].send_len = %lu,"
" recv_len = %lu, get_len = %lu, head_len = %d,"
" tail_len = %d, req->msg.recv_info[%d].tail_len = %u\n"
"req->msg.recv_info[%d].head_len = %u, "
"recv_ptr(head) = %p, recv_ptr(tail) = %p\n", send_idx,
send_len, recv_len, get_len, head_len, tail_len, recv_idx,
req->msg.recv_info[recv_idx].tail_len, recv_idx,
req->msg.recv_info[recv_idx].head_len, (void *) (recv_ptr - head_len),
(void *) (recv_ptr + get_len));
GNIX_DEBUG(FI_LOG_EP_DATA, "txd = %p, send_ptr = %p, "
"send_ptr + get_len = %p, recv_ptr = %p\n",
txd, (void *) send_ptr, (void *) (send_ptr + get_len),
(void *) recv_ptr);
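/* Protocol selection by size: chunks at or above the rendezvous
 * threshold get a dedicated RDMA GET txd; smaller chunks are folded
 * into the FMA chained transaction. */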
if (get_len >= rndzv_thresh) { /* Build the rdma txd */
ret = _gnix_nic_tx_alloc(nic, &txd);
if (ret != FI_SUCCESS) {
/* We'll try again. */
GNIX_DEBUG(FI_LOG_EP_DATA, "_gnix_nic_tx_alloc()"
" returned %s\n",
fi_strerror(-ret));
__gnix_msg_free_iov_txds(req, txd_cnt);
return -FI_ENOSPC;
}
txd->completer_fn = __gnix_rndzv_iov_req_complete;
txd->req = req;
txd->gni_desc.type = GNI_POST_RDMA_GET;
txd->gni_desc.cq_mode = GNI_CQMODE_GLOBAL_EVENT;
txd->gni_desc.dlvr_mode = GNI_DLVMODE_PERFORMANCE;
txd->gni_desc.local_mem_hndl = req->msg.recv_info[recv_idx].mem_hndl;
txd->gni_desc.remote_mem_hndl = req->msg.send_info[send_idx].mem_hndl;
txd->gni_desc.rdma_mode = 0;
txd->gni_desc.src_cq_hndl = (use_tx_cq_blk) ? nic->tx_cq_blk : nic->tx_cq;
txd->gni_desc.local_addr = recv_ptr;
txd->gni_desc.remote_addr = send_ptr;
txd->gni_desc.length = get_len;
req->iov_txds[txd_cnt++] = txd;
txd = NULL;
} else if (get_len) { /* Build the Ct txd */
if (!ct_txd) {
GNIX_DEBUG(FI_LOG_EP_DATA, "New FMA CT\n");
ret = _gnix_nic_tx_alloc(nic, &ct_txd);
if (ret != FI_SUCCESS) {
/* We'll try again. */
GNIX_DEBUG(FI_LOG_EP_DATA,
"_gnix_nic_tx_alloc()"
" returned %s\n",
fi_strerror(-ret));
__gnix_msg_free_iov_txds(req, txd_cnt);
return -FI_ENOSPC;
}
ct_txd->completer_fn = __gnix_rndzv_iov_req_complete;
ct_txd->req = req;
ct_txd->gni_desc.type = GNI_POST_FMA_GET;
ct_txd->gni_desc.cq_mode = GNI_CQMODE_GLOBAL_EVENT;
ct_txd->gni_desc.dlvr_mode = GNI_DLVMODE_PERFORMANCE;
ct_txd->gni_desc.local_mem_hndl = req->msg.recv_info[recv_idx].mem_hndl;
ct_txd->gni_desc.remote_mem_hndl = req->msg.send_info[send_idx].mem_hndl;
ct_txd->gni_desc.rdma_mode = 0;
ct_txd->gni_desc.src_cq_hndl = (use_tx_cq_blk) ? nic->tx_cq_blk : nic->tx_cq;
ct_txd->gni_desc.local_addr = recv_ptr;
ct_txd->gni_desc.remote_addr = send_ptr;
ct_txd->gni_desc.length = get_len;
next_ct = &ct_txd->gni_desc.next_descr;
} else {
cur_ct = *next_ct = malloc(sizeof(gni_ct_get_post_descriptor_t));
if (cur_ct == NULL) {
GNIX_DEBUG(FI_LOG_EP_DATA,
"Failed to allocate "
"gni FMA get chained "
"descriptor.");
/* +1 to ensure we free the
* current chained txd */
__gnix_msg_free_iov_txds(req, txd_cnt + 1);
return -FI_ENOSPC;
}
cur_ct->ep_hndl = gni_ep;
cur_ct->length = get_len;
cur_ct->remote_addr = send_ptr;
cur_ct->remote_mem_hndl = req->msg.send_info[send_idx].mem_hndl;
cur_ct->local_addr = (uint64_t) recv_ptr;
cur_ct->local_mem_hndl = req->msg.recv_info[recv_idx].mem_hndl;
next_ct = &cur_ct->next_descr;
}
}
/* Update the recv len */
recv_len -= get_len;
/* We have exhausted the current recv (and possibly send)
* buffer */
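/* The advances below add tail_len so the next GET skips the unaligned
 * tail bytes, which travel separately through int_tx_buf. */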
if (recv_len == 0) {
recv_idx++;
/* We cannot receive any more. */
if (recv_idx == recv_cnt)
break;
recv_ptr = req->msg.recv_info[recv_idx].recv_addr;
recv_len = req->msg.recv_info[recv_idx].recv_len;
/* Also exhausted send buffer */
if (get_len == send_len) {
send_idx++;
/* Guard: don't read past the last send IOV entry. */
if (send_idx == send_cnt)
break;
send_ptr = req->msg.send_info[send_idx].send_addr;
send_len = req->msg.send_info[send_idx].send_len;
} else {
send_ptr += (get_len + tail_len);
send_len -= get_len;
}
} else { /* Just exhausted current send buffer. */
send_idx++;
/* Guard: don't read past the last send IOV entry. */
if (send_idx == send_cnt)
break;
send_ptr = req->msg.send_info[send_idx].send_addr;
send_len = req->msg.send_info[send_idx].send_len;
recv_ptr += (get_len + tail_len);
}
GNIX_DEBUG(FI_LOG_EP_DATA, "send_idx = %d, recv_idx = %d\n",
send_idx, recv_idx);
}
/*
* If we ran out of buffer space on the sender's/receiver's side in the
* middle of building the ct, we must terminate and add that ct to the
* queue. Note that if the last txd built was an RDMA txd then the txd
* will have been queued and txd will have a NULL value.
*/
if (ct_txd) {
*next_ct = NULL;
req->iov_txds[txd_cnt++] = ct_txd;
}
GNIX_DEBUG(FI_LOG_EP_DATA, "txd_cnt = %lu\n", txd_cnt);
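/* The completer decrements outstanding_txds as each GET finishes; the
 * request is completed once the count drops to zero. */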
ofi_atomic_set32(&req->msg.outstanding_txds, txd_cnt);
/* All the txd's are built, update the work_fn */
req->work_fn = __gnix_rndzv_iov_req_post;
/* Put this request back on work Q.
* TODO: Should we put it at the beginning of the work Q? */
ret = _gnix_vc_queue_work_req(req);
return ret;
}