in prov/psm2/src/psmx2_tagged.c [815:986]
ssize_t psmx2_tagged_sendv_generic(struct fid_ep *ep,
const struct iovec *iov, void **desc,
size_t count, fi_addr_t dest_addr,
uint64_t tag, void *context,
uint64_t flags, uint64_t data)
{
struct psmx2_fid_ep *ep_priv;
struct psmx2_fid_av *av;
psm2_epaddr_t psm2_epaddr;
psm2_mq_req_t psm2_req;
psm2_mq_tag_t psm2_tag;
struct fi_context * fi_context;
int send_flag = 0;
int err;
int no_completion = 0;
struct psmx2_cq_event *event;
size_t real_count;
size_t len, total_len;
char *p;
uint32_t *q;
int i, j;
struct psmx2_sendv_request *req;
int have_data = (flags & FI_REMOTE_CQ_DATA) > 0;
uint32_t msg_flags;
assert((tag & ~PSMX2_TAG_MASK) == 0);
ep_priv = container_of(ep, struct psmx2_fid_ep, ep);
if (flags & FI_TRIGGER)
return psmx2_trigger_queue_tsendv(ep, iov, desc, count,
dest_addr, tag, context,
flags, data);
total_len = 0;
real_count = 0;
for (i=0; i<count; i++) {
if (iov[i].iov_len) {
total_len += iov[i].iov_len;
real_count++;
j = i;
}
}
if (real_count == 1)
return psmx2_tagged_send_generic(ep, iov[j].iov_base, iov[j].iov_len,
desc ? desc[j] : NULL, dest_addr,
tag, context, flags, data);
req = malloc(sizeof(*req));
if (!req)
return -FI_ENOMEM;
if (total_len <= PSMX2_IOV_BUF_SIZE) {
req->iov_protocol = PSMX2_IOV_PROTO_PACK;
p = req->buf;
for (i=0; i<count; i++) {
if (iov[i].iov_len) {
memcpy(p, iov[i].iov_base, iov[i].iov_len);
p += iov[i].iov_len;
}
}
msg_flags = PSMX2_TYPE_TAGGED;
len = total_len;
} else {
req->iov_protocol = PSMX2_IOV_PROTO_MULTI;
req->iov_done = 0;
req->iov_info.seq_num = (++ep_priv->iov_seq_num) %
PSMX2_IOV_MAX_SEQ_NUM + 1;
req->iov_info.count = (uint32_t)real_count;
req->iov_info.total_len = (uint32_t)total_len;
q = req->iov_info.len;
for (i=0; i<count; i++) {
if (iov[i].iov_len)
*q++ = (uint32_t)iov[i].iov_len;
}
msg_flags = PSMX2_TYPE_TAGGED | PSMX2_IOV_BIT;
len = (3 + real_count) * sizeof(uint32_t);
}
av = ep_priv->av;
assert(av);
psm2_epaddr = psmx2_av_translate_addr(av, ep_priv->tx, dest_addr, av->type);
if (have_data)
PSMX2_SET_TAG(psm2_tag, tag, (uint32_t)data, msg_flags | PSMX2_IMM_BIT);
else
PSMX2_SET_TAG(psm2_tag, tag, (uint32_t)ep_priv->sep_id, msg_flags);
if ((flags & PSMX2_NO_COMPLETION) ||
(ep_priv->send_selective_completion && !(flags & FI_COMPLETION)))
no_completion = 1;
if (flags & FI_INJECT) {
if (len > psmx2_env.inject_size) {
free(req);
return -FI_EMSGSIZE;
}
err = psm2_mq_send2(ep_priv->tx->psm2_mq, psm2_epaddr,
send_flag, &psm2_tag, req->buf, len);
free(req);
if (err != PSM2_OK)
return psmx2_errno(err);
if (ep_priv->send_cntr)
psmx2_cntr_inc(ep_priv->send_cntr, 0);
if (ep_priv->send_cq && !no_completion) {
event = psmx2_cq_create_event(
ep_priv->send_cq,
context, NULL, flags, len,
(uint64_t) data,
0 /* tag */,
0 /* olen */,
0 /* err */);
if (event)
psmx2_cq_enqueue_event(ep_priv->send_cq, event);
else
return -FI_ENOMEM;
}
return 0;
}
req->no_completion = no_completion;
req->user_context = context;
req->comp_flag = FI_TAGGED;
fi_context = &req->fi_context;
PSMX2_CTXT_TYPE(fi_context) = PSMX2_SENDV_CONTEXT;
PSMX2_CTXT_USER(fi_context) = req;
PSMX2_CTXT_EP(fi_context) = ep_priv;
err = psm2_mq_isend2(ep_priv->tx->psm2_mq, psm2_epaddr,
send_flag, &psm2_tag, req->buf, len,
(void *)fi_context, &psm2_req);
if (err != PSM2_OK) {
free(req);
return psmx2_errno(err);
}
PSMX2_CTXT_REQ(fi_context) = psm2_req;
if (req->iov_protocol == PSMX2_IOV_PROTO_MULTI) {
fi_context = &req->fi_context_iov;
PSMX2_CTXT_TYPE(fi_context) = PSMX2_IOV_SEND_CONTEXT;
PSMX2_CTXT_USER(fi_context) = req;
PSMX2_CTXT_EP(fi_context) = ep_priv;
PSMX2_SET_TAG(psm2_tag, req->iov_info.seq_num, 0,
PSMX2_TYPE_IOV_PAYLOAD);
for (i=0; i<count; i++) {
if (iov[i].iov_len) {
err = psm2_mq_isend2(ep_priv->tx->psm2_mq,
psm2_epaddr, send_flag, &psm2_tag,
iov[i].iov_base, iov[i].iov_len,
(void *)fi_context, &psm2_req);
if (err != PSM2_OK)
return psmx2_errno(err);
}
}
}
return 0;
}