in prov/psm/src/psmx_rma.c [640:819]
/*
 * _psmx_write - start an RMA write of 'len' bytes from 'buf' to the remote
 * location described by 'addr' / 'key' on the peer 'dest_addr'.
 *
 * ep        - endpoint the write is issued on
 * buf       - local source buffer; must not be NULL
 * len       - number of bytes to write
 * desc      - local memory descriptor (only forwarded to the trigger record
 *             and to psmx_rma_self() here)
 * dest_addr - destination; an index into the AV when the endpoint's AV is of
 *             type FI_AV_TABLE, otherwise used directly as a PSM epaddr
 * addr, key - remote target address and key, carried in the AM arguments
 * context   - user context stored in the request; with FI_TRIGGER it must
 *             point to a struct fi_triggered_context
 * flags     - operation flags; FI_TRIGGER, FI_INJECT, FI_REMOTE_CQ_DATA,
 *             FI_DELIVERY_COMPLETE, FI_COMPLETION and PSMX_NO_COMPLETION are
 *             examined here
 * data      - remote CQ data, sent only when FI_REMOTE_CQ_DATA is set
 *
 * Returns 0 once the operation has been queued/issued (or the result of
 * psmx_rma_self() for a write to the local endpoint), or a negative value:
 *   -FI_EINVAL   NULL buf, bad destination, or NULL context with FI_TRIGGER
 *   -FI_ENOMEM   allocation failure
 *   -FI_EMSGSIZE inject larger than PSMX_INJECT_SIZE, or a tagged-RMA write
 *                whose length does not fit the 32-bit AM length field
 *
 * NOTE(review): the return values of psm_am_request_short() and
 * psm_mq_isend() are not checked, and 'req' is not released in this
 * function -- presumably the completion/ack path owns it; confirm.
 */
ssize_t _psmx_write(struct fid_ep *ep, const void *buf, size_t len,
		    void *desc, fi_addr_t dest_addr,
		    uint64_t addr, uint64_t key, void *context,
		    uint64_t flags, uint64_t data)
{
	struct psmx_fid_ep *ep_priv;
	struct psmx_fid_av *av;
	struct psmx_epaddr_context *epaddr_context;
	struct psmx_am_request *req;
	psm_amarg_t args[8];
	int nargs;
	int am_flags = PSM_AM_FLAG_ASYNC;
	int chunk_size;
	psm_mq_req_t psm_req;
	uint64_t psm_tag;
	size_t idx;
	void *psm_context;
	int no_event;

	ep_priv = container_of(ep, struct psmx_fid_ep, ep);

	/*
	 * Triggered operation: record all arguments and attach the record to
	 * the threshold counter. The saved flags have FI_TRIGGER cleared.
	 */
	if (flags & FI_TRIGGER) {
		struct psmx_trigger *trigger;
		struct fi_triggered_context *ctxt = context;

		/* The trigger description is read from the caller's context. */
		if (!ctxt)
			return -FI_EINVAL;

		trigger = calloc(1, sizeof(*trigger));
		if (!trigger)
			return -FI_ENOMEM;

		trigger->op = PSMX_TRIGGERED_WRITE;
		trigger->cntr = container_of(ctxt->trigger.threshold.cntr,
					     struct psmx_fid_cntr, cntr);
		trigger->threshold = ctxt->trigger.threshold.threshold;
		trigger->write.ep = ep;
		trigger->write.buf = buf;
		trigger->write.len = len;
		trigger->write.desc = desc;
		trigger->write.dest_addr = dest_addr;
		trigger->write.addr = addr;
		trigger->write.key = key;
		trigger->write.context = context;
		trigger->write.flags = flags & ~FI_TRIGGER;
		trigger->write.data = data;

		psmx_cntr_add_trigger(trigger->cntr, trigger);
		return 0;
	}

	if (!buf)
		return -FI_EINVAL;

	/* Resolve the destination to a PSM epaddr. */
	av = ep_priv->av;
	if (av && av->type == FI_AV_TABLE) {
		idx = dest_addr;
		if (idx >= av->last)
			return -FI_EINVAL;

		dest_addr = (fi_addr_t) av->psm_epaddrs[idx];
	} else if (!dest_addr) {
		return -FI_EINVAL;
	}

	/* A write whose target epid is our own domain's epid is handled locally. */
	epaddr_context = psm_epaddr_getctxt((void *)dest_addr);
	if (epaddr_context->epid == ep_priv->domain->psm_epid)
		return psmx_rma_self(PSMX_AM_REQ_WRITE,
				     ep_priv, (void *)buf, len, desc,
				     addr, key, context, flags, data);

	no_event = (flags & PSMX_NO_COMPLETION) ||
		   (ep_priv->send_selective_completion && !(flags & FI_COMPLETION));

	chunk_size = MIN(PSMX_AM_CHUNK_SIZE, psmx_am_param.max_request_short);

	/*
	 * The tagged path announces the whole transfer length in the 32-bit
	 * u32w1 header field. Reject a length that would be truncated, and do
	 * so before the request is allocated so nothing leaks.
	 */
	if (psmx_env.tagged_rma && len > chunk_size &&
	    (uint64_t)len > UINT32_MAX)
		return -FI_EMSGSIZE;

	if (flags & FI_INJECT) {
		if (len > PSMX_INJECT_SIZE)
			return -FI_EMSGSIZE;

		/*
		 * Allocate the request with room for the payload right after
		 * it, copy the data there and send from the private copy.
		 */
		req = malloc(sizeof(*req) + len);
		if (!req)
			return -FI_ENOMEM;

		memset(req, 0, sizeof(*req));
		memcpy((uint8_t *)req + sizeof(*req), (void *)buf, len);
		buf = (uint8_t *)req + sizeof(*req);
	} else {
		req = calloc(1, sizeof(*req));
		if (!req)
			return -FI_ENOMEM;

		PSMX_CTXT_TYPE(&req->fi_context) = no_event ?
						   PSMX_NOCOMP_WRITE_CONTEXT :
						   PSMX_WRITE_CONTEXT;
	}

	req->no_event = no_event;
	req->op = PSMX_AM_REQ_WRITE;
	req->write.buf = (void *)buf;
	req->write.len = len;
	req->write.addr = addr;	/* needed? */
	req->write.key = key;	/* needed? */
	req->write.context = context;
	req->ep = ep_priv;
	req->cq_flags = FI_WRITE | FI_RMA;
	PSMX_CTXT_USER(&req->fi_context) = context;
	PSMX_CTXT_EP(&req->fi_context) = ep_priv;

	/*
	 * Tagged path: one AM request announces the write, and the data
	 * itself is sent with psm_mq_isend() under psm_tag.
	 */
	if (psmx_env.tagged_rma && len > chunk_size) {
		void *payload = NULL;
		int payload_len = 0;

		psm_tag = PSMX_RMA_BIT | ep_priv->domain->psm_epid;

		args[0].u32w0 = PSMX_AM_REQ_WRITE_LONG;
		args[0].u32w1 = (uint32_t)len;	/* range checked above */
		args[1].u64 = (uint64_t)(uintptr_t)req;
		args[2].u64 = addr;
		args[3].u64 = key;
		args[4].u64 = psm_tag;
		nargs = 5;

		if (flags & FI_REMOTE_CQ_DATA) {
			args[0].u32w0 |= PSMX_AM_DATA;
			/*
			 * 'data' is a by-value parameter, so the AM request is
			 * issued without PSM_AM_FLAG_ASYNC in this case.
			 */
			payload = &data;
			payload_len = sizeof(data);
			am_flags = 0;
		}

		if (flags & FI_DELIVERY_COMPLETE) {
			args[0].u32w0 |= PSMX_AM_FORCE_ACK;
			psm_context = NULL;
		} else {
			psm_context = (void *)&req->fi_context;
		}

		/* NOTE: if nargs is greater than 5, the following psm_mq_isend
		 * would hang if the destination is on the same node (i.e. going
		 * through the shared memory path). As a result, the immediate
		 * data is sent as payload instead of args[5].
		 */
		psm_am_request_short((psm_epaddr_t) dest_addr,
				     PSMX_AM_RMA_HANDLER, args, nargs,
				     payload, payload_len, am_flags,
				     NULL, NULL);

		psm_mq_isend(ep_priv->domain->psm_mq, (psm_epaddr_t) dest_addr,
			     0, psm_tag, buf, len, psm_context, &psm_req);

		return 0;
	}

	/*
	 * AM path: send full chunks of chunk_size bytes while more than one
	 * chunk remains, advancing the local buffer and the remote address.
	 */
	nargs = 4;
	while (len > chunk_size) {
		args[0].u32w0 = PSMX_AM_REQ_WRITE;
		args[0].u32w1 = chunk_size;
		args[1].u64 = (uint64_t)(uintptr_t)req;
		args[2].u64 = addr;
		args[3].u64 = key;
		psm_am_request_short((psm_epaddr_t) dest_addr,
				     PSMX_AM_RMA_HANDLER, args, nargs,
				     (void *)buf, chunk_size,
				     am_flags, NULL, NULL);
		buf = (const uint8_t *)buf + chunk_size;
		addr += chunk_size;
		len -= chunk_size;
	}

	/*
	 * Final (possibly only) chunk: flagged with PSMX_AM_EOM and, when
	 * requested, carrying the remote CQ data as a fifth argument.
	 */
	args[0].u32w0 = PSMX_AM_REQ_WRITE | PSMX_AM_EOM;
	args[0].u32w1 = len;
	args[1].u64 = (uint64_t)(uintptr_t)req;
	args[2].u64 = addr;
	args[3].u64 = key;
	if (flags & FI_REMOTE_CQ_DATA) {
		args[4].u64 = data;
		args[0].u32w0 |= PSMX_AM_DATA;
		nargs++;
	}
	psm_am_request_short((psm_epaddr_t) dest_addr,
			     PSMX_AM_RMA_HANDLER, args, nargs,
			     (void *)buf, len, am_flags, NULL, NULL);

	return 0;
}