ssize_t _gnix_recv()

in prov/gni/src/gnix_msg.c [2627:2849]


ssize_t _gnix_recv(struct gnix_fid_ep *ep, uint64_t buf, size_t len,
		   void *mdesc, uint64_t src_addr, void *context,
		   uint64_t flags, uint64_t tag, uint64_t ignore,
		   struct gnix_fab_req *mrecv_req)
{
	int ret = FI_SUCCESS;
	struct gnix_fab_req *req = NULL;
	struct gnix_address gnix_addr;
	struct gnix_tag_storage *posted_queue = NULL;
	struct gnix_tag_storage *unexp_queue = NULL;
	uint64_t match_flags;
	struct gnix_fid_mem_desc *md = NULL;
	int tagged = !!(flags & FI_TAGGED);

	if (!ep->recv_cq && !ep->recv_cntr) {
		return -FI_ENOCQ;
	}

	if (!tagged) {
		if (!ep->ep_ops.msg_recv_allowed)
			return -FI_EOPNOTSUPP;
	} else {
		if (!ep->ep_ops.tagged_recv_allowed)
			return -FI_EOPNOTSUPP;
	}

	ret = __gnix_msg_addr_lookup(ep, src_addr, &gnix_addr);
	if (ret != FI_SUCCESS)
		return ret;

	match_flags = flags & (FI_CLAIM | FI_DISCARD | FI_PEEK);

	__gnix_msg_queues(ep, tagged, &posted_queue, &unexp_queue);

	GNIX_DEBUG(FI_LOG_EP_DATA, "posted_queue = %p\n", posted_queue);

	if (!tagged) {
		tag = 0;
		ignore = ~0;
	}

	COND_ACQUIRE(ep->requires_lock, &ep->vc_lock);

	/* Look for a matching unexpected receive request. */
	req = _gnix_match_tag(unexp_queue, tag, ignore,
			      match_flags, context, &gnix_addr);
	if (req) {
		/*
		 * if we posted a multi-recv buffer and we can't
		 * hold the matched message, stop dequeuing and
		 * return.
		 */
		if (OFI_UNLIKELY(mrecv_req != NULL)) {

			mrecv_req->msg.mrecv_space_left -=
				req->msg.cum_send_len;
			mrecv_req->msg.mrecv_buf_addr +=
				req->msg.cum_send_len;
			req->msg.parent = mrecv_req;
			_gnix_ref_get(mrecv_req);
		}

		/* Found matching request, populate local fields. */

		req->gnix_ep = ep;
		req->user_context = context;
		req->msg.recv_info[0].recv_addr = (uint64_t)buf;
		req->msg.recv_info[0].recv_len = len;
		req->msg.cum_recv_len = len;

		if (mdesc) {
			md = container_of(mdesc,
					  struct gnix_fid_mem_desc,
					  mr_fid);
			req->msg.recv_info[0].mem_hndl = md->mem_hndl;
		}
		req->msg.recv_md[0] = md;
		req->msg.recv_iov_cnt = 1;
		req->msg.recv_flags = flags;
		req->msg.ignore = ignore;

		if ((flags & GNIX_SUPPRESS_COMPLETION) ||
		    (ep->recv_selective_completion &&
		    !(flags & FI_COMPLETION))) {
			req->msg.recv_flags &= ~FI_COMPLETION;
		} else {
			req->msg.recv_flags |= FI_COMPLETION;
		}

		/* Check to see if we are using P/C/D matching flags. */
		if (match_flags & FI_DISCARD) {
			ret = __gnix_discard_request(req);
			goto pdc_exit;
		} else if (match_flags & FI_PEEK) {
			ret = __gnix_peek_request(req);
			goto pdc_exit;
		}

		if (req->msg.send_flags & GNIX_MSG_RENDEZVOUS) {
			/* Matched rendezvous request.  Start data movement. */
			GNIX_DEBUG(FI_LOG_EP_DATA, "matched RNDZV, req: %p\n",
				  req);

			/*
			 * this shouldn't happen
			 */
			if (OFI_UNLIKELY(req->vc == NULL)) {
				GNIX_ERR(FI_LOG_EP_DATA,
					 "fab req vc field NULL");
			}

			/* Check if second GET for unaligned data is needed. */
			if (req->msg.send_info[0].send_len > req->msg.recv_info[0].recv_len &&
			    ((req->msg.send_info[0].send_addr + req->msg.recv_info[0].recv_len) &
			     GNI_READ_ALIGN_MASK)) {
				req->msg.recv_flags |= GNIX_MSG_GET_TAIL;
			}

			/* Initiate pull of source data. */
			req->work_fn = req->msg.send_iov_cnt == 1 ?
				__gnix_rndzv_req : __gnix_rndzv_iov_req_build;

			_gnix_remove_tag(unexp_queue, req);
			ret = _gnix_vc_queue_work_req(req);

		} else {
			/* Matched eager request.  Copy data and generate
			 * completions. */
			GNIX_DEBUG(FI_LOG_EP_DATA, "Matched recv, req: %p\n",
				  req);

			req->msg.cum_send_len = req->msg.send_info[0].send_len;

			/* Send length is truncated to receive buffer size. */
			req->msg.send_info[0].send_len =
				MIN(req->msg.send_info[0].send_len,
				    req->msg.recv_info[0].recv_len);

			/* Copy data from unexpected eager receive buffer. */
			memcpy((void *)buf, (void *)req->msg.send_info[0].send_addr,
			       req->msg.send_info[0].send_len);
			free((void *)req->msg.send_info[0].send_addr);

			_gnix_remove_tag(unexp_queue, req);
			__gnix_msg_recv_completion(ep, req);

			_gnix_fr_free(ep, req);
		}
	} else {

		/*
		 * if handling a multi receive request,
		 * just return
		 */
		if (mrecv_req)
			goto mrecv_exit;

		/* if peek/claim/discard, we didn't find what we
		 * were looking for, return FI_ENOMSG
		 */
		if (match_flags) {
			__recv_err(ep, context, flags, len,
				   (void *)buf, 0, tag, len, FI_ENOMSG,
				   FI_ENOMSG, NULL, 0);

			/* if handling trecvmsg flags, return here
			 * Never post a receive request from this type of context
			 */
			ret = FI_SUCCESS;
			goto pdc_exit;
		}

		req = _gnix_fr_alloc(ep);
		if (req == NULL) {
			ret = -FI_EAGAIN;
			goto err;
		}

		GNIX_DEBUG(FI_LOG_EP_DATA, "New recv, req: %p\n", req);

		req->type = GNIX_FAB_RQ_RECV;

		req->addr = gnix_addr;
		req->gnix_ep = ep;
		req->user_context = context;

		req->msg.recv_info[0].recv_addr = (uint64_t)buf;
		req->msg.recv_info[0].recv_len = len;
		req->msg.cum_recv_len = len;

		if (mdesc) {
			md = container_of(mdesc,
					struct gnix_fid_mem_desc,
					mr_fid);

			req->msg.recv_info[0].mem_hndl = md->mem_hndl;
		}

		req->msg.recv_md[0] = md;
		req->msg.send_iov_cnt = req->msg.recv_iov_cnt = 1;
		req->msg.recv_flags = flags;
		req->msg.tag = tag;
		req->msg.ignore = ignore;
		req->msg.parent = NULL;
		ofi_atomic_initialize32(&req->msg.outstanding_txds, 0);

		if ((flags & GNIX_SUPPRESS_COMPLETION) ||
		    (ep->recv_selective_completion &&
		    !(flags & FI_COMPLETION))) {
			req->msg.recv_flags &= ~FI_COMPLETION;
		} else {
			req->msg.recv_flags |= FI_COMPLETION;
		}
		_gnix_insert_tag(posted_queue, tag, req, ignore);
	}

mrecv_exit:
pdc_exit:
err:
	COND_RELEASE(ep->requires_lock, &ep->vc_lock);

	return ret;
}