in prov/psm2/src/psmx2_cq.c [1049:1559]
/*
 * Poll the PSM2 matched queue of a transmit/receive context and dispatch
 * every completed request according to the context type stored in the
 * fi_context that was attached to the request.
 *
 * Depending on the context type, a completed request may be reported
 * through psmx2_cq_tx_complete() / psmx2_cq_rx_complete(), counted on one
 * of the endpoint's counters via psmx2_cntr_inc(), have its per-operation
 * bookkeeping released (AM requests, sendv requests/replies), and, for
 * multi-recv buffers, have the unused tail of the buffer reposted.
 *
 * Parameters:
 *   cq        - forwarded unchanged to the completion helpers.
 *   trx_ctxt  - context whose psm2_mq is polled; also supplies the poll
 *               lock and is passed to psmx2_am_request_free().
 *   event_in  - forwarded unchanged to the completion helpers.
 *   count     - forwarded unchanged to the completion helpers.
 *   src_addr  - forwarded unchanged to psmx2_cq_rx_complete().
 *
 * Returns:
 *   - read_count when the queue reports no more completions, when the
 *     poll lock could not be taken, or when read_more has been cleared.
 *     NOTE(review): read_count is only modified through the pointer given
 *     to the completion helpers; presumably it counts events stored into
 *     event_in -- confirm against those helpers.
 *   - the non-zero value returned by a completion helper, unmodified.
 *   - psmx2_errno(err) for any other PSM2 error from psm2_mq_ipeek() or
 *     from psm2_mq_irecv2() while reposting a multi-recv buffer.
 */
int psmx2_cq_poll_mq(struct psmx2_fid_cq *cq,
struct psmx2_trx_ctxt *trx_ctxt,
struct psmx2_cq_event *event_in,
int count, fi_addr_t *src_addr)
{
psm2_mq_status2_t status_priv, *status = &status_priv;
struct fi_context *fi_context;
struct psmx2_fid_ep *ep;
struct psmx2_fid_mr *mr;
struct psmx2_am_request *am_req;
struct psmx2_multi_recv *multi_recv_req;
struct psmx2_sendv_request *sendv_req;
struct psmx2_sendv_reply *sendv_rep;
psm2_mq_req_t psm2_req;
size_t len_remaining;
void *op_context;
void *buf;
uint64_t flags;
uint64_t data;
int read_count = 0;
/*
 * Loop control. Cleared locally by the large-send tweak below; the
 * completion helpers also receive a pointer to it and may clear it.
 */
int read_more = 1;
int err;
int context_type;
while (read_more) {
/*
 * psm2_mq_test2 is called immediately after psm2_mq_ipeek with a lock held to
 * prevent psm2_mq_ipeek from returning the same request multiple times under
 * different threads.
 */
/*
 * A non-zero return from the trylock is handled as "lock not taken":
 * the unlock is skipped and the iteration behaves as if the queue
 * were empty, which makes the function return read_count below.
 */
if (trx_ctxt->domain->poll_trylock_fn(&trx_ctxt->poll_lock, 2)) {
err = PSM2_MQ_NO_COMPLETIONS;
} else {
err = psm2_mq_ipeek(trx_ctxt->psm2_mq, &psm2_req, NULL);
/*
 * The return value of psm2_mq_test2 is not examined; err keeps the
 * result of psm2_mq_ipeek.
 */
if (err == PSM2_OK)
psm2_mq_test2(&psm2_req, status);
trx_ctxt->domain->poll_unlock_fn(&trx_ctxt->poll_lock, 2);
}
if (err == PSM2_OK) {
fi_context = PSMX2_STATUS_CONTEXT(status);
/* A completion without an attached fi_context is silently skipped. */
if (OFI_UNLIKELY(!fi_context)) {
continue;
}
ep = PSMX2_CTXT_EP(fi_context);
context_type = (int)PSMX2_CTXT_TYPE(fi_context);
switch (context_type) {
/*
 * Plain and tagged sends that generate a completion: report on the
 * endpoint's send CQ (if any) and count on the send counter (if any).
 */
case PSMX2_SEND_CONTEXT:
case PSMX2_TSEND_CONTEXT:
if (ep->send_cq) {
op_context = fi_context;
buf = PSMX2_CTXT_USER(fi_context);
flags = psmx2_comp_flags[context_type];
err = psmx2_cq_tx_complete(
cq, ep->send_cq, ep->av,
status, op_context, buf, flags, 0,
event_in, count, &read_count,
&read_more);
if (err)
return err;
}
if (ep->send_cntr)
psmx2_cntr_inc(ep->send_cntr, PSMX2_STATUS_ERROR(status));
/* Bi-directional send/recv performance tweak for KNL */
/* Stop polling after a send larger than 16384 bytes has completed. */
if (PSMX2_STATUS_SNDLEN(status) > 16384)
read_more = 0;
break;
/*
 * Sends posted without completion: an event is generated only when
 * the status carries an error, and then with NULL context and buffer.
 * The send counter is still updated.
 */
case PSMX2_NOCOMP_SEND_CONTEXT:
case PSMX2_NOCOMP_TSEND_CONTEXT:
if (ep->send_cq && PSMX2_STATUS_ERROR(status)) {
op_context = NULL;
buf = NULL;
flags = psmx2_comp_flags[context_type];
err = psmx2_cq_tx_complete(
cq, ep->send_cq, ep->av,
status, op_context, buf, flags, 0,
event_in, count, &read_count,
&read_more);
if (err)
return err;
}
if (ep->send_cntr)
psmx2_cntr_inc(ep->send_cntr, PSMX2_STATUS_ERROR(status));
break;
/*
 * Untagged receive with completion.
 * If the tag flags mark the message as an IOV header and
 * psmx2_handle_sendv_req() returns 0, nothing is reported for this
 * completion.
 * NOTE(review): presumably the user-visible completion is then
 * produced later via PSMX2_IOV_RECV_CONTEXT -- confirm against
 * psmx2_handle_sendv_req().
 */
case PSMX2_RECV_CONTEXT:
if (OFI_UNLIKELY(PSMX2_IS_IOV_HEADER(PSMX2_GET_FLAGS(PSMX2_STATUS_TAG(status))) &&
!psmx2_handle_sendv_req(ep, status, 0)))
continue;
if (ep->recv_cq) {
op_context = fi_context;
buf = PSMX2_CTXT_USER(fi_context);
flags = psmx2_comp_flags[context_type];
/* CQ data is extracted from the tag; FI_REMOTE_CQ_DATA is set only if the tag flags say it is present. */
data = PSMX2_GET_CQDATA(PSMX2_STATUS_TAG(status));
if (PSMX2_HAS_IMM(PSMX2_GET_FLAGS(PSMX2_STATUS_TAG(status))))
flags |= FI_REMOTE_CQ_DATA;
err = psmx2_cq_rx_complete(
cq, ep->recv_cq, ep->av,
status, op_context, buf, flags, data,
event_in, count, &read_count,
&read_more, src_addr);
if (err)
return err;
}
if (ep->recv_cntr)
psmx2_cntr_inc(ep->recv_cntr, PSMX2_STATUS_ERROR(status));
break;
/* Tagged receive with completion: same handling as PSMX2_RECV_CONTEXT. */
case PSMX2_TRECV_CONTEXT:
if (OFI_UNLIKELY(PSMX2_IS_IOV_HEADER(PSMX2_GET_FLAGS(PSMX2_STATUS_TAG(status))) &&
!psmx2_handle_sendv_req(ep, status, 0)))
continue;
if (ep->recv_cq) {
op_context = fi_context;
buf = PSMX2_CTXT_USER(fi_context);
flags = psmx2_comp_flags[context_type];
data = PSMX2_GET_CQDATA(PSMX2_STATUS_TAG(status));
if (PSMX2_HAS_IMM(PSMX2_GET_FLAGS(PSMX2_STATUS_TAG(status))))
flags |= FI_REMOTE_CQ_DATA;
err = psmx2_cq_rx_complete(
cq, ep->recv_cq, ep->av,
status, op_context, buf, flags, data,
event_in, count, &read_count,
&read_more, src_addr);
if (err)
return err;
}
if (ep->recv_cntr)
psmx2_cntr_inc(ep->recv_cntr, PSMX2_STATUS_ERROR(status));
break;
/*
 * Untagged receive posted without completion.
 * PSMX2_EP_PUT_OP_CONTEXT() is invoked on both the early-exit path
 * and the normal path, so fi_context is handed back exactly once.
 * NOTE(review): presumably this returns the context to a pool owned
 * by the endpoint -- confirm against the macro definition.
 * An event is generated only on error, with NULL context and buffer.
 */
case PSMX2_NOCOMP_RECV_CONTEXT:
if (OFI_UNLIKELY(PSMX2_IS_IOV_HEADER(PSMX2_GET_FLAGS(PSMX2_STATUS_TAG(status))) &&
!psmx2_handle_sendv_req(ep, status, 0))) {
PSMX2_EP_PUT_OP_CONTEXT(ep, fi_context);
continue;
}
PSMX2_EP_PUT_OP_CONTEXT(ep, fi_context);
if (OFI_UNLIKELY(ep->recv_cq && PSMX2_STATUS_ERROR(status))) {
op_context = NULL;
buf = NULL;
flags = psmx2_comp_flags[context_type];
data = PSMX2_GET_CQDATA(PSMX2_STATUS_TAG(status));
if (PSMX2_HAS_IMM(PSMX2_GET_FLAGS(PSMX2_STATUS_TAG(status))))
flags |= FI_REMOTE_CQ_DATA;
err = psmx2_cq_rx_complete(
cq, ep->recv_cq, ep->av,
status, op_context, buf, flags, data,
event_in, count, &read_count,
&read_more, src_addr);
if (err)
return err;
}
if (ep->recv_cntr)
psmx2_cntr_inc(ep->recv_cntr, PSMX2_STATUS_ERROR(status));
break;
/* Tagged receive posted without completion: same handling as PSMX2_NOCOMP_RECV_CONTEXT. */
case PSMX2_NOCOMP_TRECV_CONTEXT:
if (OFI_UNLIKELY(PSMX2_IS_IOV_HEADER(PSMX2_GET_FLAGS(PSMX2_STATUS_TAG(status))) &&
!psmx2_handle_sendv_req(ep, status, 0))) {
PSMX2_EP_PUT_OP_CONTEXT(ep, fi_context);
continue;
}
PSMX2_EP_PUT_OP_CONTEXT(ep, fi_context);
if (OFI_UNLIKELY(ep->recv_cq && PSMX2_STATUS_ERROR(status))) {
op_context = NULL;
buf = NULL;
flags = psmx2_comp_flags[context_type];
data = PSMX2_GET_CQDATA(PSMX2_STATUS_TAG(status));
if (PSMX2_HAS_IMM(PSMX2_GET_FLAGS(PSMX2_STATUS_TAG(status))))
flags |= FI_REMOTE_CQ_DATA;
err = psmx2_cq_rx_complete(
cq, ep->recv_cq, ep->av,
status, op_context, buf, flags, data,
event_in, count, &read_count,
&read_more, src_addr);
if (err)
return err;
}
if (ep->recv_cntr)
psmx2_cntr_inc(ep->recv_cntr, PSMX2_STATUS_ERROR(status));
break;
/*
 * Local completion of an RMA write with completion.
 * fi_context is a member of the AM request (see container_of), so
 * the user context must be read from it before the request is
 * released; ep and context_type were already captured above.
 */
case PSMX2_WRITE_CONTEXT:
am_req = container_of(fi_context, struct psmx2_am_request,
fi_context);
op_context = PSMX2_CTXT_USER(fi_context);
free(am_req->tmpbuf);
psmx2_am_request_free(trx_ctxt, am_req);
if (ep->send_cq) {
buf = NULL;
flags = psmx2_comp_flags[context_type];
err = psmx2_cq_tx_complete(
cq, ep->send_cq, ep->av,
status, op_context, buf, flags, 0,
event_in, count, &read_count,
&read_more);
if (err)
return err;
}
if (ep->write_cntr)
psmx2_cntr_inc(ep->write_cntr, PSMX2_STATUS_ERROR(status));
break;
/*
 * Local completion of an RMA write posted without completion:
 * resources are released as above, but an event is generated only
 * when the status carries an error.
 */
case PSMX2_NOCOMP_WRITE_CONTEXT:
am_req = container_of(fi_context, struct psmx2_am_request,
fi_context);
op_context = PSMX2_CTXT_USER(fi_context);
free(am_req->tmpbuf);
psmx2_am_request_free(trx_ctxt, am_req);
if (OFI_UNLIKELY(ep->send_cq && PSMX2_STATUS_ERROR(status))) {
buf = NULL;
flags = psmx2_comp_flags[context_type];
err = psmx2_cq_tx_complete(
cq, ep->send_cq, ep->av,
status, op_context, buf, flags, 0,
event_in, count, &read_count,
&read_more);
if (err)
return err;
}
if (ep->write_cntr)
psmx2_cntr_inc(ep->write_cntr, PSMX2_STATUS_ERROR(status));
break;
/*
 * Local completion of an RMA read with completion.
 * For a READV request the received length is accumulated; while the
 * total is still short of the requested length, only the error (if
 * any) is recorded and the request is left alive.
 */
case PSMX2_READ_CONTEXT:
am_req = container_of(fi_context, struct psmx2_am_request,
fi_context);
if (OFI_UNLIKELY(am_req->op == PSMX2_AM_REQ_READV)) {
am_req->read.len_read += PSMX2_STATUS_RCVLEN(status);
if (am_req->read.len_read < am_req->read.len) {
FI_INFO(&psmx2_prov, FI_LOG_EP_DATA,
"readv: long protocol finishes early\n");
if (PSMX2_STATUS_ERROR(status))
am_req->error = psmx2_errno(PSMX2_STATUS_ERROR(status));
/* Request to be freed in AM handler */
continue;
}
}
/* Read the user context before releasing the request that contains fi_context. */
op_context = PSMX2_CTXT_USER(fi_context);
free(am_req->tmpbuf);
psmx2_am_request_free(trx_ctxt, am_req);
if (ep->send_cq) {
buf = NULL;
flags = psmx2_comp_flags[context_type];
err = psmx2_cq_tx_complete(
cq, ep->send_cq, ep->av,
status, op_context, buf, flags, 0,
event_in, count, &read_count,
&read_more);
if (err)
return err;
}
if (ep->read_cntr)
psmx2_cntr_inc(ep->read_cntr, PSMX2_STATUS_ERROR(status));
break;
/*
 * Local completion of an RMA read posted without completion: same
 * READV accumulation and cleanup as PSMX2_READ_CONTEXT, but an event
 * is generated only when the status carries an error.
 */
case PSMX2_NOCOMP_READ_CONTEXT:
am_req = container_of(fi_context, struct psmx2_am_request,
fi_context);
if (OFI_UNLIKELY(am_req->op == PSMX2_AM_REQ_READV)) {
am_req->read.len_read += PSMX2_STATUS_RCVLEN(status);
if (am_req->read.len_read < am_req->read.len) {
FI_INFO(&psmx2_prov, FI_LOG_EP_DATA,
"readv: long protocol finishes early\n");
if (PSMX2_STATUS_ERROR(status))
am_req->error = psmx2_errno(PSMX2_STATUS_ERROR(status));
/* Request to be freed in AM handler */
continue;
}
}
op_context = PSMX2_CTXT_USER(fi_context);
free(am_req->tmpbuf);
psmx2_am_request_free(trx_ctxt, am_req);
if (OFI_UNLIKELY(ep->send_cq && PSMX2_STATUS_ERROR(status))) {
buf = NULL;
flags = psmx2_comp_flags[context_type];
err = psmx2_cq_tx_complete(
cq, ep->send_cq, ep->av,
status, op_context, buf, flags, 0,
event_in, count, &read_count,
&read_more);
if (err)
return err;
}
if (ep->read_cntr)
psmx2_cntr_inc(ep->read_cntr, PSMX2_STATUS_ERROR(status));
break;
/*
 * A message landed in a multi-recv buffer. The user field of the
 * context holds the multi-recv bookkeeping (buffer, current offset,
 * total length, minimum remaining size).
 */
case PSMX2_MULTI_RECV_CONTEXT:
if (OFI_UNLIKELY(PSMX2_IS_IOV_HEADER(PSMX2_GET_FLAGS(PSMX2_STATUS_TAG(status))) &&
!psmx2_handle_sendv_req(ep, status, 1)))
continue;
multi_recv_req = PSMX2_CTXT_USER(fi_context);
if (ep->recv_cq) {
op_context = fi_context;
/* The data of this message starts at the current offset (offset is advanced only after reporting). */
buf = multi_recv_req->buf + multi_recv_req->offset;
flags = psmx2_comp_flags[context_type];
data = PSMX2_GET_CQDATA(PSMX2_STATUS_TAG(status));
if (PSMX2_HAS_IMM(PSMX2_GET_FLAGS(PSMX2_STATUS_TAG(status))))
flags |= FI_REMOTE_CQ_DATA;
/*
 * Same condition as the "do not repost" branch below: the space
 * left after this message is smaller than min_buf_size.
 */
if (multi_recv_req->offset + PSMX2_STATUS_RCVLEN(status) +
multi_recv_req->min_buf_size > multi_recv_req->len)
flags |= FI_MULTI_RECV; /* buffer used up */
err = psmx2_cq_rx_complete(
cq, ep->recv_cq, ep->av,
status, op_context, buf, flags, data,
event_in, count, &read_count,
&read_more, src_addr);
if (err)
return err;
}
if (ep->recv_cntr)
psmx2_cntr_inc(ep->recv_cntr, PSMX2_STATUS_ERROR(status));
/* repost multi-recv buffer */
multi_recv_req->offset += PSMX2_STATUS_RCVLEN(status);
len_remaining = multi_recv_req->len - multi_recv_req->offset;
if (len_remaining >= multi_recv_req->min_buf_size) {
/* A single posted receive is capped at PSMX2_MAX_MSG_SIZE. */
if (len_remaining > PSMX2_MAX_MSG_SIZE)
len_remaining = PSMX2_MAX_MSG_SIZE;
err = psm2_mq_irecv2(ep->rx->psm2_mq,
multi_recv_req->src_addr, &multi_recv_req->tag,
&multi_recv_req->tagsel, multi_recv_req->flag,
multi_recv_req->buf + multi_recv_req->offset,
len_remaining,
(void *)fi_context, &psm2_req);
if (err != PSM2_OK)
return psmx2_errno(err);
/* Remember the new PSM2 request handle in the context. */
PSMX2_CTXT_REQ(fi_context) = psm2_req;
} else {
/* Not enough room left for another message: drop the bookkeeping. */
free(multi_recv_req);
}
break;
/*
 * Target-side completion of an incoming RMA write. The endpoint is
 * taken from the AM request (am_req->ep) rather than from ep.
 */
case PSMX2_REMOTE_WRITE_CONTEXT:
am_req = container_of(fi_context, struct psmx2_am_request, fi_context);
/* When an ack is forced, record the translated status and send the ack. */
if (am_req->op & PSMX2_AM_FORCE_ACK) {
am_req->error = psmx2_errno(PSMX2_STATUS_ERROR(status));
psmx2_am_ack_rma(am_req);
}
/* A CQ event is generated only when the write carried remote CQ data. */
if (am_req->ep->recv_cq && (am_req->cq_flags & FI_REMOTE_CQ_DATA)) {
op_context = NULL;
buf = NULL;
flags = psmx2_comp_flags[context_type] | FI_REMOTE_CQ_DATA;
err = psmx2_cq_rx_complete(
cq, am_req->ep->recv_cq, am_req->ep->av,
status, op_context, buf, flags, am_req->write.data,
event_in, count, &read_count,
&read_more, src_addr);
if (err) {
psmx2_am_request_free(trx_ctxt, am_req);
return err;
}
}
/* Remote-write counters are touched only when the endpoint has FI_RMA_EVENT. */
if (am_req->ep->caps & FI_RMA_EVENT) {
if (am_req->ep->remote_write_cntr)
psmx2_cntr_inc(am_req->ep->remote_write_cntr, 0);
/* For this context type the user field holds the memory region. */
mr = PSMX2_CTXT_USER(fi_context);
/* Skip the MR counter when it is the endpoint counter already incremented above. */
if (mr->cntr && mr->cntr != am_req->ep->remote_write_cntr)
psmx2_cntr_inc(mr->cntr, 0);
}
/* NOTE: am_req->tmpbuf is unused here */
psmx2_am_request_free(trx_ctxt, am_req);
break;
/*
 * Target-side completion of an incoming RMA read: only the
 * remote-read counter is updated (when FI_RMA_EVENT is set), then the
 * AM request is released. No CQ event is generated.
 */
case PSMX2_REMOTE_READ_CONTEXT:
am_req = container_of(fi_context, struct psmx2_am_request, fi_context);
if (am_req->ep->caps & FI_RMA_EVENT) {
if (am_req->ep->remote_read_cntr)
psmx2_cntr_inc(am_req->ep->remote_read_cntr, 0);
}
/* NOTE: am_req->tmpbuf is unused here */
psmx2_am_request_free(trx_ctxt, am_req);
break;
/*
 * Completion of one piece of a vectored send. With the MULTI
 * protocol, the operation is finished only after
 * iov_info.count + 1 pieces have completed; until then the tag of
 * this piece is saved in the request and nothing is reported.
 * NOTE(review): the "+ 1" presumably accounts for a header message in
 * addition to the iov segments -- confirm against the sendv path.
 */
case PSMX2_SENDV_CONTEXT:
sendv_req = PSMX2_CTXT_USER(fi_context);
sendv_req->iov_done++;
if (sendv_req->iov_protocol == PSMX2_IOV_PROTO_MULTI &&
sendv_req->iov_done < sendv_req->iov_info.count + 1) {
sendv_req->tag = PSMX2_STATUS_TAG(status);
continue;
}
if (ep->send_cq && !sendv_req->no_completion) {
op_context = sendv_req->user_context;
buf = NULL;
flags = psmx2_comp_flags[context_type] |
sendv_req->comp_flag;
err = psmx2_cq_tx_complete(
cq, ep->send_cq, ep->av,
status, op_context, buf, flags, 0,
event_in, count, &read_count,
&read_more);
if (err) {
free(sendv_req);
return err;
}
}
if (ep->send_cntr)
psmx2_cntr_inc(ep->send_cntr, PSMX2_STATUS_ERROR(status));
free(sendv_req);
break;
/*
 * Completion of one piece of a multi-message vectored send. Nothing
 * is reported until iov_info.count + 1 pieces are done; then the tag
 * saved in the request replaces the tag in the status before the
 * completion is reported.
 */
case PSMX2_IOV_SEND_CONTEXT:
sendv_req = PSMX2_CTXT_USER(fi_context);
sendv_req->iov_done++;
if (sendv_req->iov_done < sendv_req->iov_info.count + 1)
continue;
PSMX2_STATUS_TAG(status) = sendv_req->tag;
if (ep->send_cq && !sendv_req->no_completion) {
op_context = sendv_req->user_context;
buf = NULL;
flags = psmx2_comp_flags[context_type] |
sendv_req->comp_flag;
err = psmx2_cq_tx_complete(
cq, ep->send_cq, ep->av,
status, op_context, buf, flags, 0,
event_in, count, &read_count,
&read_more);
if (err) {
free(sendv_req);
return err;
}
}
if (ep->send_cntr)
psmx2_cntr_inc(ep->send_cntr, PSMX2_STATUS_ERROR(status));
free(sendv_req);
break;
/*
 * Completion of one segment on the receiving side of a
 * multi-message vectored send. Lengths are summed over the segments
 * and the most recent non-OK error is kept. Nothing is reported
 * until iov_info.count segments have completed.
 */
case PSMX2_IOV_RECV_CONTEXT:
sendv_rep = PSMX2_CTXT_USER(fi_context);
sendv_rep->iov_done++;
sendv_rep->msg_length += PSMX2_STATUS_SNDLEN(status);
sendv_rep->bytes_received += PSMX2_STATUS_RCVLEN(status);
if (PSMX2_STATUS_ERROR(status) != PSM2_OK)
sendv_rep->error_code = PSMX2_STATUS_ERROR(status);
if (sendv_rep->iov_done < sendv_rep->iov_info.count)
continue;
/*
 * Overwrite the status of the last segment with the aggregated
 * values so the reported completion describes the whole message.
 */
PSMX2_STATUS_TAG(status) = sendv_rep->tag;
PSMX2_STATUS_RCVLEN(status) = sendv_rep->bytes_received;
PSMX2_STATUS_SNDLEN(status) = sendv_rep->msg_length;
PSMX2_STATUS_ERROR(status) = sendv_rep->error_code;
if (ep->recv_cq && !sendv_rep->no_completion) {
op_context = sendv_rep->user_context;
buf = sendv_rep->buf;
flags = psmx2_comp_flags[context_type] |
sendv_rep->comp_flag;
err = psmx2_cq_rx_complete(
cq, ep->recv_cq, ep->av,
status, op_context, buf, flags, 0,
event_in, count, &read_count,
&read_more, src_addr);
if (err) {
free(sendv_rep);
return err;
}
}
if (ep->recv_cntr)
psmx2_cntr_inc(ep->recv_cntr, PSMX2_STATUS_ERROR(status));
if (sendv_rep->multi_recv) {
/* repost the multi-recv buffer */
/*
 * From here on fi_context refers to the multi-recv context stored
 * in the reply, not to the context of the completed segment.
 * The status RCVLEN used below is the aggregated byte count
 * written into the status above.
 */
fi_context = sendv_rep->user_context;
multi_recv_req = PSMX2_CTXT_USER(fi_context);
multi_recv_req->offset += PSMX2_STATUS_RCVLEN(status);
len_remaining = multi_recv_req->len - multi_recv_req->offset;
if (len_remaining >= multi_recv_req->min_buf_size) {
if (len_remaining > PSMX2_MAX_MSG_SIZE)
len_remaining = PSMX2_MAX_MSG_SIZE;
err = psm2_mq_irecv2(ep->rx->psm2_mq,
multi_recv_req->src_addr, &multi_recv_req->tag,
&multi_recv_req->tagsel, multi_recv_req->flag,
multi_recv_req->buf + multi_recv_req->offset,
len_remaining,
(void *)fi_context, &psm2_req);
if (err != PSM2_OK) {
free(sendv_rep);
return psmx2_errno(err);
}
PSMX2_CTXT_REQ(fi_context) = psm2_req;
} else {
free(multi_recv_req);
}
}
free(sendv_rep);
break;
}
} else if (err == PSM2_MQ_NO_COMPLETIONS) {
/* Queue drained, or the poll lock was not available. */
return read_count;
} else {
/* Any other PSM2 error from the peek is translated and returned. */
return psmx2_errno(err);
}
}
return read_count;
}