in prov/gni/src/gnix_msg.c [2627:2849]
ssize_t _gnix_recv(struct gnix_fid_ep *ep, uint64_t buf, size_t len,
void *mdesc, uint64_t src_addr, void *context,
uint64_t flags, uint64_t tag, uint64_t ignore,
struct gnix_fab_req *mrecv_req)
{
int ret = FI_SUCCESS;
struct gnix_fab_req *req = NULL;
struct gnix_address gnix_addr;
struct gnix_tag_storage *posted_queue = NULL;
struct gnix_tag_storage *unexp_queue = NULL;
uint64_t match_flags;
struct gnix_fid_mem_desc *md = NULL;
int tagged = !!(flags & FI_TAGGED);
if (!ep->recv_cq && !ep->recv_cntr) {
return -FI_ENOCQ;
}
if (!tagged) {
if (!ep->ep_ops.msg_recv_allowed)
return -FI_EOPNOTSUPP;
} else {
if (!ep->ep_ops.tagged_recv_allowed)
return -FI_EOPNOTSUPP;
}
ret = __gnix_msg_addr_lookup(ep, src_addr, &gnix_addr);
if (ret != FI_SUCCESS)
return ret;
match_flags = flags & (FI_CLAIM | FI_DISCARD | FI_PEEK);
__gnix_msg_queues(ep, tagged, &posted_queue, &unexp_queue);
GNIX_DEBUG(FI_LOG_EP_DATA, "posted_queue = %p\n", posted_queue);
if (!tagged) {
tag = 0;
ignore = ~0;
}
COND_ACQUIRE(ep->requires_lock, &ep->vc_lock);
/* Look for a matching unexpected receive request. */
req = _gnix_match_tag(unexp_queue, tag, ignore,
match_flags, context, &gnix_addr);
if (req) {
/*
* if we posted a multi-recv buffer and we can't
* hold the matched message, stop dequeuing and
* return.
*/
if (OFI_UNLIKELY(mrecv_req != NULL)) {
mrecv_req->msg.mrecv_space_left -=
req->msg.cum_send_len;
mrecv_req->msg.mrecv_buf_addr +=
req->msg.cum_send_len;
req->msg.parent = mrecv_req;
_gnix_ref_get(mrecv_req);
}
/* Found matching request, populate local fields. */
req->gnix_ep = ep;
req->user_context = context;
req->msg.recv_info[0].recv_addr = (uint64_t)buf;
req->msg.recv_info[0].recv_len = len;
req->msg.cum_recv_len = len;
if (mdesc) {
md = container_of(mdesc,
struct gnix_fid_mem_desc,
mr_fid);
req->msg.recv_info[0].mem_hndl = md->mem_hndl;
}
req->msg.recv_md[0] = md;
req->msg.recv_iov_cnt = 1;
req->msg.recv_flags = flags;
req->msg.ignore = ignore;
if ((flags & GNIX_SUPPRESS_COMPLETION) ||
(ep->recv_selective_completion &&
!(flags & FI_COMPLETION))) {
req->msg.recv_flags &= ~FI_COMPLETION;
} else {
req->msg.recv_flags |= FI_COMPLETION;
}
/* Check to see if we are using P/C/D matching flags. */
if (match_flags & FI_DISCARD) {
ret = __gnix_discard_request(req);
goto pdc_exit;
} else if (match_flags & FI_PEEK) {
ret = __gnix_peek_request(req);
goto pdc_exit;
}
if (req->msg.send_flags & GNIX_MSG_RENDEZVOUS) {
/* Matched rendezvous request. Start data movement. */
GNIX_DEBUG(FI_LOG_EP_DATA, "matched RNDZV, req: %p\n",
req);
/*
* this shouldn't happen
*/
if (OFI_UNLIKELY(req->vc == NULL)) {
GNIX_ERR(FI_LOG_EP_DATA,
"fab req vc field NULL");
}
/* Check if second GET for unaligned data is needed. */
if (req->msg.send_info[0].send_len > req->msg.recv_info[0].recv_len &&
((req->msg.send_info[0].send_addr + req->msg.recv_info[0].recv_len) &
GNI_READ_ALIGN_MASK)) {
req->msg.recv_flags |= GNIX_MSG_GET_TAIL;
}
/* Initiate pull of source data. */
req->work_fn = req->msg.send_iov_cnt == 1 ?
__gnix_rndzv_req : __gnix_rndzv_iov_req_build;
_gnix_remove_tag(unexp_queue, req);
ret = _gnix_vc_queue_work_req(req);
} else {
/* Matched eager request. Copy data and generate
* completions. */
GNIX_DEBUG(FI_LOG_EP_DATA, "Matched recv, req: %p\n",
req);
req->msg.cum_send_len = req->msg.send_info[0].send_len;
/* Send length is truncated to receive buffer size. */
req->msg.send_info[0].send_len =
MIN(req->msg.send_info[0].send_len,
req->msg.recv_info[0].recv_len);
/* Copy data from unexpected eager receive buffer. */
memcpy((void *)buf, (void *)req->msg.send_info[0].send_addr,
req->msg.send_info[0].send_len);
free((void *)req->msg.send_info[0].send_addr);
_gnix_remove_tag(unexp_queue, req);
__gnix_msg_recv_completion(ep, req);
_gnix_fr_free(ep, req);
}
} else {
/*
* if handling a multi receive request,
* just return
*/
if (mrecv_req)
goto mrecv_exit;
/* if peek/claim/discard, we didn't find what we
* were looking for, return FI_ENOMSG
*/
if (match_flags) {
__recv_err(ep, context, flags, len,
(void *)buf, 0, tag, len, FI_ENOMSG,
FI_ENOMSG, NULL, 0);
/* if handling trecvmsg flags, return here
* Never post a receive request from this type of context
*/
ret = FI_SUCCESS;
goto pdc_exit;
}
req = _gnix_fr_alloc(ep);
if (req == NULL) {
ret = -FI_EAGAIN;
goto err;
}
GNIX_DEBUG(FI_LOG_EP_DATA, "New recv, req: %p\n", req);
req->type = GNIX_FAB_RQ_RECV;
req->addr = gnix_addr;
req->gnix_ep = ep;
req->user_context = context;
req->msg.recv_info[0].recv_addr = (uint64_t)buf;
req->msg.recv_info[0].recv_len = len;
req->msg.cum_recv_len = len;
if (mdesc) {
md = container_of(mdesc,
struct gnix_fid_mem_desc,
mr_fid);
req->msg.recv_info[0].mem_hndl = md->mem_hndl;
}
req->msg.recv_md[0] = md;
req->msg.send_iov_cnt = req->msg.recv_iov_cnt = 1;
req->msg.recv_flags = flags;
req->msg.tag = tag;
req->msg.ignore = ignore;
req->msg.parent = NULL;
ofi_atomic_initialize32(&req->msg.outstanding_txds, 0);
if ((flags & GNIX_SUPPRESS_COMPLETION) ||
(ep->recv_selective_completion &&
!(flags & FI_COMPLETION))) {
req->msg.recv_flags &= ~FI_COMPLETION;
} else {
req->msg.recv_flags |= FI_COMPLETION;
}
_gnix_insert_tag(posted_queue, tag, req, ignore);
}
mrecv_exit:
pdc_exit:
err:
COND_RELEASE(ep->requires_lock, &ep->vc_lock);
return ret;
}