/*
 * process_mfifo_context()
 *
 * in prov/bgq/include/rdma/bgq/fi_bgq_rx.h [1006:1386]
 */


int process_mfifo_context (struct fi_bgq_ep * bgq_ep, const unsigned poll_msg,
		const uint64_t cancel_context, union fi_bgq_context * context,
		const uint64_t rx_op_flags, const uint64_t is_context_ext,
		const unsigned is_manual_progress) {
#ifdef FI_BGQ_TRACE
	fprintf(stderr,"process_mfifo_context starting - context->tag is %d\n",context->tag);
	if (rx_op_flags & FI_PEEK)
		fprintf(stderr,"just peeking\n");
	fflush(stderr);
#endif
	if (cancel_context) {	/* branch should compile out */
		const uint64_t compare_context = is_context_ext ?
			(uint64_t)(((struct fi_bgq_context_ext *)context)->msg.op_context) :
			(uint64_t)context;

		if (compare_context == cancel_context) {

			struct fi_bgq_context_ext * ext;
			if (is_context_ext) {
				ext = (struct fi_bgq_context_ext *)context;
			} else {
				posix_memalign((void**)&ext, 32, sizeof(struct fi_bgq_context_ext));
				ext->bgq_context.flags = FI_BGQ_CQ_CONTEXT_EXT;
			}

			ext->bgq_context.byte_counter = 0;
			ext->err_entry.op_context = (void *)cancel_context;
			ext->err_entry.flags = rx_op_flags;
			ext->err_entry.len = 0;
			ext->err_entry.buf = 0;
			ext->err_entry.data = 0;
			ext->err_entry.tag = context->tag;
			ext->err_entry.olen = 0;
			ext->err_entry.err = FI_ECANCELED;
			ext->err_entry.prov_errno = 0;
			ext->err_entry.err_data = NULL;

			fi_bgq_cq_enqueue_err (bgq_ep->recv_cq, ext,0); 

			return FI_ECANCELED;
		}
	}

	if ((rx_op_flags & (FI_PEEK | FI_CLAIM | FI_MULTI_RECV)) == 0) {	/* likely */

		/* search the unexpected packet queue */
		struct fi_bgq_mu_packet * head = bgq_ep->rx.poll.rfifo[poll_msg].ue.head;
		struct fi_bgq_mu_packet * tail = bgq_ep->rx.poll.rfifo[poll_msg].ue.tail;
		struct fi_bgq_mu_packet * prev = NULL;
		struct fi_bgq_mu_packet * uepkt = head;

		unsigned found_match = 0;
		while (uepkt != NULL) {

#ifdef FI_BGQ_TRACE
	fprintf(stderr,"process_mfifo_context - searching unexpected queue\n");
	fflush(stderr);
#endif
			if (is_match(uepkt, context, poll_msg)) {
#ifdef FI_BGQ_TRACE
	fprintf(stderr,"process_mfifo_context - found match on unexpected queue\n");
	fflush(stderr);
#endif

				/* branch will compile out */
				if (poll_msg)
					complete_receive_operation(bgq_ep, uepkt,
						0, context, 0, 0, is_manual_progress);
				else
					complete_receive_operation(bgq_ep, uepkt,
						uepkt->hdr.pt2pt.ofi_tag, context, 0, 0, is_manual_progress);

				/* remove the uepkt from the ue queue */
				if (head == tail) {
					bgq_ep->rx.poll.rfifo[poll_msg].ue.head = NULL;
					bgq_ep->rx.poll.rfifo[poll_msg].ue.tail = NULL;
				} else if (prev == NULL) {
					bgq_ep->rx.poll.rfifo[poll_msg].ue.head = uepkt->next;
				} else if (tail == uepkt) {
					bgq_ep->rx.poll.rfifo[poll_msg].ue.tail = prev;
					prev->next = NULL;
				} else {
					prev->next = uepkt->next;
				}

				/* ... and prepend the uehdr to the ue free list. */
				uepkt->next = bgq_ep->rx.poll.rfifo[poll_msg].ue.free;
				bgq_ep->rx.poll.rfifo[poll_msg].ue.free = uepkt;

				/* found a match; break from the loop */
				uepkt = NULL;
				found_match = 1;

			} else {

				/* a match was not found; advance to the next ue header */
				prev = uepkt;
				uepkt = uepkt->next;
			}
		}

		if (!found_match) {

#ifdef FI_BGQ_TRACE
	fprintf(stderr,"process_mfifo_context - nothing found on unexpected queue adding to match queue for poll_msg %u context->tag is %d context is %p mq addr is %p\n",poll_msg,context->tag,context,&(bgq_ep->rx.poll.rfifo[poll_msg].mq));
	fflush(stderr);
#endif
			/*
			 * no unexpected headers were matched; add this match
			 * information to the appropriate match queue
			 */

			union fi_bgq_context * tail = bgq_ep->rx.poll.rfifo[poll_msg].mq.tail;

			context->next = NULL;
			if (tail == NULL) {
				bgq_ep->rx.poll.rfifo[poll_msg].mq.head = context;
			} else {
				tail->next = context;
			}
			bgq_ep->rx.poll.rfifo[poll_msg].mq.tail = context;
		}

	} else if (rx_op_flags & FI_PEEK) {	/* unlikely */

		/* search the unexpected packet queue */
		struct fi_bgq_mu_packet * head = bgq_ep->rx.poll.rfifo[poll_msg].ue.head;
		struct fi_bgq_mu_packet * tail = bgq_ep->rx.poll.rfifo[poll_msg].ue.tail;
		struct fi_bgq_mu_packet * prev = NULL;
		struct fi_bgq_mu_packet * uepkt = head;

#ifdef FI_BGQ_TRACE
	fprintf(stderr,"process_mfifo_context - rx_op_flags & FI_PEEK searching unexpected queue\n");
	if (uepkt == NULL)
		fprintf(stderr,"uepkt == NULL\n");
	else
		fprintf(stderr,"uepkt != NULL\n");

	fflush(stderr);
#endif
		unsigned found_match = 0;
		while (uepkt != NULL) {

#ifdef FI_BGQ_TRACE
	fprintf(stderr,"process_mfifo_context uepkt != NULL - rx_op_flags & FI_PEEK searching unexpected queue\n");
	fflush(stderr);
#endif
			if (is_match(uepkt, context, poll_msg)) {

				const uint64_t packet_type = fi_bgq_mu_packet_type_get(uepkt);
				if (packet_type & FI_BGQ_MU_PACKET_TYPE_RENDEZVOUS) {
					const uint64_t niov = uepkt->hdr.pt2pt.rendezvous.niov_minus_1 + 1;
					uint64_t len = 0;
					unsigned i;
					for (i=0; i<niov; ++i) len += uepkt->payload.rendezvous.mu_iov[i].message_length;
					context->len = len;
				} else {	/* "eager" or "eager with completion" packet type */
					context->len = uepkt->hdr.pt2pt.send.message_length;
				}
				context->tag = poll_msg ? 0 : uepkt->hdr.pt2pt.ofi_tag;
				context->byte_counter = 0;

				if (rx_op_flags & FI_CLAIM) { /* both FI_PEEK and FI_CLAIM were specified */
					assert((rx_op_flags & FI_BGQ_CQ_CONTEXT_EXT) == 0);

					context->claim = uepkt;

					/* remove the uepkt from the ue queue */
					if (head == tail) {
						bgq_ep->rx.poll.rfifo[poll_msg].ue.head = NULL;
						bgq_ep->rx.poll.rfifo[poll_msg].ue.tail = NULL;
					} else if (prev == NULL) {
						bgq_ep->rx.poll.rfifo[poll_msg].ue.head = uepkt->next;
					} else if (tail == uepkt) {
						bgq_ep->rx.poll.rfifo[poll_msg].ue.tail = prev;
						prev->next = NULL;
					} else {
						prev->next = uepkt->next;
					}
				}
				/* tranfer immediate data from pkt to context for matching FI_PEEK */
				context->data = uepkt->hdr.pt2pt.immediate_data;

				/* post a completion event for the receive */
				fi_bgq_cq_enqueue_completed(bgq_ep->recv_cq, context, 0);	/* TODO - IS lock required? */

				found_match = 1;
				uepkt = NULL;

			} else {

				/* a match was not found; advance to the next ue header */
				prev = uepkt;
				uepkt = uepkt->next;
			}
		}

		if (!found_match) {

#ifdef FI_BGQ_TRACE
	fprintf(stderr,"didn't find a match for this FI_PEEK\n");
	fflush(stderr);
#endif
			/* did not find a match for this "peek" */


			struct fi_bgq_context_ext * ext;
			uint64_t mfifo_value;
			if (is_context_ext) {
				ext = (struct fi_bgq_context_ext *)context;
				mfifo_value = (uint64_t)context >> 3;
			} else {
				posix_memalign((void**)&ext, 32, sizeof(struct fi_bgq_context_ext));
				ext->bgq_context.flags = rx_op_flags | FI_BGQ_CQ_CONTEXT_EXT;

				mfifo_value = (uint64_t)ext >> 3;
			}

			ext->err_entry.op_context = context;
			ext->err_entry.flags = rx_op_flags;
			ext->err_entry.len = 0;
			ext->err_entry.buf = 0;
			ext->err_entry.data = 0;
			ext->err_entry.tag = 0;
			ext->err_entry.olen = 0;
			ext->err_entry.err = FI_ENOMSG;
			ext->err_entry.prov_errno = 0;
			ext->err_entry.err_data = NULL;
			ext->bgq_context.byte_counter = 0;

#ifdef FI_BGQ_TRACE
	fprintf(stderr,"process_mfifo_context -  no match found on unexpected queue posting error\n");
	fflush(stderr);
#endif
			fi_bgq_cq_enqueue_err (bgq_ep->recv_cq, ext,0);

		}

	} else if (rx_op_flags & FI_CLAIM) {	/* unlikely */
		assert((rx_op_flags & FI_BGQ_CQ_CONTEXT_EXT) == 0);
#ifdef FI_BGQ_TRACE
	fprintf(stderr,"process_mfifo_context -  rx_op_flags & FI_CLAIM complete receive operation\n");
#endif

		/* only FI_CLAIM was specified
		 *
		 * this occurs after a previous FI_PEEK + FI_CLAIM
		 * operation has removed an unexpected packet from
		 * the queue and saved a pointer to it in the context
		 *
		 * complete the receive for this "claimed" message ... */
		struct fi_bgq_mu_packet * claimed_pkt = context->claim;
		if (poll_msg)
			complete_receive_operation(bgq_ep, claimed_pkt,
				0, context, 0, 0, is_manual_progress);
		else
			complete_receive_operation(bgq_ep, claimed_pkt,
				claimed_pkt->hdr.pt2pt.ofi_tag, context, 0, 0, is_manual_progress);

		/* ... and prepend the uehdr to the ue free list. */
		claimed_pkt->next = bgq_ep->rx.poll.rfifo[poll_msg].ue.free;
		bgq_ep->rx.poll.rfifo[poll_msg].ue.free = claimed_pkt;

	} else if (poll_msg && (rx_op_flags & FI_MULTI_RECV)) {		/* unlikely - branch should compile out for tagged receives */
		/* search the unexpected packet queue */
		struct fi_bgq_mu_packet * head = bgq_ep->rx.poll.rfifo[poll_msg].ue.head;
		struct fi_bgq_mu_packet * tail = bgq_ep->rx.poll.rfifo[poll_msg].ue.tail;
		struct fi_bgq_mu_packet * prev = NULL;
		struct fi_bgq_mu_packet * uepkt = head;

		unsigned full_multirecv_buffer = 0;
		while (uepkt != NULL) {

			if (is_match(uepkt, context, poll_msg)) {

				/* verify that there is enough space available in
				 * the multi-receive buffer for the incoming data */
				const uint64_t recv_len = context->len;
				const uint64_t packet_type = fi_bgq_mu_packet_type_get(uepkt);
				uint64_t send_len = 0;

				if (packet_type & FI_BGQ_MU_PACKET_TYPE_EAGER) {
					send_len = uepkt->hdr.pt2pt.send.message_length;
				} else if (packet_type & FI_BGQ_MU_PACKET_TYPE_RENDEZVOUS) {

					/* This code functionaliy is unverified - exit with an error mesg for now
					 * when we have an mpich case for this we will then verify.
				 	*/

					fprintf(stderr,"BGQ Provider does not support FI_MULTI_RECV and RENDEZVOUS protocol\n");
					fflush(stderr);
					exit(1);

					const uint64_t niov = uepkt->hdr.pt2pt.rendezvous.niov_minus_1 + 1;
					send_len = uepkt->payload.rendezvous.mu_iov[0].message_length;
					uint64_t i;
					for (i=1; i<niov; ++i) send_len += uepkt->payload.rendezvous.mu_iov[i].message_length;
				}

				if (send_len > recv_len) {
					/* There is not enough room for the next subcontext multirec.
 					 * to preserver the ordering just break off here with whatever
 					 * matches are in the buffer and hopefully the next multirecv
 					 * has space.
 					 */

					uepkt = NULL;
					full_multirecv_buffer = 1;
					context->byte_counter = 0;
					fi_bgq_cq_enqueue_completed(bgq_ep->recv_cq, context, 0);

				} else {
					complete_receive_operation(bgq_ep, uepkt,
						0, context, 0, 1, is_manual_progress);

					/* remove the uepkt from the ue queue */
					if (head == tail) {
						bgq_ep->rx.poll.rfifo[poll_msg].ue.head = NULL;
						bgq_ep->rx.poll.rfifo[poll_msg].ue.tail = NULL;
					} else if (prev == NULL) {
						bgq_ep->rx.poll.rfifo[poll_msg].ue.head = uepkt->next;
					} else if (tail == uepkt) {
						bgq_ep->rx.poll.rfifo[poll_msg].ue.tail = prev;
						prev->next = NULL;
					} else {
						prev->next = uepkt->next;
					}

					struct fi_bgq_mu_packet *matched_uepkt_next = uepkt->next;

					/* ... and prepend the uehdr to the ue free list. */
					uepkt->next = bgq_ep->rx.poll.rfifo[poll_msg].ue.free;
					bgq_ep->rx.poll.rfifo[poll_msg].ue.free = uepkt;

					if (context->len < bgq_ep->rx.poll.min_multi_recv) {
						/* after processing this message there is not
						 * enough space available in the multi-receive
						 * buffer to receive the next message; break
						 * from the loop and post a 'FI_MULTI_RECV'
						 * event to the completion queue. */
						uepkt = NULL;
						full_multirecv_buffer = 1;

						/* post a completion event for the multi-receive */
						context->byte_counter = 0;
						fi_bgq_cq_enqueue_completed(bgq_ep->recv_cq, context, 0);	/* TODO - IS lock required? */
					}
					else {
						uepkt = matched_uepkt_next;
					}

				}

			} else {

				/* a match was not found; advance to the next ue header */
				prev = uepkt;
				uepkt = uepkt->next;
			}
		}

		if (!full_multirecv_buffer) {

			/* The multirecv context has room in its buffer.
			 * Post to match queue for further filling.
			 */

			union fi_bgq_context * tail = bgq_ep->rx.poll.rfifo[poll_msg].mq.tail;

			context->next = NULL;
			if (tail == NULL) {
				bgq_ep->rx.poll.rfifo[poll_msg].mq.head = context;
			} else {
				tail->next = context;
			}
			bgq_ep->rx.poll.rfifo[poll_msg].mq.tail = context;
		}
	}

	return 0;
}