/*
 * __gnix_rndzv_iov_req_build()
 * (from prov/gni/src/gnix_msg.c, original lines 1325:1782)
 */

/**
 * __gnix_rndzv_iov_req_build() - build the GET transactions for a
 * rendezvous IOV receive.
 *
 * Walks the sender's and receiver's IOV entries in lock-step.  For each
 * overlapping region it builds either a standalone RDMA GET descriptor
 * (region length >= the rendezvous threshold) or an entry in a chained
 * FMA GET descriptor (smaller regions).  Because GNI GETs must be
 * GNI_READ_ALIGN-aligned, any unaligned "head"/"tail" bytes of a region
 * are fetched separately into the request's internal tx buffer
 * (req->int_tx_buf) and are presumably copied into the user buffer at
 * completion time by __gnix_rndzv_iov_req_complete.
 *
 * @arg: struct gnix_fab_req * for the rendezvous receive.
 *
 * Return: result of _gnix_vc_queue_work_req() after all txds are built
 *         (work_fn is switched to __gnix_rndzv_iov_req_post), a negative
 *         registration error, or -FI_ENOSPC if txd/chain allocation
 *         failed (previously built txds are freed; caller retries).
 */
static int __gnix_rndzv_iov_req_build(void *arg)
{
	int ret = FI_SUCCESS, send_idx, recv_idx, use_tx_cq_blk;
	struct gnix_fab_req *req = (struct gnix_fab_req *)arg;
	struct gnix_fid_ep *ep = req->gnix_ep;
	struct gnix_nic *nic = ep->nic;
	gni_ep_handle_t gni_ep = req->vc->gni_ep;
	struct gnix_tx_descriptor *txd = NULL, *ct_txd = NULL;
	size_t recv_len, send_len, get_len, send_cnt, recv_cnt, txd_cnt, ht_len;
	uint64_t recv_ptr = 0UL, send_ptr = 0UL;
	/* TODO: Should this be the sender's rndzv thresh instead? */
	size_t rndzv_thresh = ep->domain->params.msg_rendezvous_thresh;
	gni_ct_get_post_descriptor_t *cur_ct = NULL;
	void **next_ct = NULL;
	int head_off, head_len, tail_len;

	GNIX_DBG_TRACE(FI_LOG_EP_DATA, "\n");

	/* Same-node peer reachable over XPMEM: copy directly instead of
	 * issuing GNI GETs. */
	if (req->vc->modes & GNIX_VC_MODE_XPMEM)
		return  __gnix_rndzv_req_xpmem(req);

	txd_cnt = 0;
	send_cnt = req->msg.send_iov_cnt;

	/* Cursors into the first recv/send IOV entries; advanced as each
	 * buffer is consumed below. */
	recv_ptr = req->msg.recv_info[0].recv_addr;
	recv_len = req->msg.recv_info[0].recv_len;
	recv_cnt = req->msg.recv_iov_cnt;

	send_ptr = req->msg.send_info[0].send_addr;
	send_len = req->msg.send_info[0].send_len;

	/* Under FI_PROGRESS_AUTO completions go to the blocking tx CQ so
	 * the progress thread can wait on them. */
	use_tx_cq_blk = (ep->domain->data_progress == FI_PROGRESS_AUTO);

	GNIX_DEBUG(FI_LOG_EP_DATA, "send_cnt = %lu, recv_cnt = %lu\n",
		   send_cnt, recv_cnt);

	/* Ensure the user's recv buffer is registered for recv/recvv */
	if (!req->msg.recv_md[0]) {
		struct fid_mr *auto_mr;

		for (recv_idx = 0; recv_idx < recv_cnt; recv_idx++) {
			auto_mr = NULL;

			ret = _gnix_mr_reg(&ep->domain->domain_fid.fid,
					  (void *)
					  req->msg.recv_info[recv_idx].recv_addr,
					  req->msg.recv_info[recv_idx].recv_len,
					  FI_READ | FI_WRITE, 0, 0, 0,
					  &auto_mr, NULL, ep->auth_key, GNIX_PROV_REG);

			if (ret != FI_SUCCESS) {
				GNIX_DEBUG(FI_LOG_EP_DATA,
					   "Failed to auto-register"
					   " local buffer: %s\n",
					   fi_strerror(-ret));

				/* Unwind: deregister everything registered so
				 * far before failing the request. */
				for (recv_idx--; recv_idx >= 0; recv_idx--)
					fi_close(&req->msg.recv_md[recv_idx]->mr_fid.fid);

				return ret;
			}

			req->msg.recv_md[recv_idx] = container_of(
				(void *) auto_mr,
				struct gnix_fid_mem_desc,
				mr_fid);

			req->msg.recv_info[recv_idx].mem_hndl =
				req->msg.recv_md[recv_idx]->mem_hndl;

			GNIX_DEBUG(FI_LOG_EP_DATA, "auto-reg MR: %p\n",
				   req->msg.recv_md[recv_idx]);

		}
		/* Provider-owned registration: completion path must close it. */
		req->msg.recv_flags |= FI_LOCAL_MR;
	}

	recv_idx = send_idx = 0;

	/* Iterate through the buffers and build the Fma and Rdma requests! */
	while (send_idx < send_cnt) {
		/* Transfer size for this iteration: the overlap of the
		 * remaining send and recv buffers. */
		get_len = MIN(recv_len, send_len);

		/* Begin alignment checks
		 *
		 * Each "mid-head" and "mid-tail" (resulting from the send pointer
		 * and length being adjusted to match the smaller posted recv buf in
		 * this loop) will be added to one or more chained transactions
		 * below.
		 *
		 * The original heads and tails (sent across in the control
		 * message) must be accounted for below in order to GET the
		 * correct, now four byte aligned, "body" section of the
		 * message.
		 */
		if (send_ptr & GNI_READ_ALIGN_MASK ||
		    (send_ptr + get_len) & GNI_READ_ALIGN_MASK) {
			/* Unaligned bytes land in the request's internal tx
			 * buffer; lazily acquire it on first use. */
			if (req->int_tx_buf_e == NULL) {
				req->int_tx_buf_e = _gnix_ep_get_int_tx_buf(ep);

				/* There are no available int_tx bufs */
				if (req->int_tx_buf_e == NULL) {
					/* Nothing posted yet; requeue this same
					 * build step to retry later. */
					ofi_atomic_set32(&req->msg.outstanding_txds, 0);
					req->work_fn = __gnix_rndzv_iov_req_post;
					return _gnix_vc_queue_work_req(req);
				}

				req->int_tx_buf = ((struct gnix_int_tx_buf *)
						req->int_tx_buf_e)->buf;
				req->int_tx_mdh = _gnix_ep_get_int_tx_mdh(
						req->int_tx_buf_e);
				GNIX_DEBUG(FI_LOG_EP_DATA,
				    "req->int_tx_buf = %p\n", req->int_tx_buf);
			}

			/* head_len: bytes needed to round send_ptr up to
			 * alignment; tail_len: trailing unaligned bytes. */
			head_off = send_ptr & GNI_READ_ALIGN_MASK;
			head_len = head_off ? GNI_READ_ALIGN - head_off : 0;
			tail_len = (send_ptr + get_len) & GNI_READ_ALIGN_MASK;

			ht_len = (size_t) (head_len + tail_len);

			/* TODO: handle this. */
			if (ht_len > recv_len) {
				GNIX_FATAL(FI_LOG_EP_DATA, "The head tail data "
					   "length exceeds the matching receive"
					   "buffer length.\n");
			}

			/* found a mid-head? (see "Begin alignment" comment block) */
			req->msg.recv_info[recv_idx].head_len = (send_ptr != req->msg.send_info[send_idx].send_addr) ?
				head_len : 0;

			/* found a mid-tail? (see "Begin alignment" comment block) */
			req->msg.recv_info[recv_idx].tail_len = (send_len > recv_len) ?
				tail_len : 0;

			/* Update the local and remote addresses */
			get_len -= ht_len;
			send_len -= ht_len;
			recv_len -= ht_len;

			send_ptr += head_len;
			recv_ptr += head_len;

			/* Add to existing ct
			 *
			 * Layout of int_tx_buf slots (GNI_READ_ALIGN bytes
			 * each): tails at index recv_idx, heads at index
			 * recv_idx + GNIX_MAX_MSG_IOV_LIMIT. */
			if (ct_txd) {
				if (req->msg.recv_info[recv_idx].tail_len) {
					/* Chain an aligned GET of the word
					 * containing the tail bytes. */
					cur_ct = *next_ct = malloc(sizeof(gni_ct_get_post_descriptor_t));

					if (cur_ct == NULL) {
						GNIX_DEBUG(FI_LOG_EP_DATA,
							   "Failed to allocate "
							   "gni FMA get chained "
							   "descriptor.");

						/* +1 to ensure we free the
						 * current chained txd */
						/* NOTE(review): ct_txd is not yet in
						 * req->iov_txds[] here — confirm
						 * __gnix_msg_free_iov_txds(txd_cnt + 1)
						 * actually reaches it. */
						__gnix_msg_free_iov_txds(req, txd_cnt + 1);
						return -FI_ENOSPC;
					}

					cur_ct->ep_hndl = gni_ep;
					cur_ct->remote_mem_hndl = req->msg.send_info[send_idx].mem_hndl;
					cur_ct->local_mem_hndl = req->int_tx_mdh;
					cur_ct->length = GNI_READ_ALIGN;
					/* Last aligned word of the region. */
					cur_ct->remote_addr = (send_ptr + get_len + tail_len) & ~GNI_READ_ALIGN_MASK;
					cur_ct->local_addr = (uint64_t) (((uint8_t *) req->int_tx_buf) + (GNI_READ_ALIGN * recv_idx));
					next_ct = &cur_ct->next_descr;
				}

				if (req->msg.recv_info[recv_idx].head_len) {
					/* Chain an aligned GET of the word
					 * preceding the (now aligned) send_ptr. */
					cur_ct = *next_ct = malloc(sizeof(gni_ct_get_post_descriptor_t));

					if (cur_ct == NULL) {
						GNIX_DEBUG(FI_LOG_EP_DATA,
							   "Failed to allocate "
							   "gni FMA get chained "
							   "descriptor.");

						/* +1 to ensure we free the
						 * current chained txd */
						__gnix_msg_free_iov_txds(req, txd_cnt + 1);
						return -FI_ENOSPC;
					}

					cur_ct->ep_hndl = gni_ep;
					cur_ct->remote_mem_hndl = req->msg.send_info[send_idx].mem_hndl;
					cur_ct->local_mem_hndl = req->int_tx_mdh;
					cur_ct->length = GNI_READ_ALIGN;
					cur_ct->remote_addr = send_ptr - GNI_READ_ALIGN;
					cur_ct->local_addr = (uint64_t) (((uint8_t *) req->int_tx_buf) +
						(GNI_READ_ALIGN * (recv_idx + GNIX_MAX_MSG_IOV_LIMIT)));
					next_ct = &cur_ct->next_descr;
				}
			} else { 	/* Start a new ct */
				if (req->msg.recv_info[recv_idx].tail_len) {
					GNIX_DEBUG(FI_LOG_EP_DATA, "New FMA"
						   " CT\n");
					ret = _gnix_nic_tx_alloc(nic, &ct_txd);

					if (ret != FI_SUCCESS) {
						/* We'll try again. */
						GNIX_DEBUG(FI_LOG_EP_DATA,
							  "_gnix_nic_tx_alloc()"
							  " returned %s\n",
							  fi_strerror(-ret));

						__gnix_msg_free_iov_txds(req, txd_cnt);
						return -FI_ENOSPC;
					}

					ct_txd->completer_fn = __gnix_rndzv_iov_req_complete;
					ct_txd->req = req;

					/* Chain head descriptor: tail GET is the
					 * first link of the new FMA chain. */
					ct_txd->gni_desc.type = GNI_POST_FMA_GET;
					ct_txd->gni_desc.cq_mode = GNI_CQMODE_GLOBAL_EVENT;
					ct_txd->gni_desc.dlvr_mode = GNI_DLVMODE_PERFORMANCE;
					ct_txd->gni_desc.rdma_mode = 0;
					ct_txd->gni_desc.src_cq_hndl = (use_tx_cq_blk) ? nic->tx_cq_blk : nic->tx_cq;

					ct_txd->gni_desc.remote_addr = (send_ptr + get_len + tail_len) & ~GNI_READ_ALIGN_MASK;
					ct_txd->gni_desc.remote_mem_hndl = req->msg.send_info[send_idx].mem_hndl;

					ct_txd->gni_desc.local_addr = (uint64_t) ((uint8_t *) req->int_tx_buf + (GNI_READ_ALIGN * recv_idx));
					ct_txd->gni_desc.local_mem_hndl = req->int_tx_mdh;

					ct_txd->gni_desc.length = GNI_READ_ALIGN;

					next_ct = &ct_txd->gni_desc.next_descr;
				}

				if (req->msg.recv_info[recv_idx].head_len) {
					if (req->msg.recv_info[recv_idx].tail_len) { /* Existing FMA CT */
						/* Tail above started the chain;
						 * append the head GET to it. */
						cur_ct = *next_ct = malloc(sizeof(gni_ct_get_post_descriptor_t));
						if (cur_ct == NULL) {
							GNIX_DEBUG(FI_LOG_EP_DATA,
								   "Failed to allocate "
								   "gni FMA get chained "
								   "descriptor.");

							/* +1 to ensure we free the
							 * current chained txd */
							__gnix_msg_free_iov_txds(req, txd_cnt + 1);
							return -FI_ENOSPC;
						}

						cur_ct->ep_hndl = gni_ep;
						cur_ct->remote_mem_hndl = req->msg.send_info[send_idx].mem_hndl;
						cur_ct->local_mem_hndl = req->int_tx_mdh;
						cur_ct->length = GNI_READ_ALIGN;
						cur_ct->remote_addr = send_ptr - GNI_READ_ALIGN;
						cur_ct->local_addr = (uint64_t) (((uint8_t *) req->int_tx_buf) +
								(GNI_READ_ALIGN * (recv_idx + GNIX_MAX_MSG_IOV_LIMIT)));
						next_ct = &cur_ct->next_descr;
					} else { /* New FMA ct */
						GNIX_DEBUG(FI_LOG_EP_DATA, "New FMA"
							   " CT\n");
						ret = _gnix_nic_tx_alloc(nic, &ct_txd);

						if (ret != FI_SUCCESS) {
							/* We'll try again. */
							GNIX_DEBUG(FI_LOG_EP_DATA,
								  "_gnix_nic_tx_alloc()"
								  " returned %s\n",
								  fi_strerror(-ret));

							__gnix_msg_free_iov_txds(req, txd_cnt);
							return -FI_ENOSPC;
						}

						ct_txd->completer_fn = __gnix_rndzv_iov_req_complete;
						ct_txd->req = req;

						/* Head GET is the first link of
						 * the new FMA chain. */
						ct_txd->gni_desc.type = GNI_POST_FMA_GET;
						ct_txd->gni_desc.cq_mode = GNI_CQMODE_GLOBAL_EVENT;
						ct_txd->gni_desc.dlvr_mode = GNI_DLVMODE_PERFORMANCE;
						ct_txd->gni_desc.rdma_mode = 0;
						ct_txd->gni_desc.src_cq_hndl = (use_tx_cq_blk) ? nic->tx_cq_blk : nic->tx_cq;

						ct_txd->gni_desc.remote_addr = send_ptr - GNI_READ_ALIGN;
						ct_txd->gni_desc.remote_mem_hndl = req->msg.send_info[send_idx].mem_hndl;

						ct_txd->gni_desc.local_addr = (uint64_t) ((uint8_t *) req->int_tx_buf +
								(GNI_READ_ALIGN * (recv_idx + GNIX_MAX_MSG_IOV_LIMIT)));
						ct_txd->gni_desc.local_mem_hndl = req->int_tx_mdh;

						ct_txd->gni_desc.length = GNI_READ_ALIGN;

						next_ct = &ct_txd->gni_desc.next_descr;
					}
				}
			}
		} else { 	/* no head/tail found */
			head_len = tail_len = 0;
			req->msg.recv_info[recv_idx].head_len = req->msg.recv_info[recv_idx].tail_len = 0;
		}
		/* End alignment checks */

		GNIX_DEBUG(FI_LOG_EP_DATA, "send_info[%d].send_len = %lu,"
			   " recv_len = %lu, get_len = %lu, head_len = %d,"
			   " tail_len = %d, req->msg.recv_info[%d].tail_len = %u\n"
			   "req->msg.recv_info[%d].head_len = %u, "
			   "recv_ptr(head) = %p, recv_ptr(tail) = %p\n", send_idx,
			   send_len, recv_len, get_len, head_len, tail_len, recv_idx,
			   req->msg.recv_info[recv_idx].tail_len, recv_idx,
			   req->msg.recv_info[recv_idx].head_len, (void *) (recv_ptr - head_len),
			   (void *) (recv_ptr + get_len));

		GNIX_DEBUG(FI_LOG_EP_DATA, "txd = %p, send_ptr = %p, "
			   "send_ptr + get_len = %p, recv_ptr = %p\n",
			   txd, (void *) send_ptr, (void *)(send_ptr + get_len),
			   recv_ptr);

		/* The aligned "body" of this region: large transfers go out as
		 * standalone RDMA GETs, small ones are chained as FMA GETs. */
		if (get_len >= rndzv_thresh) { /* Build the rdma txd */
			ret = _gnix_nic_tx_alloc(nic, &txd);

			if (ret != FI_SUCCESS) {
				/* We'll try again. */
				GNIX_DEBUG(FI_LOG_EP_DATA, "_gnix_nic_tx_alloc()"
					  " returned %s\n",
					  fi_strerror(-ret));

				__gnix_msg_free_iov_txds(req, txd_cnt);
				return -FI_ENOSPC;
			}

			txd->completer_fn = __gnix_rndzv_iov_req_complete;
			txd->req = req;

			txd->gni_desc.type = GNI_POST_RDMA_GET;
			txd->gni_desc.cq_mode = GNI_CQMODE_GLOBAL_EVENT;
			txd->gni_desc.dlvr_mode = GNI_DLVMODE_PERFORMANCE;
			txd->gni_desc.local_mem_hndl = req->msg.recv_info[recv_idx].mem_hndl;
			txd->gni_desc.remote_mem_hndl = req->msg.send_info[send_idx].mem_hndl;
			txd->gni_desc.rdma_mode = 0;
			txd->gni_desc.src_cq_hndl = (use_tx_cq_blk) ? nic->tx_cq_blk : nic->tx_cq;

			txd->gni_desc.local_addr = recv_ptr;
			txd->gni_desc.remote_addr = send_ptr;
			txd->gni_desc.length = get_len;

			/* RDMA txds are queued immediately; the FMA chain (if
			 * any) is queued once at loop exit. */
			req->iov_txds[txd_cnt++] = txd;
			txd = NULL;
		} else if (get_len) {		       /* Build the Ct txd */
			if (!ct_txd) {
				GNIX_DEBUG(FI_LOG_EP_DATA, "New FMA"
					   " CT\n");
				ret = _gnix_nic_tx_alloc(nic, &ct_txd);
				if (ret != FI_SUCCESS) {
					/* We'll try again. */
					GNIX_DEBUG(FI_LOG_EP_DATA,
						  "_gnix_nic_tx_alloc()"
						  " returned %s\n",
						  fi_strerror(-ret));

					__gnix_msg_free_iov_txds(req, txd_cnt);
					return -FI_ENOSPC;
				}

				ct_txd->completer_fn = __gnix_rndzv_iov_req_complete;
				ct_txd->req = req;

				ct_txd->gni_desc.type = GNI_POST_FMA_GET;
				ct_txd->gni_desc.cq_mode = GNI_CQMODE_GLOBAL_EVENT;
				ct_txd->gni_desc.dlvr_mode = GNI_DLVMODE_PERFORMANCE;
				ct_txd->gni_desc.local_mem_hndl = req->msg.recv_info[recv_idx]. mem_hndl;

				ct_txd->gni_desc.remote_mem_hndl = req->msg.send_info[send_idx].mem_hndl;
				ct_txd->gni_desc.rdma_mode = 0;
				ct_txd->gni_desc.src_cq_hndl = (use_tx_cq_blk) ? nic->tx_cq_blk : nic->tx_cq;

				/* Body GET lands directly in the user's
				 * (registered) recv buffer. */
				ct_txd->gni_desc.local_addr = recv_ptr;
				ct_txd->gni_desc.remote_addr = send_ptr;
				ct_txd->gni_desc.length = get_len;

				next_ct = &ct_txd->gni_desc.next_descr;
			} else {
				cur_ct = *next_ct = malloc(sizeof(gni_ct_get_post_descriptor_t));

				if (cur_ct == NULL) {
					GNIX_DEBUG(FI_LOG_EP_DATA,
						   "Failed to allocate "
						   "gni FMA get chained "
						   "descriptor.");

					/* +1 to ensure we free the
					 * current chained txd */
					__gnix_msg_free_iov_txds(req, txd_cnt + 1);
					return -FI_ENOSPC;
				}

				cur_ct->ep_hndl = gni_ep;
				cur_ct->length = get_len;
				cur_ct->remote_addr = send_ptr;
				cur_ct->remote_mem_hndl = req->msg.send_info[send_idx].mem_hndl;
				cur_ct->local_addr = (uint64_t) recv_ptr;
				cur_ct->local_mem_hndl = req->msg.recv_info[recv_idx].mem_hndl;

				next_ct = &cur_ct->next_descr;
			}
		}

		/* Update the recv len */
		recv_len -= get_len;

		/* We have exhausted the current recv (and possibly send)
		 * buffer */
		if (recv_len == 0) {
			recv_idx++;

			/* We cannot receive any more. */
			if (recv_idx == recv_cnt)
				break;

			recv_ptr = req->msg.recv_info[recv_idx].recv_addr;
			recv_len = req->msg.recv_info[recv_idx].recv_len;

			/* Also exhausted send buffer */
			if (get_len == send_len) {
				/* NOTE(review): send_idx may equal send_cnt
				 * here before the while condition is
				 * re-checked, so this reads one entry past the
				 * valid send_info range (within the fixed-size
				 * array, presumably) — confirm intent. */
				send_idx++;
				send_ptr = req->msg.send_info[send_idx].send_addr;
				send_len = req->msg.send_info[send_idx].send_len;
			} else {
				/* Skip over the tail bytes already fetched via
				 * the chained head/tail GETs. */
				send_ptr += (get_len + tail_len);
				send_len -= get_len;
			}
		} else {	/* Just exhausted current send buffer. */
			send_idx++;
			send_ptr = req->msg.send_info[send_idx].send_addr;
			send_len = req->msg.send_info[send_idx].send_len;
			recv_ptr += (get_len + tail_len);
		}
		GNIX_DEBUG(FI_LOG_EP_DATA, "send_idx = %d, recv_idx = %d\n",
			   send_idx, recv_idx);
	}

	/*
	 * If we ran out of buffer space on the sender's/receiver's side in the
	 * middle of building the ct, we must terminate and add that ct to the
	 * queue. Note that if the last txd built was a rdma txd then the txd
	 * will have been queued and txd will have a NULL value.
	 */
	if (ct_txd) {
		*next_ct = NULL;
		req->iov_txds[txd_cnt++] = ct_txd;
	}

	GNIX_DEBUG(FI_LOG_EP_DATA, "txd_cnt = %lu\n", txd_cnt);
	ofi_atomic_set32(&req->msg.outstanding_txds, txd_cnt);

	/* All the txd's are built, update the work_fn */
	req->work_fn = __gnix_rndzv_iov_req_post;

	/* Put this request back on work Q.
	 * TODO: Should we put it at the beginning of the work Q? */
	ret = _gnix_vc_queue_work_req(req);
	return ret;
}