in prov/gni/src/gnix_msg.c [3303:3560]
/**
 * Post a (possibly tagged) receive whose destination is described by an iovec.
 *
 * The unexpected queue is searched first.  When a matching request is found
 * it is either handed to the discard/peek helpers (FI_DISCARD / FI_PEEK),
 * queued as rendezvous work, or - for an eager message - unpacked into the
 * iovec and completed right away.  When nothing matches and none of
 * FI_PEEK/FI_CLAIM/FI_DISCARD was given, a new request is allocated and
 * inserted into the posted queue.
 *
 * @param ep        endpoint the receive is posted on
 * @param iov       array of receive buffers
 * @param desc      per-iov memory descriptors; when NULL the buffers are
 *                  auto-registered on the non-XPMEM rendezvous path
 * @param count     number of entries in iov (and in desc, when non-NULL)
 * @param src_addr  source address, resolved via __gnix_msg_addr_lookup
 * @param context   user context stored in the request
 * @param flags     operation flags; FI_TAGGED, FI_PEEK, FI_CLAIM, FI_DISCARD,
 *                  FI_COMPLETION and GNIX_SUPPRESS_COMPLETION are inspected
 * @param ignore    tag ignore mask (forced to all ones for untagged receives)
 * @param tag       tag to match (forced to 0 for untagged receives)
 *
 * @return FI_SUCCESS on success; -FI_ENOCQ when the endpoint has neither a
 *         receive CQ nor a receive counter; -FI_EOPNOTSUPP when the
 *         corresponding ep_ops flag is not set; -FI_EINVAL when a desc entry
 *         is NULL on the rendezvous path; -FI_EAGAIN when no request could be
 *         allocated; otherwise the value returned by the failing helper.
 *
 * NOTE(review): count is not range-checked here; presumably callers bound it
 * to the capacity of req->msg.recv_info / req->msg.recv_md - confirm.
 */
ssize_t _gnix_recvv(struct gnix_fid_ep *ep, const struct iovec *iov,
		    void **desc, size_t count, uint64_t src_addr, void *context,
		    uint64_t flags, uint64_t ignore, uint64_t tag)
{
	size_t i;
	int ret = FI_SUCCESS;
	size_t cum_len = 0;
	struct gnix_fab_req *req = NULL;
	struct gnix_address gnix_addr;
	struct gnix_tag_storage *posted_queue = NULL;
	struct gnix_tag_storage *unexp_queue = NULL;
	uint64_t match_flags;
	uint64_t recv_flags;
	int tagged = flags & FI_TAGGED;

	if (!ep->recv_cq && !ep->recv_cntr) {
		return -FI_ENOCQ;
	}

	if (!tagged) {
		if (!ep->ep_ops.msg_send_allowed)
			return -FI_EOPNOTSUPP;

		/* Untagged receives match any tag. */
		tag = 0;
		ignore = ~(uint64_t)0;
	} else {
		if (!ep->ep_ops.tagged_send_allowed)
			return -FI_EOPNOTSUPP;
	}

	match_flags = flags & (FI_CLAIM | FI_DISCARD | FI_PEEK);

	/*
	 * Lookup the gni addr in the av_table or av_hashtable.
	 * If the gni addr doesn't exist the addr is FI_ADDR_UNSPEC,
	 * meaning this remote node wants to receive from all senders?
	 */
	ret = __gnix_msg_addr_lookup(ep, src_addr, &gnix_addr);
	if (ret != FI_SUCCESS)
		return ret;

	/* calculate cumulative size of the iovec buf lens */
	for (i = 0; i < count; i++) {
		cum_len += iov[i].iov_len;
	}

	/*
	 * Initialize the tag storage objects.
	 * The posted_queue holds information about receives that have
	 * been posted on the remote endpoint.
	 *
	 * The unexp_queue holds information about data that has arrived
	 * prior to posting a receive on the remote endpoint.
	 *
	 * Both {unexp,posted}_queue objects have two sets, one for tagged
	 * messages and the other for untagged messages.
	 *
	 * The untagged queues match based off the source address.
	 *
	 * The tagged queues match based off the tag and source address (when
	 * the ep is created with FI_DIRECTED_RECV).
	 *
	 * A "message" is added to the unexpected queue when it arrives at a
	 * remote endpoint and the completer_fn doesn't find an existing request
	 * in the posted queue (i.e. no fi_recvs have been called (or posted)
	 * on the remote endpoint).
	 */
	__gnix_msg_queues(ep, tagged, &posted_queue, &unexp_queue);

	COND_ACQUIRE(ep->requires_lock, &ep->vc_lock);

	/*
	 * Receive flags stored in the request: the caller's flags with
	 * FI_COMPLETION cleared when the completion is suppressed (explicitly,
	 * or by selective completion without FI_COMPLETION) and set otherwise.
	 * Both the unexpected and the expected path store the same value.
	 */
	recv_flags = flags;
	if ((flags & GNIX_SUPPRESS_COMPLETION) ||
	    (ep->recv_selective_completion &&
	    !(flags & FI_COMPLETION))) {
		recv_flags &= ~FI_COMPLETION;
	} else {
		recv_flags |= FI_COMPLETION;
	}

	/*
	 * Posting a recv, look for an existing request in the
	 * unexpected queue.
	 */
	req = _gnix_match_tag(unexp_queue, tag, ignore,
			      match_flags, context, &gnix_addr);

	if (req) {
		GNIX_DEBUG(FI_LOG_EP_DATA, "UNEXPECTED, req = %p\n", req);
		/* Found a matching request in the unexpected queue. */

		/*
		 * reset ep, it might be different than the ep the message came
		 * in on.
		 */
		req->gnix_ep = ep;
		req->user_context = context;
		req->flags = 0;
		req->msg.recv_flags = recv_flags;
		req->msg.recv_iov_cnt = count;
		req->msg.cum_recv_len = cum_len;

		if (tagged) {
			req->type = GNIX_FAB_RQ_TRECVV;
		} else {
			req->type = GNIX_FAB_RQ_RECVV;
		}

		/* Check to see if we are using P/C/D matching flags. */
		if (match_flags & FI_DISCARD) {
			ret = __gnix_discard_request(req);
			goto out_unlock;
		} else if (match_flags & FI_PEEK) {
			ret = __gnix_peek_request(req);
			goto out_unlock;
		}

		for (i = 0; i < count; i++) {
			req->msg.recv_info[i].recv_addr = (uint64_t) iov[i].iov_base;
			req->msg.recv_info[i].recv_len = iov[i].iov_len;
		}

		if (req->msg.send_flags & GNIX_MSG_RENDEZVOUS) {
			req->work_fn = __gnix_rndzv_iov_req_build;

			/*
			 * NOTE(review): the error exits below leave the matched
			 * request neither completed nor re-queued in this
			 * function - confirm whether _gnix_match_tag already
			 * removed it from the unexpected queue, in which case
			 * it would be lost.
			 */
			if (!(req->vc->modes & GNIX_VC_MODE_XPMEM)) {
				if (!desc) {
					ret = __gnix_msg_register_iov(ep,
								      iov,
								      count,
								      req->msg.recv_md);
					if (ret != FI_SUCCESS) {
						GNIX_DEBUG(FI_LOG_EP_DATA,
							   "Failed to auto-register local buffer: %s\n",
							   fi_strerror(-ret));
						goto out_unlock;
					}
					/*
					 * NOTE(review): the auto-registration
					 * marker is recorded in send_flags even
					 * though it describes receive-side
					 * MRs - confirm the completion path
					 * tests the same field.
					 */
					req->msg.send_flags |= FI_LOCAL_MR;
				} else { /* User registered their memory */
					for (i = 0; i < count; i++) {
						if (!desc[i]) {
							GNIX_WARN(FI_LOG_EP_DATA,
								  "invalid memory registration (%p).\n",
								  desc[i]);
							ret = -FI_EINVAL;
							goto out_unlock;
						}

						req->msg.recv_md[i] =
							container_of(desc[i],
							struct gnix_fid_mem_desc,
							mr_fid);
					}
				}

				for (i = 0; i < count; i++)
					req->msg.recv_info[i].mem_hndl =
						req->msg.recv_md[i]->mem_hndl;
			}

			ret = _gnix_vc_queue_work_req(req);
		} else {
			/*
			 * This request is associated with a regular eager smsg,
			 * the rndzv threshold on the sender was not reached or
			 * exceeded.
			 */
			__gnix_msg_unpack_data_into_iov(req->msg.recv_info,
							count,
							req->msg.send_info[0].send_addr,
							req->msg.send_info[0].send_len);

			__gnix_msg_recv_completion(ep, req);
			_gnix_fr_free(ep, req);
		}
	} else {
		/* if peek/claim/discard, we didn't find what we
		 * were looking for, return FI_ENOMSG
		 */
		if (match_flags) {
			__recv_err(ep, context, flags, cum_len,
				   (void *) iov, 0, tag, cum_len, FI_ENOMSG,
				   FI_ENOMSG, NULL, 0);

			/* if handling trecvmsg flags, return here
			 * Never post a receive request from this type of
			 * context
			 */
			ret = FI_SUCCESS;
			goto out_unlock;
		}

		/*
		 * No matching requests found, create a new one and enqueue
		 * it in the posted queue.
		 */
		req = _gnix_fr_alloc(ep);
		if (req == NULL) {
			ret = -FI_EAGAIN;
			goto out_unlock;
		}

		GNIX_DEBUG(FI_LOG_EP_DATA, "EXPECTED, req = %p\n", req);

		if (tagged) {
			req->type = GNIX_FAB_RQ_TRECVV;
		} else {
			req->type = GNIX_FAB_RQ_RECVV;
		}

		req->addr = gnix_addr;
		req->gnix_ep = ep;
		req->user_context = context;
		req->flags = 0;

		for (i = 0; i < count; i++) {
			req->msg.recv_info[i].recv_addr = (uint64_t) iov[i].iov_base;
			req->msg.recv_info[i].recv_len = iov[i].iov_len;
			/* No registration yet for a posted receive. */
			req->msg.recv_md[i] = NULL;
		}

		req->msg.recv_iov_cnt = count;
		req->msg.recv_flags = recv_flags;
		req->msg.cum_recv_len = cum_len;
		req->msg.tag = tag;
		req->msg.ignore = ignore;
		req->msg.parent = NULL;
		ofi_atomic_initialize32(&req->msg.outstanding_txds, 0);

		_gnix_insert_tag(posted_queue, tag, req, ignore);
	}

out_unlock:
	/* Single exit for every path taken after the lock was acquired. */
	COND_RELEASE(ep->requires_lock, &ep->vc_lock);

	return ret;
}