in prov/psm2/src/psmx2_rma.c [1068:1276]
/*
 * Vectored RMA write: transfer the bytes described by iov[0..count-1] to the
 * remote region identified by (addr, key) on the peer dest_addr.
 *
 * Dispatch, in order:
 *   - FI_TRIGGER set: the operation is handed to psmx2_trigger_queue_writev().
 *   - The destination epid equals this endpoint's own tx epid: the operation
 *     is handed to psmx2_rma_self().
 *   - Case 1: the total length fits in one short AM request
 *     (psm2_am_param.max_request_short): all segments are packed into a
 *     temporary buffer and sent as a single PSMX2_AM_REQ_WRITE.
 *   - Case 2: otherwise every non-empty segment is sent in turn, split into
 *     chunk_size pieces. If psmx2_env.tagged_rma is set and the final
 *     non-empty segment is larger than chunk_size, that segment is announced
 *     with PSMX2_AM_REQ_WRITE_LONG and sent through psm2_mq_isend2().
 *
 * ep        endpoint to write from
 * iov       source segments; zero-length entries are skipped
 * desc      forwarded unchanged to the trigger / self paths, unused otherwise
 * count     number of entries in iov
 * dest_addr destination address, translated through the endpoint's AV
 * addr, key remote target; addr is advanced locally as data is sent
 * context   user context stored in the request
 * flags     FI_TRIGGER, FI_INJECT, FI_REMOTE_CQ_DATA, FI_DELIVERY_COMPLETE,
 *           FI_COMPLETION and PSMX2_NO_COMPLETION are examined here
 * data      remote CQ data, sent only when FI_REMOTE_CQ_DATA is set
 *
 * Returns 0 once the request(s) have been issued, -FI_ENOMEM if a request or
 * the packing buffer cannot be allocated, -FI_EMSGSIZE if FI_INJECT is set
 * and the payload does not fit in one short AM request, or whatever the
 * trigger / self helpers return.
 *
 * NOTE(review): the return values of psm2_am_request_short() and
 * psm2_mq_isend2() are discarded below, so a transport-level failure is not
 * reported to the caller -- confirm whether that is intended.
 */
ssize_t psmx2_writev_generic(struct fid_ep *ep, const struct iovec *iov,
			     void **desc, size_t count, fi_addr_t dest_addr,
			     uint64_t addr, uint64_t key, void *context,
			     uint64_t flags, uint64_t data)
{
	struct psmx2_fid_ep *ep_priv;
	struct psmx2_fid_av *av;
	struct psmx2_am_request *req;
	psm2_amarg_t args[8];
	int nargs;
	int am_flags = PSM2_AM_FLAG_ASYNC;
	int chunk_size;
	psm2_epaddr_t psm2_epaddr;
	psm2_epid_t psm2_epid;
	psm2_mq_req_t psm2_req;
	psm2_mq_tag_t psm2_tag;
	void *psm2_context;
	int no_event;
	size_t total_len, len, len_sent;
	uint8_t *buf, *p;
	size_t i;	/* same type as count: no signed/unsigned comparison */

	ep_priv = container_of(ep, struct psmx2_fid_ep, ep);

	if (flags & FI_TRIGGER)
		return psmx2_trigger_queue_writev(ep, iov, desc, count,
						  dest_addr, addr, key,
						  context, flags, data);

	av = ep_priv->av;
	assert(av);

	psm2_epaddr = psmx2_av_translate_addr(av, ep_priv->tx, dest_addr, av->type);

	/* Writes whose destination is this very endpoint take the self path. */
	psm2_epaddr_to_epid(psm2_epaddr, &psm2_epid);
	if (psm2_epid == ep_priv->tx->psm2_epid)
		return psmx2_rma_self(PSMX2_AM_REQ_WRITEV, ep_priv,
				      (void *)iov, count, desc, addr,
				      key, context, flags, data);

	/*
	 * Completion is suppressed either explicitly, or because the endpoint
	 * uses selective completion and the caller did not ask for one.
	 */
	no_event = (flags & PSMX2_NO_COMPLETION) ||
		   (ep_priv->send_selective_completion && !(flags & FI_COMPLETION));

	total_len = 0;
	for (i = 0; i < count; i++)
		total_len += iov[i].iov_len;

	chunk_size = ep_priv->tx->psm2_am_param.max_request_short;

	/*
	 * An injected write must fit in a single short AM request. Reject an
	 * oversized one before taking a request from the pool, so nothing has
	 * to be handed back on this error path.
	 */
	if ((flags & FI_INJECT) && total_len > chunk_size)
		return -FI_EMSGSIZE;

	req = psmx2_am_request_alloc(ep_priv->tx);
	if (!req)
		return -FI_ENOMEM;

	/* Case 1: fit into a AM message, then pack and send */
	if (total_len <= chunk_size) {
		/*
		 * malloc(0) is allowed to return NULL; ask for at least one
		 * byte so that a zero-length write is not reported as an
		 * out-of-memory condition.
		 */
		req->tmpbuf = malloc(total_len ? total_len : 1);
		if (!req->tmpbuf) {
			psmx2_am_request_free(ep_priv->tx, req);
			return -FI_ENOMEM;
		}

		/* Gather all non-empty segments into the contiguous buffer. */
		p = req->tmpbuf;
		for (i = 0; i < count; i++) {
			if (iov[i].iov_len) {
				memcpy(p, iov[i].iov_base, iov[i].iov_len);
				p += iov[i].iov_len;
			}
		}

		buf = req->tmpbuf;
		len = total_len;

		req->no_event = no_event;
		req->op = PSMX2_AM_REQ_WRITE;
		req->write.buf = (void *)buf;
		req->write.len = len;
		req->write.addr = addr;	/* needed? */
		req->write.key = key;	/* needed? */
		req->write.context = context;
		req->ep = ep_priv;
		req->cq_flags = FI_WRITE | FI_RMA;
		PSMX2_CTXT_USER(&req->fi_context) = context;
		PSMX2_CTXT_EP(&req->fi_context) = ep_priv;

		args[0].u32w0 = 0;
		PSMX2_AM_SET_OP(args[0].u32w0, PSMX2_AM_REQ_WRITE);
		args[0].u32w1 = len;
		args[1].u64 = (uint64_t)(uintptr_t)req;
		args[2].u64 = addr;
		args[3].u64 = key;
		nargs = 4;

		/* A single message is always the last one, hence EOM. */
		if (flags & FI_REMOTE_CQ_DATA) {
			PSMX2_AM_SET_FLAG(args[0].u32w0, PSMX2_AM_DATA | PSMX2_AM_EOM);
			args[4].u64 = data;
			nargs++;
		} else {
			PSMX2_AM_SET_FLAG(args[0].u32w0, PSMX2_AM_EOM);
		}

		psm2_am_request_short(psm2_epaddr, PSMX2_AM_RMA_HANDLER, args, nargs,
				      (void *)buf, len, am_flags, NULL, NULL);
		psmx2_am_poll(ep_priv->tx);
		return 0;
	}

	PSMX2_CTXT_TYPE(&req->fi_context) = no_event ?
					    PSMX2_NOCOMP_WRITE_CONTEXT :
					    PSMX2_WRITE_CONTEXT;

	req->no_event = no_event;
	req->op = PSMX2_AM_REQ_WRITE;
	req->write.buf = (void *)iov[0].iov_base;
	req->write.len = total_len;
	req->write.addr = addr;	/* needed? */
	req->write.key = key;	/* needed? */
	req->write.context = context;
	req->ep = ep_priv;
	req->cq_flags = FI_WRITE | FI_RMA;
	PSMX2_CTXT_USER(&req->fi_context) = context;
	PSMX2_CTXT_EP(&req->fi_context) = ep_priv;

	/* Case 2: send iov in sequence */
	args[0].u32w0 = 0;
	len_sent = 0;

	for (i = 0; i < count; i++) {
		if (!iov[i].iov_len)
			continue;

		/* Case 2.1: use long protocol for the last segment if it is large */
		if (psmx2_env.tagged_rma && iov[i].iov_len > chunk_size &&
		    len_sent + iov[i].iov_len == total_len) {
			PSMX2_SET_TAG(psm2_tag, (uint64_t)(uintptr_t)req, 0,
				      PSMX2_RMA_TYPE_WRITE);
			PSMX2_AM_SET_OP(args[0].u32w0, PSMX2_AM_REQ_WRITE_LONG);
			args[0].u32w1 = iov[i].iov_len;
			args[1].u64 = (uint64_t)(uintptr_t)req;
			args[2].u64 = addr;
			args[3].u64 = key;
			nargs = 4;

			if (flags & FI_REMOTE_CQ_DATA) {
				PSMX2_AM_SET_FLAG(args[0].u32w0, PSMX2_AM_DATA);
				args[4].u64 = data;
				nargs++;
			}

			/*
			 * With FI_DELIVERY_COMPLETE the peer is asked for an
			 * explicit ack and the MQ send carries no context;
			 * otherwise the MQ send carries the request's
			 * fi_context.
			 */
			if (flags & FI_DELIVERY_COMPLETE) {
				args[0].u32w0 |= PSMX2_AM_FORCE_ACK;
				psm2_context = NULL;
			} else {
				psm2_context = (void *)&req->fi_context;
			}

			/* Announce the transfer first, then send the payload. */
			psm2_am_request_short(psm2_epaddr, PSMX2_AM_RMA_HANDLER, args,
					      nargs, NULL, 0, am_flags, NULL, NULL);
			psmx2_am_poll(ep_priv->tx);

			psm2_mq_isend2(ep_priv->tx->psm2_mq, psm2_epaddr, 0,
				       &psm2_tag, iov[i].iov_base, iov[i].iov_len,
				       psm2_context, &psm2_req);

			return 0;
		}

		/* Case 2.2: use short protocol all other segments */
		PSMX2_AM_SET_OP(args[0].u32w0, PSMX2_AM_REQ_WRITE);
		nargs = 4;
		buf = iov[i].iov_base;
		len = iov[i].iov_len;

		/* Full-size chunks; the remote address advances with each one. */
		while (len > chunk_size) {
			args[0].u32w1 = chunk_size;
			args[1].u64 = (uint64_t)(uintptr_t)req;
			args[2].u64 = addr;
			args[3].u64 = key;
			psm2_am_request_short(psm2_epaddr, PSMX2_AM_RMA_HANDLER, args,
					      nargs, (void *)buf, chunk_size, am_flags,
					      NULL, NULL);
			psmx2_am_poll(ep_priv->tx);
			buf += chunk_size;
			addr += chunk_size;
			len -= chunk_size;
			len_sent += chunk_size;
		}

		/* Remainder of this segment (at most chunk_size bytes). */
		args[0].u32w1 = len;
		args[1].u64 = (uint64_t)(uintptr_t)req;
		args[2].u64 = addr;
		args[3].u64 = key;

		/* Only the message that completes the whole write carries EOM. */
		if (len_sent + len == total_len) {
			if (flags & FI_REMOTE_CQ_DATA) {
				PSMX2_AM_SET_FLAG(args[0].u32w0, PSMX2_AM_DATA | PSMX2_AM_EOM);
				args[4].u64 = data;
				nargs++;
			} else {
				PSMX2_AM_SET_FLAG(args[0].u32w0, PSMX2_AM_EOM);
			}
		}

		psm2_am_request_short(psm2_epaddr, PSMX2_AM_RMA_HANDLER, args, nargs,
				      (void *)buf, len, am_flags, NULL, NULL);
		psmx2_am_poll(ep_priv->tx);

		addr += len;
		len_sent += len;
	}

	return 0;
}