in prov/psm/src/psmx_atomic.c [596:736]
static int psmx_atomic_self(int am_cmd,
struct psmx_fid_ep *ep,
const void *buf,
size_t count, void *desc,
const void *compare, void *compare_desc,
void *result, void *result_desc,
uint64_t addr, uint64_t key,
enum fi_datatype datatype,
enum fi_op op, void *context,
uint64_t flags)
{
struct psmx_fid_mr *mr;
struct psmx_cq_event *event;
struct psmx_fid_ep *target_ep;
struct psmx_fid_cntr *cntr = NULL;
struct psmx_fid_cntr *mr_cntr = NULL;
void *tmp_buf;
size_t len;
int no_event;
int err = 0;
int op_error;
int access;
uint64_t cq_flags = 0;
if (am_cmd == PSMX_AM_REQ_ATOMIC_WRITE)
access = FI_REMOTE_WRITE;
else
access = FI_REMOTE_READ | FI_REMOTE_WRITE;
len = ofi_datatype_size(datatype) * count;
mr = psmx_mr_get(psmx_active_fabric->active_domain, key);
op_error = mr ? psmx_mr_validate(mr, addr, len, access) : -FI_EINVAL;
if (op_error)
goto gen_local_event;
addr += mr->offset;
switch (am_cmd) {
case PSMX_AM_REQ_ATOMIC_WRITE:
err = psmx_atomic_do_write((void *)addr, (void *)buf,
(int)datatype, (int)op, (int)count);
cq_flags = FI_WRITE | FI_ATOMIC;
break;
case PSMX_AM_REQ_ATOMIC_READWRITE:
if (result != buf) {
err = psmx_atomic_do_readwrite((void *)addr, (void *)buf,
(void *)result, (int)datatype,
(int)op, (int)count);
} else {
tmp_buf = malloc(len);
if (tmp_buf) {
memcpy(tmp_buf, result, len);
err = psmx_atomic_do_readwrite((void *)addr, (void *)buf,
tmp_buf, (int)datatype,
(int)op, (int)count);
memcpy(result, tmp_buf, len);
free(tmp_buf);
} else {
err = -FI_ENOMEM;
}
}
if (op == FI_ATOMIC_READ)
cq_flags = FI_READ | FI_ATOMIC;
else
cq_flags = FI_WRITE | FI_ATOMIC;
break;
case PSMX_AM_REQ_ATOMIC_COMPWRITE:
if (result != buf && result != compare) {
err = psmx_atomic_do_compwrite((void *)addr, (void *)buf,
(void *)compare, (void *)result,
(int)datatype, (int)op, (int)count);
} else {
tmp_buf = malloc(len);
if (tmp_buf) {
memcpy(tmp_buf, result, len);
err = psmx_atomic_do_compwrite((void *)addr, (void *)buf,
(void *)compare, tmp_buf,
(int)datatype, (int)op, (int)count);
memcpy(result, tmp_buf, len);
free(tmp_buf);
} else {
err = -FI_ENOMEM;
}
}
cq_flags = FI_WRITE | FI_ATOMIC;
break;
}
target_ep = mr->domain->atomics_ep;
if (target_ep->caps & FI_RMA_EVENT) {
if (op == FI_ATOMIC_READ) {
cntr = target_ep->remote_read_cntr;
} else {
cntr = target_ep->remote_write_cntr;
mr_cntr = mr->cntr;
}
if (cntr)
psmx_cntr_inc(cntr);
if (mr_cntr && mr_cntr != cntr)
psmx_cntr_inc(mr_cntr);
}
gen_local_event:
no_event = ((flags & PSMX_NO_COMPLETION) ||
(ep->send_selective_completion && !(flags & FI_COMPLETION)));
if (ep->send_cq && (!no_event || op_error)) {
event = psmx_cq_create_event(
ep->send_cq,
context,
(void *)buf,
cq_flags,
len,
0, /* data */
0, /* tag */
0, /* olen */
op_error);
if (event)
psmx_cq_enqueue_event(ep->send_cq, event);
else
err = -FI_ENOMEM;
}
switch (am_cmd) {
case PSMX_AM_REQ_ATOMIC_WRITE:
if (ep->write_cntr)
psmx_cntr_inc(ep->write_cntr);
break;
case PSMX_AM_REQ_ATOMIC_READWRITE:
case PSMX_AM_REQ_ATOMIC_COMPWRITE:
if (ep->read_cntr)
psmx_cntr_inc(ep->read_cntr);
break;
}
return err;
}