ssize_t psmx2_writev_generic()

in prov/psm2/src/psmx2_rma.c [1068:1276]


ssize_t psmx2_writev_generic(struct fid_ep *ep, const struct iovec *iov,
			     void **desc, size_t count, fi_addr_t dest_addr,
			     uint64_t addr, uint64_t key, void *context,
			     uint64_t flags, uint64_t data)
{
	struct psmx2_fid_ep *ep_priv;
	struct psmx2_fid_av *av;
	struct psmx2_am_request *req;
	psm2_amarg_t args[8];
	int nargs;
	int am_flags = PSM2_AM_FLAG_ASYNC;
	int chunk_size;
	psm2_epaddr_t psm2_epaddr;
	psm2_epid_t psm2_epid;
	psm2_mq_req_t psm2_req;
	psm2_mq_tag_t psm2_tag;
	void *psm2_context;
	int no_event;
	size_t total_len, len, len_sent;
	uint8_t *buf, *p;
	int i;

	ep_priv = container_of(ep, struct psmx2_fid_ep, ep);

	if (flags & FI_TRIGGER)
		return psmx2_trigger_queue_writev(ep, iov, desc, count,
						  dest_addr, addr, key,
						  context, flags, data);

	av = ep_priv->av;
	assert(av);

	psm2_epaddr = psmx2_av_translate_addr(av, ep_priv->tx, dest_addr, av->type);
	psm2_epaddr_to_epid(psm2_epaddr, &psm2_epid);

	if (psm2_epid == ep_priv->tx->psm2_epid)
		return psmx2_rma_self(PSMX2_AM_REQ_WRITEV, ep_priv,
				      (void *)iov, count, desc, addr,
				      key, context, flags, data);

	no_event = (flags & PSMX2_NO_COMPLETION) ||
		   (ep_priv->send_selective_completion && !(flags & FI_COMPLETION));

	total_len = 0;
	for (i=0; i<count; i++)
		total_len += iov[i].iov_len;

	chunk_size = ep_priv->tx->psm2_am_param.max_request_short;

	req = psmx2_am_request_alloc(ep_priv->tx);
	if (!req)
		return -FI_ENOMEM;

	/* Case 1: fit into a AM message, then pack and send */
	if (total_len <= chunk_size) {
		req->tmpbuf = malloc(total_len);
		if (!req->tmpbuf) {
			psmx2_am_request_free(ep_priv->tx, req);
			return -FI_ENOMEM;
		}

		p = req->tmpbuf;
		for (i=0; i<count; i++) {
			if (iov[i].iov_len) {
				memcpy(p, iov[i].iov_base, iov[i].iov_len);
				p += iov[i].iov_len;
			}
		}
		buf = req->tmpbuf;
		len = total_len;

		req->no_event = no_event;
		req->op = PSMX2_AM_REQ_WRITE;
		req->write.buf = (void *)buf;
		req->write.len = len;
		req->write.addr = addr;	/* needed? */
		req->write.key = key; 	/* needed? */
		req->write.context = context;
		req->ep = ep_priv;
		req->cq_flags = FI_WRITE | FI_RMA;
		PSMX2_CTXT_USER(&req->fi_context) = context;
		PSMX2_CTXT_EP(&req->fi_context) = ep_priv;

		args[0].u32w0 = 0;
		PSMX2_AM_SET_OP(args[0].u32w0, PSMX2_AM_REQ_WRITE);
		args[0].u32w1 = len;
		args[1].u64 = (uint64_t)(uintptr_t)req;
		args[2].u64 = addr;
		args[3].u64 = key;
		nargs = 4;
		if (flags & FI_REMOTE_CQ_DATA) {
			PSMX2_AM_SET_FLAG(args[0].u32w0, PSMX2_AM_DATA | PSMX2_AM_EOM);
			args[4].u64 = data;
			nargs++;
		} else {
			PSMX2_AM_SET_FLAG(args[0].u32w0, PSMX2_AM_EOM);
		}
		psm2_am_request_short(psm2_epaddr, PSMX2_AM_RMA_HANDLER, args, nargs,
				      (void *)buf, len, am_flags, NULL, NULL);
		psmx2_am_poll(ep_priv->tx);
		return 0;
	}

	if (flags & FI_INJECT) {
		psmx2_am_request_free(ep_priv->tx, req);
		return -FI_EMSGSIZE;
	}

	PSMX2_CTXT_TYPE(&req->fi_context) = no_event ?
					    PSMX2_NOCOMP_WRITE_CONTEXT :
					    PSMX2_WRITE_CONTEXT;

	req->no_event = no_event;
	req->op = PSMX2_AM_REQ_WRITE;
	req->write.buf = (void *)iov[0].iov_base;
	req->write.len = total_len;
	req->write.addr = addr;	/* needed? */
	req->write.key = key; 	/* needed? */
	req->write.context = context;
	req->ep = ep_priv;
	req->cq_flags = FI_WRITE | FI_RMA;
	PSMX2_CTXT_USER(&req->fi_context) = context;
	PSMX2_CTXT_EP(&req->fi_context) = ep_priv;

	/* Case 2: send iov in sequence */
	args[0].u32w0 = 0;

	len_sent = 0;
	for (i=0; i<count; i++) {
		if (!iov[i].iov_len)
			continue;

		/* Case 2.1: use long protocol for the last segment if it is large */
		if (psmx2_env.tagged_rma && iov[i].iov_len > chunk_size &&
		    len_sent + iov[i].iov_len == total_len) {
			PSMX2_SET_TAG(psm2_tag, (uint64_t)req, 0, PSMX2_RMA_TYPE_WRITE);
			PSMX2_AM_SET_OP(args[0].u32w0, PSMX2_AM_REQ_WRITE_LONG);
			args[0].u32w1 = iov[i].iov_len;
			args[1].u64 = (uint64_t)req;
			args[2].u64 = addr;
			args[3].u64 = key;
			nargs = 4;
			if (flags & FI_REMOTE_CQ_DATA) {
				PSMX2_AM_SET_FLAG(args[0].u32w0, PSMX2_AM_DATA);
				args[4].u64 = data;
				nargs++;
			}

			if (flags & FI_DELIVERY_COMPLETE) {
				args[0].u32w0 |= PSMX2_AM_FORCE_ACK;
				psm2_context = NULL;
			} else {
				psm2_context = (void *)&req->fi_context;
			}

			psm2_am_request_short(psm2_epaddr, PSMX2_AM_RMA_HANDLER, args,
					      nargs, NULL, 0, am_flags, NULL, NULL);
			psmx2_am_poll(ep_priv->tx);

			psm2_mq_isend2(ep_priv->tx->psm2_mq, psm2_epaddr, 0,
				       &psm2_tag, iov[i].iov_base, iov[i].iov_len,
				       psm2_context, &psm2_req);

			return 0;
		}

		/* Case 2.2: use short protocol all other segments */
		PSMX2_AM_SET_OP(args[0].u32w0, PSMX2_AM_REQ_WRITE);
		nargs = 4;
		buf = iov[i].iov_base;
		len = iov[i].iov_len;
		while (len > chunk_size) {
			args[0].u32w1 = chunk_size;
			args[1].u64 = (uint64_t)(uintptr_t)req;
			args[2].u64 = addr;
			args[3].u64 = key;
			psm2_am_request_short(psm2_epaddr, PSMX2_AM_RMA_HANDLER, args,
					      nargs, (void *)buf, chunk_size, am_flags,
					      NULL, NULL);
			psmx2_am_poll(ep_priv->tx);
			buf += chunk_size;
			addr += chunk_size;
			len -= chunk_size;
			len_sent += chunk_size;
		}

		args[0].u32w1 = len;
		args[1].u64 = (uint64_t)(uintptr_t)req;
		args[2].u64 = addr;
		args[3].u64 = key;
		if (len_sent + len == total_len) {
			if (flags & FI_REMOTE_CQ_DATA) {
				PSMX2_AM_SET_FLAG(args[0].u32w0, PSMX2_AM_DATA | PSMX2_AM_EOM);
				args[4].u64 = data;
				nargs++;
			} else {
				PSMX2_AM_SET_FLAG(args[0].u32w0, PSMX2_AM_EOM);
			}
		}
		psm2_am_request_short(psm2_epaddr, PSMX2_AM_RMA_HANDLER, args, nargs,
				      (void *)buf, len, am_flags, NULL, NULL);
		psmx2_am_poll(ep_priv->tx);

		addr += len;
		len_sent += len;
	}

	return 0;
}