int psmx2_cq_poll_mq()

in prov/psm2/src/psmx2_cq.c [1049:1559]


/*
 * Poll the PSM2 matched queue (MQ) of @trx_ctxt and process completed
 * requests one at a time. The loop stops when psm2_mq_ipeek() reports no
 * more completions, when the poll lock cannot be taken, when read_more is
 * cleared (by a completion helper, which receives &read_more, or by the
 * large-send tweak below), or on error.
 *
 * Each completed request carries a struct fi_context. Its type
 * (PSMX2_CTXT_TYPE) selects how the completion is handled:
 *  - generating a CQ entry via psmx2_cq_tx_complete()/psmx2_cq_rx_complete();
 *  - bumping the matching counter;
 *  - releasing provider-internal request objects;
 *  - reposting multi-recv buffers.
 *
 * @cq:       CQ being polled. It is passed as the first argument of the
 *            completion helpers, alongside the endpoint's own send/recv CQ.
 * @trx_ctxt: context whose MQ (trx_ctxt->psm2_mq) is polled under
 *            trx_ctxt->poll_lock.
 * @event_in: event buffer forwarded to the completion helpers (presumably
 *            the caller's output array -- confirm in the helpers).
 * @count:    size forwarded together with @event_in.
 * @src_addr: forwarded to psmx2_cq_rx_complete() (presumably a source
 *            address output array -- confirm).
 *
 * Returns read_count (updated by the completion helpers through
 * &read_count) when polling stops normally. On failure it returns either:
 *  - the non-zero value from a completion helper, or
 *  - psmx2_errno() of the PSM2 error.
 *
 * NOTE(review): on those error returns, read_count is not reported, even
 * if entries were already produced earlier in this call -- confirm that
 * callers tolerate this.
 */
int psmx2_cq_poll_mq(struct psmx2_fid_cq *cq,
		     struct psmx2_trx_ctxt *trx_ctxt,
		     struct psmx2_cq_event *event_in,
		     int count, fi_addr_t *src_addr)
{
	psm2_mq_status2_t status_priv, *status = &status_priv;
	struct fi_context *fi_context;
	struct psmx2_fid_ep *ep;
	struct psmx2_fid_mr *mr;
	struct psmx2_am_request *am_req;
	struct psmx2_multi_recv *multi_recv_req;
	struct psmx2_sendv_request *sendv_req;
	struct psmx2_sendv_reply *sendv_rep;
	psm2_mq_req_t psm2_req;
	size_t len_remaining;
	void *op_context;
	void *buf;
	uint64_t flags;
	uint64_t data;
	int read_count = 0;
	int read_more = 1;
	int err;
	int context_type;

	while (read_more) {

		/*
		 * psm2_mq_test2 is called immediately after psm2_mq_ipeek with a lock held to
		 * prevent psm2_mq_ipeek from returning the same request multiple times under
		 * different threads.
		 *
		 * A non-zero return from poll_trylock_fn (lock not acquired) is
		 * treated like "no completions", so this call returns what it has
		 * read so far instead of waiting for the lock.
		 */
		if (trx_ctxt->domain->poll_trylock_fn(&trx_ctxt->poll_lock, 2)) {
			err = PSM2_MQ_NO_COMPLETIONS;
		} else {
			err = psm2_mq_ipeek(trx_ctxt->psm2_mq, &psm2_req, NULL);
			if (err == PSM2_OK)
				psm2_mq_test2(&psm2_req, status);
			trx_ctxt->domain->poll_unlock_fn(&trx_ctxt->poll_lock, 2);
		}

		if (err == PSM2_OK) {
			/* Requests without an attached context are consumed and skipped. */
			fi_context = PSMX2_STATUS_CONTEXT(status);
			if (OFI_UNLIKELY(!fi_context)) {
				continue;
			}

			ep = PSMX2_CTXT_EP(fi_context);
			context_type = (int)PSMX2_CTXT_TYPE(fi_context);

			/*
			 * NOTE(review): there is no default case, so a completion
			 * with an unrecognized context type is silently dropped.
			 */
			switch (context_type) {
			case PSMX2_SEND_CONTEXT:
			case PSMX2_TSEND_CONTEXT:
				/* User send: the fi_context itself is reported as op_context. */
				if (ep->send_cq) {
					op_context = fi_context;
					buf = PSMX2_CTXT_USER(fi_context);
					flags = psmx2_comp_flags[context_type];
					err = psmx2_cq_tx_complete(
							cq, ep->send_cq, ep->av,
							status, op_context, buf, flags, 0,
							event_in, count, &read_count,
							&read_more);
					if (err)
						return err;
				}
				if (ep->send_cntr)
					psmx2_cntr_inc(ep->send_cntr, PSMX2_STATUS_ERROR(status));

				/* Bi-directional send/recv performance tweak for KNL */
				/* i.e. stop polling after a send larger than 16 KiB. */
				if (PSMX2_STATUS_SNDLEN(status) > 16384)
					read_more = 0;

				break;

			case PSMX2_NOCOMP_SEND_CONTEXT:
			case PSMX2_NOCOMP_TSEND_CONTEXT:
				/* No completion requested: only errors reach the CQ, with a NULL context. */
				if (ep->send_cq && PSMX2_STATUS_ERROR(status)) {
					op_context = NULL;
					buf = NULL;
					flags = psmx2_comp_flags[context_type];
					err = psmx2_cq_tx_complete(
							cq, ep->send_cq, ep->av,
							status, op_context, buf, flags, 0,
							event_in, count, &read_count,
							&read_more);
					if (err)
						return err;
				}
				if (ep->send_cntr)
					psmx2_cntr_inc(ep->send_cntr, PSMX2_STATUS_ERROR(status));
				break;

			case PSMX2_RECV_CONTEXT:
				/*
				 * If the tag marks an IOV header, hand the message to
				 * psmx2_handle_sendv_req(). When that returns 0, no
				 * completion is generated here.
				 */
				if (OFI_UNLIKELY(PSMX2_IS_IOV_HEADER(PSMX2_GET_FLAGS(PSMX2_STATUS_TAG(status))) &&
						  !psmx2_handle_sendv_req(ep, status, 0)))
					continue;
				if (ep->recv_cq) {
					op_context = fi_context;
					buf = PSMX2_CTXT_USER(fi_context);
					flags = psmx2_comp_flags[context_type];
					/* Remote CQ data is carried in the PSM2 tag. */
					data = PSMX2_GET_CQDATA(PSMX2_STATUS_TAG(status));
					if (PSMX2_HAS_IMM(PSMX2_GET_FLAGS(PSMX2_STATUS_TAG(status))))
						flags |= FI_REMOTE_CQ_DATA;
					err = psmx2_cq_rx_complete(
							cq, ep->recv_cq, ep->av,
							status, op_context, buf, flags, data,
							event_in, count, &read_count,
							&read_more, src_addr);
					if (err)
						return err;
				}
				if (ep->recv_cntr)
					psmx2_cntr_inc(ep->recv_cntr, PSMX2_STATUS_ERROR(status));
				break;

			case PSMX2_TRECV_CONTEXT:
				/* Same handling as PSMX2_RECV_CONTEXT, for tagged receives. */
				if (OFI_UNLIKELY(PSMX2_IS_IOV_HEADER(PSMX2_GET_FLAGS(PSMX2_STATUS_TAG(status))) &&
						 !psmx2_handle_sendv_req(ep, status, 0)))
					continue;
				if (ep->recv_cq) {
					op_context = fi_context;
					buf = PSMX2_CTXT_USER(fi_context);
					flags = psmx2_comp_flags[context_type];
					data = PSMX2_GET_CQDATA(PSMX2_STATUS_TAG(status));
					if (PSMX2_HAS_IMM(PSMX2_GET_FLAGS(PSMX2_STATUS_TAG(status))))
						flags |= FI_REMOTE_CQ_DATA;
					err = psmx2_cq_rx_complete(
							cq, ep->recv_cq, ep->av,
							status, op_context, buf, flags, data,
							event_in, count, &read_count,
							&read_more, src_addr);
					if (err)
						return err;
				}
				if (ep->recv_cntr)
					psmx2_cntr_inc(ep->recv_cntr, PSMX2_STATUS_ERROR(status));
				break;

			case PSMX2_NOCOMP_RECV_CONTEXT:
				/*
				 * Provider-owned op context. It is returned to the
				 * endpoint with PSMX2_EP_PUT_OP_CONTEXT on every path
				 * and is not referenced afterwards. Only error
				 * completions are reported (NULL op_context).
				 */
				if (OFI_UNLIKELY(PSMX2_IS_IOV_HEADER(PSMX2_GET_FLAGS(PSMX2_STATUS_TAG(status))) &&
						 !psmx2_handle_sendv_req(ep, status, 0))) {
					PSMX2_EP_PUT_OP_CONTEXT(ep, fi_context);
					continue;
				}
				PSMX2_EP_PUT_OP_CONTEXT(ep, fi_context);
				if (OFI_UNLIKELY(ep->recv_cq && PSMX2_STATUS_ERROR(status))) {
					op_context = NULL;
					buf = NULL;
					flags = psmx2_comp_flags[context_type];
					data = PSMX2_GET_CQDATA(PSMX2_STATUS_TAG(status));
					if (PSMX2_HAS_IMM(PSMX2_GET_FLAGS(PSMX2_STATUS_TAG(status))))
						flags |= FI_REMOTE_CQ_DATA;
					err = psmx2_cq_rx_complete(
							cq, ep->recv_cq, ep->av,
							status, op_context, buf, flags, data,
							event_in, count, &read_count,
							&read_more, src_addr);
					if (err)
						return err;
				}
				if (ep->recv_cntr)
					psmx2_cntr_inc(ep->recv_cntr, PSMX2_STATUS_ERROR(status));
				break;

			case PSMX2_NOCOMP_TRECV_CONTEXT:
				/* Same handling as PSMX2_NOCOMP_RECV_CONTEXT, for tagged receives. */
				if (OFI_UNLIKELY(PSMX2_IS_IOV_HEADER(PSMX2_GET_FLAGS(PSMX2_STATUS_TAG(status))) &&
						 !psmx2_handle_sendv_req(ep, status, 0))) {
					PSMX2_EP_PUT_OP_CONTEXT(ep, fi_context);
					continue;
				}
				PSMX2_EP_PUT_OP_CONTEXT(ep, fi_context);
				if (OFI_UNLIKELY(ep->recv_cq && PSMX2_STATUS_ERROR(status))) {
					op_context = NULL;
					buf = NULL;
					flags = psmx2_comp_flags[context_type];
					data = PSMX2_GET_CQDATA(PSMX2_STATUS_TAG(status));
					if (PSMX2_HAS_IMM(PSMX2_GET_FLAGS(PSMX2_STATUS_TAG(status))))
						flags |= FI_REMOTE_CQ_DATA;
					err = psmx2_cq_rx_complete(
							cq, ep->recv_cq, ep->av,
							status, op_context, buf, flags, data,
							event_in, count, &read_count,
							&read_more, src_addr);
					if (err)
						return err;
				}
				if (ep->recv_cntr)
					psmx2_cntr_inc(ep->recv_cntr, PSMX2_STATUS_ERROR(status));
				break;

			case PSMX2_WRITE_CONTEXT:
				/*
				 * Initiator side of an RMA write. The user context is
				 * saved before the AM request (and its tmpbuf) is
				 * released, so the completion no longer touches am_req.
				 */
				am_req = container_of(fi_context, struct psmx2_am_request,
						      fi_context);
				op_context = PSMX2_CTXT_USER(fi_context);
				free(am_req->tmpbuf);
				psmx2_am_request_free(trx_ctxt, am_req);
				if (ep->send_cq) {
					buf = NULL;
					flags = psmx2_comp_flags[context_type];
					err = psmx2_cq_tx_complete(
							cq, ep->send_cq, ep->av,
							status, op_context, buf, flags, 0,
							event_in, count, &read_count,
							&read_more);
					if (err)
						return err;
				}
				if (ep->write_cntr)
					psmx2_cntr_inc(ep->write_cntr, PSMX2_STATUS_ERROR(status));
				break;

			case PSMX2_NOCOMP_WRITE_CONTEXT:
				/* As PSMX2_WRITE_CONTEXT, but only error completions are reported. */
				am_req = container_of(fi_context, struct psmx2_am_request,
						      fi_context);
				op_context = PSMX2_CTXT_USER(fi_context);
				free(am_req->tmpbuf);
				psmx2_am_request_free(trx_ctxt, am_req);
				if (OFI_UNLIKELY(ep->send_cq && PSMX2_STATUS_ERROR(status))) {
					buf = NULL;
					flags = psmx2_comp_flags[context_type];
					err = psmx2_cq_tx_complete(
							cq, ep->send_cq, ep->av,
							status, op_context, buf, flags, 0,
							event_in, count, &read_count,
							&read_more);
					if (err)
						return err;
				}
				if (ep->write_cntr)
					psmx2_cntr_inc(ep->write_cntr, PSMX2_STATUS_ERROR(status));
				break;

			case PSMX2_READ_CONTEXT:
				am_req = container_of(fi_context, struct psmx2_am_request,
						      fi_context);
				/*
				 * readv: accumulate received bytes. If fewer than the
				 * requested total have arrived, record any error on
				 * am_req and leave the request alive; per the comment
				 * below, the AM handler frees it.
				 */
				if (OFI_UNLIKELY(am_req->op == PSMX2_AM_REQ_READV)) {
					am_req->read.len_read += PSMX2_STATUS_RCVLEN(status);
					if (am_req->read.len_read < am_req->read.len) {
						FI_INFO(&psmx2_prov, FI_LOG_EP_DATA,
							"readv: long protocol finishes early\n");
						if (PSMX2_STATUS_ERROR(status))
							am_req->error = psmx2_errno(PSMX2_STATUS_ERROR(status));
						/* Request to be freed in AM handler */
						continue;
					}
				}
				op_context = PSMX2_CTXT_USER(fi_context);
				free(am_req->tmpbuf);
				psmx2_am_request_free(trx_ctxt, am_req);
				if (ep->send_cq) {
					buf = NULL;
					flags = psmx2_comp_flags[context_type];
					err = psmx2_cq_tx_complete(
							cq, ep->send_cq, ep->av,
							status, op_context, buf, flags, 0,
							event_in, count, &read_count,
							&read_more);
					if (err)
						return err;
				}
				if (ep->read_cntr)
					psmx2_cntr_inc(ep->read_cntr, PSMX2_STATUS_ERROR(status));
				break;

			case PSMX2_NOCOMP_READ_CONTEXT:
				/* As PSMX2_READ_CONTEXT, but only error completions are reported. */
				am_req = container_of(fi_context, struct psmx2_am_request,
						      fi_context);
				if (OFI_UNLIKELY(am_req->op == PSMX2_AM_REQ_READV)) {
					am_req->read.len_read += PSMX2_STATUS_RCVLEN(status);
					if (am_req->read.len_read < am_req->read.len) {
						FI_INFO(&psmx2_prov, FI_LOG_EP_DATA,
							"readv: long protocol finishes early\n");
						if (PSMX2_STATUS_ERROR(status))
							am_req->error = psmx2_errno(PSMX2_STATUS_ERROR(status));
						/* Request to be freed in AM handler */
						continue;
					}
				}
				op_context = PSMX2_CTXT_USER(fi_context);
				free(am_req->tmpbuf);
				psmx2_am_request_free(trx_ctxt, am_req);
				if (OFI_UNLIKELY(ep->send_cq && PSMX2_STATUS_ERROR(status))) {
					buf = NULL;
					flags = psmx2_comp_flags[context_type];
					err = psmx2_cq_tx_complete(
							cq, ep->send_cq, ep->av,
							status, op_context, buf, flags, 0,
							event_in, count, &read_count,
							&read_more);
					if (err)
						return err;
				}
				if (ep->read_cntr)
					psmx2_cntr_inc(ep->read_cntr, PSMX2_STATUS_ERROR(status));
				break;

			case PSMX2_MULTI_RECV_CONTEXT:
				/*
				 * One message landed in a multi-recv buffer at
				 * multi_recv_req->offset. Report it, then repost the
				 * rest of the buffer, or free the request when less
				 * than min_buf_size remains.
				 */
				if (OFI_UNLIKELY(PSMX2_IS_IOV_HEADER(PSMX2_GET_FLAGS(PSMX2_STATUS_TAG(status))) &&
				    !psmx2_handle_sendv_req(ep, status, 1)))
					continue;
				multi_recv_req = PSMX2_CTXT_USER(fi_context);
				if (ep->recv_cq) {
					op_context = fi_context;
					buf = multi_recv_req->buf + multi_recv_req->offset;
					flags = psmx2_comp_flags[context_type];
					data = PSMX2_GET_CQDATA(PSMX2_STATUS_TAG(status));
					if (PSMX2_HAS_IMM(PSMX2_GET_FLAGS(PSMX2_STATUS_TAG(status))))
						flags |= FI_REMOTE_CQ_DATA;
					/* Same condition as the "else free" branch of the repost below. */
					if (multi_recv_req->offset + PSMX2_STATUS_RCVLEN(status) +
					    multi_recv_req->min_buf_size > multi_recv_req->len)
						flags |= FI_MULTI_RECV;	/* buffer used up */
					/*
					 * NOTE(review): on error the request is neither
					 * reposted nor freed.
					 */
					err = psmx2_cq_rx_complete(
							cq, ep->recv_cq, ep->av,
							status, op_context, buf, flags, data,
							event_in, count, &read_count,
							&read_more, src_addr);
					if (err)
						return err;
				}
				if (ep->recv_cntr)
					psmx2_cntr_inc(ep->recv_cntr, PSMX2_STATUS_ERROR(status));

				/* repost multi-recv buffer */
				multi_recv_req->offset += PSMX2_STATUS_RCVLEN(status);
				len_remaining = multi_recv_req->len - multi_recv_req->offset;
				if (len_remaining >= multi_recv_req->min_buf_size) {
					/* A single posted receive is capped at PSMX2_MAX_MSG_SIZE. */
					if (len_remaining > PSMX2_MAX_MSG_SIZE)
						len_remaining = PSMX2_MAX_MSG_SIZE;
					/* The same fi_context is reused for the reposted receive. */
					err = psm2_mq_irecv2(ep->rx->psm2_mq,
							     multi_recv_req->src_addr, &multi_recv_req->tag,
							     &multi_recv_req->tagsel, multi_recv_req->flag,
							     multi_recv_req->buf + multi_recv_req->offset,
							     len_remaining,
							     (void *)fi_context, &psm2_req);
					/*
					 * NOTE(review): multi_recv_req is not freed on this
					 * error path -- confirm whether it leaks.
					 */
					if (err != PSM2_OK)
						return psmx2_errno(err);
					PSMX2_CTXT_REQ(fi_context) = psm2_req;
				} else {
					free(multi_recv_req);
				}
				break;

			case PSMX2_REMOTE_WRITE_CONTEXT:
				/*
				 * Target side of an RMA write. The order is:
				 *  1. ack the initiator when PSMX2_AM_FORCE_ACK is set;
				 *  2. report remote CQ data, if the write carried any;
				 *  3. bump the RMA-event counters;
				 *  4. free the AM request.
				 */
				am_req = container_of(fi_context, struct psmx2_am_request, fi_context);
				if (am_req->op & PSMX2_AM_FORCE_ACK) {
					am_req->error = psmx2_errno(PSMX2_STATUS_ERROR(status));
					psmx2_am_ack_rma(am_req);
				}

				if (am_req->ep->recv_cq && (am_req->cq_flags & FI_REMOTE_CQ_DATA)) {
					op_context = NULL;
					buf = NULL;
					flags = psmx2_comp_flags[context_type] | FI_REMOTE_CQ_DATA;
					err = psmx2_cq_rx_complete(
							cq, am_req->ep->recv_cq, am_req->ep->av,
							status, op_context, buf, flags, am_req->write.data,
							event_in, count, &read_count,
							&read_more, src_addr);
					if (err) {
						psmx2_am_request_free(trx_ctxt, am_req);
						return err;
					}
				}

				if (am_req->ep->caps & FI_RMA_EVENT) {
					if (am_req->ep->remote_write_cntr)
						psmx2_cntr_inc(am_req->ep->remote_write_cntr, 0);

					/*
					 * The user pointer here is the target MR. Its counter
					 * is bumped only if it differs from the endpoint's, to
					 * avoid a double increment.
					 */
					mr = PSMX2_CTXT_USER(fi_context);
					if (mr->cntr && mr->cntr != am_req->ep->remote_write_cntr)
						psmx2_cntr_inc(mr->cntr, 0);
				}

				/* NOTE: am_req->tmpbuf is unused here */
				psmx2_am_request_free(trx_ctxt, am_req);
				break;

			case PSMX2_REMOTE_READ_CONTEXT:
				/* Target side of an RMA read: counter only, no CQ entry. */
				am_req = container_of(fi_context, struct psmx2_am_request, fi_context);
				if (am_req->ep->caps & FI_RMA_EVENT) {
					if (am_req->ep->remote_read_cntr)
						psmx2_cntr_inc(am_req->ep->remote_read_cntr, 0);
				}

				/* NOTE: am_req->tmpbuf is unused here */
				psmx2_am_request_free(trx_ctxt, am_req);
				break;

			case PSMX2_SENDV_CONTEXT:
				/*
				 * Header send of a vectored send. For the MULTI
				 * protocol, iov_done counts the header plus
				 * iov_info.count segments. The header's tag is kept
				 * for the final completion, and the request completes
				 * only after all count + 1 sends are done.
				 */
				sendv_req = PSMX2_CTXT_USER(fi_context);
				sendv_req->iov_done++;
				if (sendv_req->iov_protocol == PSMX2_IOV_PROTO_MULTI &&
				    sendv_req->iov_done < sendv_req->iov_info.count + 1) {
					sendv_req->tag = PSMX2_STATUS_TAG(status);
					continue;
				}
				if (ep->send_cq && !sendv_req->no_completion) {
					op_context = sendv_req->user_context;
					buf = NULL;
					flags = psmx2_comp_flags[context_type] |
						sendv_req->comp_flag;
					err = psmx2_cq_tx_complete(
							cq, ep->send_cq, ep->av,
							status, op_context, buf, flags, 0,
							event_in, count, &read_count,
							&read_more);
					if (err) {
						free(sendv_req);
						return err;
					}
				}
				if (ep->send_cntr)
					psmx2_cntr_inc(ep->send_cntr, PSMX2_STATUS_ERROR(status));
				free(sendv_req);
				break;

			case PSMX2_IOV_SEND_CONTEXT:
				/*
				 * One IOV segment of a vectored send shares the
				 * iov_done counter with the header. The completion is
				 * reported once, with the saved header tag, after all
				 * count + 1 sends finish.
				 */
				sendv_req = PSMX2_CTXT_USER(fi_context);
				sendv_req->iov_done++;
				if (sendv_req->iov_done < sendv_req->iov_info.count + 1)
					continue;
				PSMX2_STATUS_TAG(status) = sendv_req->tag;
				if (ep->send_cq && !sendv_req->no_completion) {
					op_context = sendv_req->user_context;
					buf = NULL;
					flags = psmx2_comp_flags[context_type] |
						sendv_req->comp_flag;
					err = psmx2_cq_tx_complete(
							cq, ep->send_cq, ep->av,
							status, op_context, buf, flags, 0,
							event_in, count, &read_count,
							&read_more);
					if (err) {
						free(sendv_req);
						return err;
					}
				}
				if (ep->send_cntr)
					psmx2_cntr_inc(ep->send_cntr, PSMX2_STATUS_ERROR(status));
				free(sendv_req);
				break;

			case PSMX2_IOV_RECV_CONTEXT:
				/*
				 * One received segment of a vectored send. Lengths and
				 * the last non-OK error are accumulated in sendv_rep.
				 * Once iov_info.count segments are done, status is
				 * overwritten with the aggregate values, and a single
				 * completion is reported for the whole message.
				 */
				sendv_rep = PSMX2_CTXT_USER(fi_context);
				sendv_rep->iov_done++;
				sendv_rep->msg_length += PSMX2_STATUS_SNDLEN(status);
				sendv_rep->bytes_received += PSMX2_STATUS_RCVLEN(status);
				if (PSMX2_STATUS_ERROR(status) != PSM2_OK)
					sendv_rep->error_code = PSMX2_STATUS_ERROR(status);
				if (sendv_rep->iov_done < sendv_rep->iov_info.count)
					continue;

				PSMX2_STATUS_TAG(status) = sendv_rep->tag;
				PSMX2_STATUS_RCVLEN(status) = sendv_rep->bytes_received;
				PSMX2_STATUS_SNDLEN(status) = sendv_rep->msg_length;
				PSMX2_STATUS_ERROR(status) = sendv_rep->error_code;

				if (ep->recv_cq && !sendv_rep->no_completion) {
					op_context = sendv_rep->user_context;
					buf = sendv_rep->buf;
					flags = psmx2_comp_flags[context_type] |
						sendv_rep->comp_flag;
					err = psmx2_cq_rx_complete(
							cq, ep->recv_cq, ep->av,
							status, op_context, buf, flags, 0,
							event_in, count, &read_count,
							&read_more, src_addr);
					if (err) {
						free(sendv_rep);
						return err;
					}
				}
				if (ep->recv_cntr)
					psmx2_cntr_inc(ep->recv_cntr, PSMX2_STATUS_ERROR(status));

				if (sendv_rep->multi_recv) {
					/* repost the multi-recv buffer */
					/*
					 * Same repost logic as PSMX2_MULTI_RECV_CONTEXT,
					 * reached through user_context. RCVLEN now holds the
					 * aggregate byte count set above.
					 */
					fi_context = sendv_rep->user_context;
					multi_recv_req = PSMX2_CTXT_USER(fi_context);
					multi_recv_req->offset += PSMX2_STATUS_RCVLEN(status);
					len_remaining = multi_recv_req->len - multi_recv_req->offset;
					if (len_remaining >= multi_recv_req->min_buf_size) {
						if (len_remaining > PSMX2_MAX_MSG_SIZE)
							len_remaining = PSMX2_MAX_MSG_SIZE;
						err = psm2_mq_irecv2(ep->rx->psm2_mq,
								     multi_recv_req->src_addr, &multi_recv_req->tag,
								     &multi_recv_req->tagsel, multi_recv_req->flag,
								     multi_recv_req->buf + multi_recv_req->offset,
								     len_remaining,
								     (void *)fi_context, &psm2_req);
						if (err != PSM2_OK) {
							free(sendv_rep);
							return psmx2_errno(err);
						}
						PSMX2_CTXT_REQ(fi_context) = psm2_req;
					} else {
						free(multi_recv_req);
					}
				}

				free(sendv_rep);
				break;
			}
		} else if (err == PSM2_MQ_NO_COMPLETIONS) {
			/* Also reached when the poll lock could not be taken. */
			return read_count;
		} else {
			return psmx2_errno(err);
		}
	}

	/* read_more was cleared: stop early and report what was read. */
	return read_count;
}