ssize_t _gnix_recvv()

in prov/gni/src/gnix_msg.c [3303:3560]


/*
 * _gnix_recvv - post a (tagged or untagged) vectored receive on an endpoint.
 *
 * The iovec is first matched against the unexpected queue.  If a matching
 * request is found it is completed here (eager data is unpacked into the
 * iovec, rendezvous requests are queued as work on the request's VC), or it
 * is handed to the peek/discard helpers when FI_PEEK/FI_DISCARD is set.  If
 * nothing matches, a new request describing the iovec is allocated and
 * inserted into the posted queue -- unless a peek/claim/discard flag was
 * given, in which case an FI_ENOMSG error is reported through __recv_err()
 * and nothing is posted.
 *
 * @param ep        endpoint the receive is posted on
 * @param iov       array of `count` receive buffers
 * @param desc      per-buffer memory descriptors, or NULL; only consulted
 *                  for rendezvous matches on VCs without GNIX_VC_MODE_XPMEM
 * @param count     number of entries in `iov` (and in `desc` when non-NULL)
 * @param src_addr  source address, resolved via __gnix_msg_addr_lookup()
 * @param context   user context stored in the request
 * @param flags     operation flags; FI_TAGGED selects the tagged queues
 * @param ignore    tag ignore mask (forced to all ones when untagged)
 * @param tag       tag to match (forced to 0 when untagged)
 *
 * @return FI_SUCCESS, or a negative value: -FI_ENOCQ when the endpoint has
 *         neither a receive CQ nor a receive counter, -FI_EOPNOTSUPP when
 *         the endpoint ops bit checked below is clear, -FI_EINVAL for a NULL
 *         entry in `desc`, -FI_EAGAIN when no request could be allocated, or
 *         whatever the address lookup, registration, peek/discard or work
 *         queue helpers return.
 */
ssize_t _gnix_recvv(struct gnix_fid_ep *ep, const struct iovec *iov,
		    void **desc, size_t count, uint64_t src_addr, void *context,
		    uint64_t flags, uint64_t ignore, uint64_t tag)
{
	size_t i;	/* same type as count: avoids signed/unsigned compares */
	int ret = FI_SUCCESS;
	size_t cum_len = 0;
	struct gnix_fab_req *req = NULL;
	struct gnix_address gnix_addr;
	struct gnix_tag_storage *posted_queue = NULL;
	struct gnix_tag_storage *unexp_queue = NULL;
	uint64_t match_flags;
	int tagged = flags & FI_TAGGED;

	if (!ep->recv_cq && !ep->recv_cntr) {
		return -FI_ENOCQ;
	}

	/*
	 * NOTE(review): this gates a *receive* on the endpoint's send-side
	 * capability bits (msg_send_allowed / tagged_send_allowed).  Confirm
	 * whether a receive-side bit was intended here.
	 */
	if (!tagged) {
		if (!ep->ep_ops.msg_send_allowed)
			return -FI_EOPNOTSUPP;

		/* untagged receives match any tag */
		tag = 0;
		ignore = ~0;
	} else {
		if (!ep->ep_ops.tagged_send_allowed)
			return -FI_EOPNOTSUPP;
	}

	match_flags = flags & (FI_CLAIM | FI_DISCARD | FI_PEEK);

	/*
	 * Lookup the gni addr in the av_table or av_hashtable.
	 * If the gni addr doesn't exist the addr is FI_ADDR_UNSPEC,
	 * meaning this remote node wants to receive from all senders?
	 */
	ret = __gnix_msg_addr_lookup(ep, src_addr, &gnix_addr);
	if (ret != FI_SUCCESS)
		return ret;

	/* calculate cumulative size of the iovec buf lens */
	for (i = 0; i < count; i++) {
		cum_len += iov[i].iov_len;
	}

	/*
	 * Initialize the tag storage objects.
	 * The posted_queue holds information about receives that have
	 * been posted on the remote endpoint.
	 *
	 * The unexp_queue holds information about data that has arrived
	 * prior to posting a receive on the remote endpoint.
	 *
	 * Both {unexp,posted}_queue objects have two sets, one for tagged
	 * messages and the other for untagged messages.
	 *
	 * The untagged queues match based off the source address.
	 *
	 * The tagged queues match based off the tag and source address (when
	 * the ep is created with FI_DIRECTED_RECV).
	 *
	 * A "message" is added to the unexpected queue when it arrives at a
	 * remote endpoint and the completer_fn doesn't find an existing request
	 * in the posted queue (i.e. no fi_recvs have been called (or posted)
	 * on the remote endpoint).
	 */
	__gnix_msg_queues(ep, tagged, &posted_queue, &unexp_queue);

	/* Everything from here to the exit labels runs under ep->vc_lock. */
	COND_ACQUIRE(ep->requires_lock, &ep->vc_lock);

	/*
	 * Posting a recv, look for an existing request in the
	 * unexpected queue.
	 */
	req = _gnix_match_tag(unexp_queue, tag, ignore,
			      match_flags, context, &gnix_addr);

	if (req) {
		GNIX_DEBUG(FI_LOG_EP_DATA, "UNEXPECTED, req = %p\n", req);
		/* Found a matching request in the unexpected queue. */

		/*
		 * reset ep, it might be different than the ep the message came
		 * in on.
		 */
		req->gnix_ep = ep;
		req->user_context = context;
		req->flags = 0;
		req->msg.recv_flags = flags;
		req->msg.recv_iov_cnt = count;
		req->msg.cum_recv_len = cum_len;
		/* req->msg.cum_send_len = MIN(req->msg.cum_send_len, cum_len); */

		if (tagged) {
			req->type = GNIX_FAB_RQ_TRECVV;
		} else {
			req->type = GNIX_FAB_RQ_RECVV;
		}

		/*
		 * Decide whether this receive generates a completion: it is
		 * suppressed explicitly, or implicitly when the endpoint uses
		 * selective completion and the caller did not ask for one.
		 */
		if ((flags & GNIX_SUPPRESS_COMPLETION) ||
		    (ep->recv_selective_completion &&
		     !(flags & FI_COMPLETION))) {
			req->msg.recv_flags &= ~FI_COMPLETION;
		} else {
			req->msg.recv_flags |= FI_COMPLETION;
		}

		/* Check to see if we are using P/C/D matching flags. */
		if (match_flags & FI_DISCARD) {
			ret = __gnix_discard_request(req);
			goto pdc_exit;
		} else if (match_flags & FI_PEEK) {
			ret = __gnix_peek_request(req);
			goto pdc_exit;
		}

		for (i = 0; i < count; i++) {
			req->msg.recv_info[i].recv_addr = (uint64_t) iov[i].iov_base;
			req->msg.recv_info[i].recv_len = iov[i].iov_len;
		}

		if (req->msg.send_flags & GNIX_MSG_RENDEZVOUS) {
			req->work_fn = __gnix_rndzv_iov_req_build;
			if (!(req->vc->modes & GNIX_VC_MODE_XPMEM)) {
				/*
				 * NOTE(review): the error exits below only
				 * release the lock; req is neither freed nor
				 * re-queued here.  Confirm who owns req on
				 * these paths.
				 */
				if (!desc) {
					/* No descriptors: register the iovec. */
					ret = __gnix_msg_register_iov(ep,
								      iov,
								      count,
								      req->msg.recv_md);
					if (ret != FI_SUCCESS) {
						GNIX_DEBUG(FI_LOG_EP_DATA,
							   "Failed to "
							   "auto-register"
							   " local buffer: %s\n"
							   , fi_strerror(-ret));
						goto err;
					}
					req->msg.send_flags |= FI_LOCAL_MR;

				} else {	/* User registered their memory */

					for (i = 0; i < count; i++) {
						if (!desc[i]) {
							GNIX_WARN(FI_LOG_EP_DATA,
								  "invalid memory reg"
								  "istration (%p).\n",
								  desc[i]);
							ret = -FI_EINVAL;
							goto err;
						}

						req->msg.recv_md[i] =
							container_of(desc[i],
							struct gnix_fid_mem_desc,
							mr_fid);
					}
				}

				/* Publish each buffer's memory handle. */
				for (i = 0; i < count; i++)
					req->msg.recv_info[i].mem_hndl =
						req->msg.recv_md[i]->mem_hndl;
			}

			ret = _gnix_vc_queue_work_req(req);
		} else {

			/*
			 * This request is associated with a regular eager smsg,
			 * the rndzv threshold on the sender was not reached or
			 * exceeded.
			 */
			__gnix_msg_unpack_data_into_iov(req->msg.recv_info,
							count,
							req->msg.send_info[0].send_addr,
							req->msg.send_info[0].send_len);

			__gnix_msg_recv_completion(ep, req);
			_gnix_fr_free(ep, req);
		}
	} else {
		/* if peek/claim/discard, we didn't find what we
		 * were looking for, return FI_ENOMSG
		 */
		if (match_flags) {
			__recv_err(ep, context, flags, cum_len,
				   (void *) iov, 0, tag, cum_len, FI_ENOMSG,
				   FI_ENOMSG, NULL, 0);

			/* if handling trecvmsg flags, return here
			 * Never post a receive request from this type of
			 * context
			 */
			ret = FI_SUCCESS;
			goto pdc_exit;
		}

		/*
		 * No matching requests found, create a new one and enqueue
		 * it in the posted queue.
		 */
		req = _gnix_fr_alloc(ep);
		if (req == NULL) {
			ret = -FI_EAGAIN;
			goto err;
		}

		GNIX_DEBUG(FI_LOG_EP_DATA, "EXPECTED, req = %p\n", req);

		if (tagged) {
			req->type = GNIX_FAB_RQ_TRECVV;
		} else {
			req->type = GNIX_FAB_RQ_RECVV;
		}

		req->addr = gnix_addr;
		req->gnix_ep = ep;
		req->user_context = context;
		req->flags = 0;

		for (i = 0; i < count; i++) {
			req->msg.recv_info[i].recv_addr = (uint64_t) iov[i].iov_base;
			req->msg.recv_info[i].recv_len = iov[i].iov_len;
			req->msg.recv_md[i] = NULL;
		}

		req->msg.recv_iov_cnt = count;
		req->msg.recv_flags = flags;
		req->msg.cum_recv_len = cum_len;
		req->msg.tag = tag;
		req->msg.ignore = ignore;
		req->msg.parent = NULL;
		ofi_atomic_initialize32(&req->msg.outstanding_txds, 0);

		/*
		 * Apply the completion policy only after recv_flags has been
		 * seeded from the caller's flags; doing it earlier would be
		 * overwritten by the assignment above.
		 */
		if ((flags & GNIX_SUPPRESS_COMPLETION) ||
		    (ep->recv_selective_completion &&
		     !(flags & FI_COMPLETION))) {
			req->msg.recv_flags &= ~FI_COMPLETION;
		} else {
			req->msg.recv_flags |= FI_COMPLETION;
		}

		_gnix_insert_tag(posted_queue, tag, req, ignore);
	}

	/* Both exits share one path: drop the lock and report ret. */
pdc_exit:
err:
	COND_RELEASE(ep->requires_lock, &ep->vc_lock);

	return ret;
}