in prov/psm/src/psmx_cq.c [348:569]
/*
 * Drain completed requests from the domain's PSM matched queue and translate
 * each one into libfabric CQ events and/or counter increments.
 *
 * cq:       the CQ being polled; completions belonging to this CQ are written
 *           directly into event_in while capacity remains.
 * domain:   owns the PSM MQ and the poll_lock serializing ipeek/test pairs.
 * event_in: caller-supplied buffer for up to 'count' events (ignored when
 *           count == 0; events for 'cq' are then queued internally instead).
 * count:    capacity of event_in, in events.
 * src_addr: optional output array of source addresses, advanced in lockstep
 *           with event_in.
 *
 * Returns the number of events stored into event_in (>= 0), or a negative
 * fabric errno (e.g. -FI_ENOMEM) on failure.
 */
int psmx_cq_poll_mq(struct psmx_fid_cq *cq, struct psmx_fid_domain *domain,
struct psmx_cq_event *event_in, int count, fi_addr_t *src_addr)
{
psm_mq_req_t psm_req;
psm_mq_status_t psm_status;
struct fi_context *fi_context;
struct psmx_fid_ep *tmp_ep;
struct psmx_fid_cq *tmp_cq;
struct psmx_fid_cntr *tmp_cntr;
struct psmx_cq_event *event;
int multi_recv;
int err;
/* read_more is cleared once 'cq' can accept no further direct events;
 * read_count is the number of events written into event_in so far. */
int read_more = 1;
int read_count = 0;
void *event_buffer = count ? event_in : NULL;
while (1) {
/* psm_mq_ipeek and psm_mq_test are supposed to be called
 * in sequence. If the same sequence from different threads
 * is interleaved the behavior is erroneous: the second
 * psm_mq_test could dereference a request that has been
 * freed because the two psm_mq_ipeek calls may return the
 * same request. Use a lock to ensure that won't happen.
 */
if (fastlock_tryacquire(&domain->poll_lock))
return read_count;
err = psm_mq_ipeek(domain->psm_mq, &psm_req, NULL);
if (err == PSM_OK) {
/* NOTE(review): the return value of psm_mq_test is stored in 'err'
 * but never checked; completion status is taken from
 * psm_status.error_code below — confirm this is intentional. */
err = psm_mq_test(&psm_req, &psm_status);
fastlock_release(&domain->poll_lock);
fi_context = psm_status.context;
/* A completion with no attached context carries nothing to report. */
if (!fi_context)
continue;
/* Route the completion: the context type encodes which CQ and
 * which counter (if any) of the owning endpoint should see it. */
tmp_ep = PSMX_CTXT_EP(fi_context);
tmp_cq = NULL;
tmp_cntr = NULL;
multi_recv = 0;
switch ((int)PSMX_CTXT_TYPE(fi_context)) {
case PSMX_SEND_CONTEXT:
case PSMX_TSEND_CONTEXT:
tmp_cq = tmp_ep->send_cq;
tmp_cntr = tmp_ep->send_cntr;
break;
/* NOCOMP variants: suppress the CQ entry on success — it is only
 * generated when the operation completed with an error. The
 * counter is always incremented. */
case PSMX_NOCOMP_SEND_CONTEXT:
if (psm_status.error_code)
tmp_cq = tmp_ep->send_cq;
tmp_cntr = tmp_ep->send_cntr;
break;
case PSMX_MULTI_RECV_CONTEXT:
multi_recv = 1;
/* Fall through */
case PSMX_RECV_CONTEXT:
case PSMX_TRECV_CONTEXT:
tmp_cq = tmp_ep->recv_cq;
tmp_cntr = tmp_ep->recv_cntr;
break;
case PSMX_NOCOMP_RECV_CONTEXT:
if (psm_status.error_code)
tmp_cq = tmp_ep->recv_cq;
tmp_cntr = tmp_ep->recv_cntr;
break;
/* RMA initiator completions report on the send CQ but bump the
 * operation-specific (write/read) counter. */
case PSMX_WRITE_CONTEXT:
tmp_cq = tmp_ep->send_cq;
tmp_cntr = tmp_ep->write_cntr;
break;
case PSMX_NOCOMP_WRITE_CONTEXT:
if (psm_status.error_code)
tmp_cq = tmp_ep->send_cq;
tmp_cntr = tmp_ep->write_cntr;
break;
case PSMX_READ_CONTEXT:
tmp_cq = tmp_ep->send_cq;
tmp_cntr = tmp_ep->read_cntr;
break;
case PSMX_NOCOMP_READ_CONTEXT:
if (psm_status.error_code)
tmp_cq = tmp_ep->send_cq;
tmp_cntr = tmp_ep->read_cntr;
break;
/* Target-side completion of a remote write. This case handles
 * everything itself (ack, optional CQ event, RMA-event counters)
 * and leaves via continue/return rather than falling out of the
 * switch. */
case PSMX_REMOTE_WRITE_CONTEXT:
{
/* NOTE(review): this declaration shadows the outer 'fi_context';
 * both refer to psm_status.context, so behavior is unaffected. */
struct fi_context *fi_context = psm_status.context;
struct psmx_fid_mr *mr;
struct psmx_am_request *req;
req = container_of(fi_context, struct psmx_am_request, fi_context);
/* Sender asked for an explicit ack: deliver it along with the
 * translated completion status. */
if (req->op & PSMX_AM_FORCE_ACK) {
req->error = psmx_errno(psm_status.error_code);
psmx_am_ack_rma(req);
}
mr = PSMX_CTXT_USER(fi_context);
/* A CQ event is generated at the target only when the initiator
 * attached remote CQ data and the RMA endpoint has a recv CQ. */
if (mr->domain->rma_ep->recv_cq && (req->cq_flags & FI_REMOTE_CQ_DATA)) {
event = psmx_cq_create_event_from_status(
mr->domain->rma_ep->recv_cq,
&psm_status, req->write.data,
(mr->domain->rma_ep->recv_cq == cq) ?
event_buffer : NULL,
count, src_addr);
if (!event)
return -FI_ENOMEM;
if (event == event_buffer) {
/* Event landed directly in the caller's buffer:
 * advance the cursor (and src_addr) in lockstep. */
read_count++;
read_more = --count;
event_buffer = count ?
(uint8_t *)event_buffer + cq->entry_size :
NULL;
if (src_addr)
src_addr = count ? src_addr + 1 : NULL;
} else {
psmx_cq_enqueue_event(mr->domain->rma_ep->recv_cq, event);
if (mr->domain->rma_ep->recv_cq == cq)
read_more = 0;
}
}
/* FI_RMA_EVENT: bump the endpoint-wide remote-write counter and
 * the per-MR counter (once each, even if they differ). */
if (mr->domain->rma_ep->caps & FI_RMA_EVENT) {
if (mr->domain->rma_ep->remote_write_cntr)
psmx_cntr_inc(mr->domain->rma_ep->remote_write_cntr);
if (mr->cntr && mr->cntr != mr->domain->rma_ep->remote_write_cntr)
psmx_cntr_inc(mr->cntr);
}
if (read_more)
continue;
return read_count;
}
/* Target-side completion of a remote read: counter-only, no CQ
 * event is ever generated here. */
case PSMX_REMOTE_READ_CONTEXT:
{
struct fi_context *fi_context = psm_status.context;
struct psmx_fid_mr *mr;
mr = PSMX_CTXT_USER(fi_context);
if (mr->domain->rma_ep->caps & FI_RMA_EVENT) {
if (mr->domain->rma_ep->remote_read_cntr)
psmx_cntr_inc(mr->domain->rma_ep->remote_read_cntr);
}
continue;
}
}
/* Generic delivery path for all context types that fell out of the
 * switch with tmp_cq/tmp_cntr set. */
if (tmp_cq) {
event = psmx_cq_create_event_from_status(tmp_cq, &psm_status, 0,
(tmp_cq == cq) ? event_buffer : NULL, count,
src_addr);
if (!event)
return -FI_ENOMEM;
if (event == event_buffer) {
/* Direct delivery into event_in: advance buffer cursor and
 * src_addr together; stop reading once count hits zero. */
read_count++;
read_more = --count;
event_buffer = count ?
(uint8_t *)event_buffer + cq->entry_size :
NULL;
if (src_addr)
src_addr = count ? src_addr + 1 : NULL;
} else {
psmx_cq_enqueue_event(tmp_cq, event);
if (tmp_cq == cq)
read_more = 0;
}
}
if (tmp_cntr)
psmx_cntr_inc(tmp_cntr);
/* FI_MULTI_RECV bookkeeping: consume this chunk of the posted
 * buffer and repost the remainder if it is still large enough. */
if (multi_recv) {
struct psmx_multi_recv *req;
psm_mq_req_t psm_req;
size_t len_remaining;
req = PSMX_CTXT_USER(fi_context);
req->offset += psm_status.nbytes;
len_remaining = req->len - req->offset;
if (len_remaining >= req->min_buf_size) {
/* Cap each repost at the provider's per-message limit. */
if (len_remaining > PSMX_MAX_MSG_SIZE)
len_remaining = PSMX_MAX_MSG_SIZE;
err = psm_mq_irecv(tmp_ep->domain->psm_mq,
req->tag, req->tagsel, req->flag,
req->buf + req->offset,
len_remaining,
(void *)fi_context, &psm_req);
if (err != PSM_OK)
return psmx_errno(err);
PSMX_CTXT_REQ(fi_context) = psm_req;
} else {
/* Buffer exhausted: release the multi-recv descriptor. */
free(req);
}
}
if (read_more)
continue;
return read_count;
} else if (err == PSM_MQ_NO_COMPLETIONS) {
/* Queue drained: report whatever was collected this call. */
fastlock_release(&domain->poll_lock);
return read_count;
} else {
/* ipeek failed: translate the PSM error to a fabric errno. */
fastlock_release(&domain->poll_lock);
return psmx_errno(err);
}
}
}