int psmx_cq_poll_mq()

in prov/psm/src/psmx_cq.c [348:569]


/*
 * Drain completed requests from the domain's PSM matched queue, turning each
 * completion into a libfabric CQ event and/or counter increment.
 *
 * cq       - the CQ the caller is reading; completions belonging to other CQs
 *            are enqueued on those CQs instead of being returned here.
 * domain   - domain whose PSM MQ is polled; its poll_lock serializes the
 *            ipeek/test sequence (see comment inside the loop).
 * event_in - caller-supplied event buffer, filled in-place when count > 0.
 * count    - capacity of event_in in events; 0 polls without writing directly
 *            into the caller's buffer.
 * src_addr - optional source-address output array, advanced in lockstep with
 *            event_in.
 *
 * Returns the number of events written into event_in (possibly 0), or a
 * negative fabric errno on failure.
 */
int psmx_cq_poll_mq(struct psmx_fid_cq *cq, struct psmx_fid_domain *domain,
		    struct psmx_cq_event *event_in, int count, fi_addr_t *src_addr)
{
	psm_mq_req_t psm_req;
	psm_mq_status_t psm_status;
	struct fi_context *fi_context;
	struct psmx_fid_ep *tmp_ep;
	struct psmx_fid_cq *tmp_cq;
	struct psmx_fid_cntr *tmp_cntr;
	struct psmx_cq_event *event;
	int multi_recv;
	int err;
	int read_more = 1;
	int read_count = 0;
	void *event_buffer = count ? event_in : NULL;

	while (1) {
		/* psm_mq_ipeek and psm_mq_test are supposed to be called
		 * in sequence. If the same sequence from different threads
		 * is interleaved the behavior is erroneous: the second
		 * psm_mq_test could dereference a request that has been
		 * freed because the two psm_mq_ipeek calls may return the
		 * same request. Use a lock to ensure that won't happen.
		 */
		if (fastlock_tryacquire(&domain->poll_lock))
			return read_count;

		err = psm_mq_ipeek(domain->psm_mq, &psm_req, NULL);

		if (err == PSM_OK) {
			err = psm_mq_test(&psm_req, &psm_status);
			fastlock_release(&domain->poll_lock);

			fi_context = psm_status.context;

			/* Completions without a context carry no reportable
			 * information; skip them. */
			if (!fi_context)
				continue;

			tmp_ep = PSMX_CTXT_EP(fi_context);
			tmp_cq = NULL;
			tmp_cntr = NULL;
			multi_recv = 0;

			/* Map the context type to the CQ/counter to report on.
			 * NOCOMP variants suppress the CQ event unless the
			 * operation failed. Unknown types leave both NULL and
			 * are silently consumed. */
			switch ((int)PSMX_CTXT_TYPE(fi_context)) {
			case PSMX_SEND_CONTEXT:
			case PSMX_TSEND_CONTEXT:
				tmp_cq = tmp_ep->send_cq;
				tmp_cntr = tmp_ep->send_cntr;
				break;

			case PSMX_NOCOMP_SEND_CONTEXT:
				if (psm_status.error_code)
					tmp_cq = tmp_ep->send_cq;
				tmp_cntr = tmp_ep->send_cntr;
				break;

			case PSMX_MULTI_RECV_CONTEXT:
				multi_recv = 1;
				/* Fall through */
			case PSMX_RECV_CONTEXT:
			case PSMX_TRECV_CONTEXT:
				tmp_cq = tmp_ep->recv_cq;
				tmp_cntr = tmp_ep->recv_cntr;
				break;

			case PSMX_NOCOMP_RECV_CONTEXT:
				if (psm_status.error_code)
					tmp_cq = tmp_ep->recv_cq;
				tmp_cntr = tmp_ep->recv_cntr;
				break;

			case PSMX_WRITE_CONTEXT:
				tmp_cq = tmp_ep->send_cq;
				tmp_cntr = tmp_ep->write_cntr;
				break;

			case PSMX_NOCOMP_WRITE_CONTEXT:
				if (psm_status.error_code)
					tmp_cq = tmp_ep->send_cq;
				tmp_cntr = tmp_ep->write_cntr;
				break;

			case PSMX_READ_CONTEXT:
				tmp_cq = tmp_ep->send_cq;
				tmp_cntr = tmp_ep->read_cntr;
				break;

			case PSMX_NOCOMP_READ_CONTEXT:
				if (psm_status.error_code)
					tmp_cq = tmp_ep->send_cq;
				tmp_cntr = tmp_ep->read_cntr;
				break;

			case PSMX_REMOTE_WRITE_CONTEXT:
				{
				  /* Target side of an RMA write delivered via
				   * AM: ack if requested, optionally generate
				   * a remote-CQ-data event, and bump remote
				   * write counters. Handled entirely here;
				   * does not reach the common code below. */
				  struct psmx_fid_mr *mr;
				  struct psmx_am_request *req;

				  req = container_of(fi_context, struct psmx_am_request, fi_context);
				  if (req->op & PSMX_AM_FORCE_ACK) {
					req->error = psmx_errno(psm_status.error_code);
					psmx_am_ack_rma(req);
				  }

				  mr = PSMX_CTXT_USER(fi_context);
				  if (mr->domain->rma_ep->recv_cq && (req->cq_flags & FI_REMOTE_CQ_DATA)) {
					event = psmx_cq_create_event_from_status(
							mr->domain->rma_ep->recv_cq,
							&psm_status, req->write.data,
							(mr->domain->rma_ep->recv_cq == cq) ?
								event_buffer : NULL,
							count, src_addr);
					if (!event)
						return -FI_ENOMEM;

					if (event == event_buffer) {
						/* Written directly into the caller's
						 * buffer; advance output cursors. */
						read_count++;
						read_more = --count;
						event_buffer = count ?
							(uint8_t *)event_buffer + cq->entry_size :
							NULL;
						if (src_addr)
							src_addr = count ? src_addr + 1 : NULL;
					} else {
						psmx_cq_enqueue_event(mr->domain->rma_ep->recv_cq, event);
						if (mr->domain->rma_ep->recv_cq == cq)
							read_more = 0;
					}
				  }

				  if (mr->domain->rma_ep->caps & FI_RMA_EVENT) {
					  if (mr->domain->rma_ep->remote_write_cntr)
						psmx_cntr_inc(mr->domain->rma_ep->remote_write_cntr);

					  if (mr->cntr && mr->cntr != mr->domain->rma_ep->remote_write_cntr)
						psmx_cntr_inc(mr->cntr);
				  }

				  if (read_more)
					continue;

				  return read_count;
				}

			case PSMX_REMOTE_READ_CONTEXT:
				{
				  /* Target side of an RMA read: only a remote
				   * read counter may fire; no CQ event. */
				  struct psmx_fid_mr *mr;

				  mr = PSMX_CTXT_USER(fi_context);
				  if (mr->domain->rma_ep->caps & FI_RMA_EVENT) {
					  if (mr->domain->rma_ep->remote_read_cntr)
						psmx_cntr_inc(mr->domain->rma_ep->remote_read_cntr);
				  }

				  continue;
				}
			}

			if (tmp_cq) {
				event = psmx_cq_create_event_from_status(tmp_cq, &psm_status, 0,
							(tmp_cq == cq) ? event_buffer : NULL, count,
							src_addr);
				if (!event)
					return -FI_ENOMEM;

				if (event == event_buffer) {
					/* Event landed in the caller's buffer. */
					read_count++;
					read_more = --count;
					event_buffer = count ?
						(uint8_t *)event_buffer + cq->entry_size :
						NULL;
					if (src_addr)
						src_addr = count ? src_addr + 1 : NULL;
				} else {
					/* Event belongs to (or overflowed) another
					 * CQ; queue it there. Stop reading if it
					 * was for the caller's CQ. */
					psmx_cq_enqueue_event(tmp_cq, event);
					if (tmp_cq == cq)
						read_more = 0;
				}
			}

			if (tmp_cntr)
				psmx_cntr_inc(tmp_cntr);

			if (multi_recv) {
				/* Multi-recv buffer: repost the remainder if it
				 * can still hold a minimum-size message, else
				 * release it. */
				struct psmx_multi_recv *req;
				psm_mq_req_t psm_req_new;
				size_t len_remaining;

				req = PSMX_CTXT_USER(fi_context);
				req->offset += psm_status.nbytes;
				len_remaining = req->len - req->offset;
				if (len_remaining >= req->min_buf_size) {
					if (len_remaining > PSMX_MAX_MSG_SIZE)
						len_remaining = PSMX_MAX_MSG_SIZE;
					err = psm_mq_irecv(tmp_ep->domain->psm_mq,
							   req->tag, req->tagsel, req->flag,
							   req->buf + req->offset,
							   len_remaining,
							   (void *)fi_context, &psm_req_new);
					if (err != PSM_OK)
						return psmx_errno(err);

					PSMX_CTXT_REQ(fi_context) = psm_req_new;
				} else {
					free(req);
				}
			}

			if (read_more)
				continue;

			return read_count;
		} else if (err == PSM_MQ_NO_COMPLETIONS) {
			fastlock_release(&domain->poll_lock);
			return read_count;
		} else {
			fastlock_release(&domain->poll_lock);
			return psmx_errno(err);
		}
	}
}