void complete_receive_operation()

in prov/bgq/include/rdma/bgq/fi_bgq_rx.h [329:642]


/**
 * Complete a receive for an incoming point-to-point packet.
 *
 * Eager packets carry their data in the packet payload: the data is copied
 * into the receive buffer and a completion is enqueued on the endpoint's
 * receive completion queue. If the message does not fit (and this is not a
 * multi-receive) an FI_ETRUNC error entry is enqueued instead.
 *
 * Any other packet is handled as a rendezvous packet: the total transfer
 * length is summed from the packet's iov entries, a pending completion (or an
 * FI_ETRUNC error entry) is enqueued, and a "remote get" descriptor is
 * injected whose payload holds one "direct put" descriptor per source iov
 * followed by a "direct put" that clears the origin's completion counter.
 * Multi-receive rendezvous is not supported and terminates the process.
 *
 * @param bgq_ep              endpoint; its recv_cq and rx.poll state are used
 * @param pkt                 the received packet (header and payload)
 * @param origin_tag          tag stored in the completion / error entry for
 *                            non-multi-receive operations
 * @param context             receive context; on entry 'len' and 'buf'
 *                            describe the posted receive buffer
 * @param is_context_ext      non-zero when 'context' is really a
 *                            'struct fi_bgq_context_ext'
 * @param is_multi_receive    non-zero when 'context' is a multi-receive
 *                            context
 * @param is_manual_progress  not referenced by this function
 *
 * If an extended context must be allocated to report truncation and the
 * allocation fails, an error is printed and the process exits; there is no
 * return value through which the failure could be reported.
 */
void complete_receive_operation (struct fi_bgq_ep * bgq_ep,
		struct fi_bgq_mu_packet * pkt,
		const uint64_t origin_tag,
		union fi_bgq_context * context,
		const unsigned is_context_ext,
		const unsigned is_multi_receive,
		const unsigned is_manual_progress) {

#ifdef FI_BGQ_TRACE
	fprintf(stderr,"complete_receive_operation starting\n");
#endif
	/* snapshot the posted buffer before 'context' fields are overwritten */
	const uint64_t recv_len = context->len;
	void * recv_buf = context->buf;
	const uint64_t packet_type = fi_bgq_mu_packet_type_get(pkt);

	const uint64_t immediate_data = pkt->hdr.pt2pt.immediate_data;

	if (packet_type & FI_BGQ_MU_PACKET_TYPE_EAGER) {
#ifdef FI_BGQ_TRACE
        fprintf(stderr,"complete_receive_operation - packet_type & FI_BGQ_MU_PACKET_TYPE_EAGER\n");
#endif

		const uint64_t send_len = pkt->hdr.pt2pt.send.message_length;

		if (is_multi_receive) {		/* branch should compile out */
			if (send_len) memcpy(recv_buf, (void*)&pkt->payload.byte[0], send_len);

			/* the per-message ("child") context lives immediately
			 * before the data in the multi-receive buffer */
			union fi_bgq_context * original_multi_recv_context = context;
			context = (union fi_bgq_context *)((uintptr_t)recv_buf - sizeof(union fi_bgq_context));
			assert((((uintptr_t)context) & 0x07) == 0);

			context->flags = FI_RECV | FI_MSG | FI_BGQ_CQ_CONTEXT_MULTIRECV;
			context->buf = recv_buf;
			context->len = send_len;
			context->data = immediate_data;
			context->tag = 0;	/* tag is not valid for multi-receives */
			context->multi_recv_context = original_multi_recv_context;
			context->byte_counter = 0;

			/* the next 'fi_bgq_context' must be 8-byte aligned;
			 * note that a length that is already a multiple of 8
			 * still advances by a further 8 bytes */
			uint64_t bytes_consumed = ((send_len + 8) & (~0x07ull)) + sizeof(union fi_bgq_context);
			original_multi_recv_context->len -= bytes_consumed;
			original_multi_recv_context->buf = (void*)((uintptr_t)(original_multi_recv_context->buf) + bytes_consumed);
#ifdef FI_BGQ_TRACE
        fprintf(stderr,"complete_receive_operation - is_multi_receive - enqueue cq for child context %p of parent context %p\n",context,original_multi_recv_context);
#endif


			/* post a completion event for the individual receive */
			fi_bgq_cq_enqueue_completed(bgq_ep->recv_cq, context, 0);	/* TODO - IS lock required? */

		} else if (send_len <= recv_len) {
			if (send_len) memcpy(recv_buf, (void*)&pkt->payload.byte[0], send_len);
#ifdef FI_BGQ_TRACE
        fprintf(stderr,"EAGER complete_receive_operation send_len %lu <= recv_len %lu calling fi_bgq_cq_enqueue_completed\n",send_len,recv_len);
#endif

			context->buf = NULL;
			context->len = send_len;
			context->data = immediate_data;
			context->tag = origin_tag;
			context->byte_counter = 0;

			/* post a completion event for the individual receive */
			fi_bgq_cq_enqueue_completed(bgq_ep->recv_cq, context, 0);	/* TODO - IS lock required? */

		} else {	/* truncation - unlikely */
#ifdef FI_BGQ_TRACE
        fprintf(stderr,"EAGER complete_receive_operation truncation - send_len %lu > recv_len %lu posting error\n",send_len,recv_len);

#endif

			struct fi_bgq_context_ext * ext = NULL;
			if (is_context_ext) {
				ext = (struct fi_bgq_context_ext *)context;
				ext->err_entry.op_context = ext->msg.op_context;
			} else {
				/* posix_memalign() leaves 'ext' unspecified on
				 * failure and this function cannot return an
				 * error, so an allocation failure is fatal */
				if (posix_memalign((void**)&ext, 32, sizeof(struct fi_bgq_context_ext))) {
					fprintf(stderr,"BGQ Provider failed to allocate an extended context to report truncation\n");
					fflush(stderr);
					exit(1);
				}
				ext->bgq_context.flags = FI_BGQ_CQ_CONTEXT_EXT;
				ext->err_entry.op_context = context;
			}

			ext->err_entry.flags = context->flags;
			ext->err_entry.len = recv_len;
			ext->err_entry.buf = recv_buf;
			ext->err_entry.data = immediate_data;
			ext->err_entry.tag = origin_tag;
			ext->err_entry.olen = send_len - recv_len;	/* bytes that did not fit */
			ext->err_entry.err = FI_ETRUNC;
			ext->err_entry.prov_errno = 0;
			ext->err_entry.err_data = NULL;

			ext->bgq_context.byte_counter = 0;

			fi_bgq_cq_enqueue_err (bgq_ep->recv_cq, ext,0);
		}

		return;

	} else {			/* rendezvous packet */

		/* total transfer length is the sum over all source iov entries */
		uint64_t niov = pkt->hdr.pt2pt.rendezvous.niov_minus_1 + 1;
		assert(niov <= (7-is_multi_receive));
		uint64_t xfer_len = pkt->payload.rendezvous.mu_iov[0].message_length;
		{
			uint64_t i;
			for (i=1; i<niov; ++i) xfer_len += pkt->payload.rendezvous.mu_iov[i].message_length;
		}

		/* virtual address of the counter the data transfer will update */
		uint64_t byte_counter_vaddr = 0;

		if (is_multi_receive) {		/* branch should compile out */

			/* This code functionality is unverified - exit with an error mesg for now
			 * when we have an mpich case for this we will then verify.
			 */

			fprintf(stderr,"BGQ Provider does not support FI_MULTI_RECV and RENDEZVOUS protocol\n");
			fflush(stderr);
			exit(1);

			/* NOTE: everything below in this branch is unreachable
			 * because of the exit() above; it is retained for when
			 * multi-receive rendezvous is verified */

#ifdef FI_BGQ_TRACE
        fprintf(stderr,"rendezvous multirecv\n");
#endif

			union fi_bgq_context * multi_recv_context =
				(union fi_bgq_context *)((uintptr_t)recv_buf - sizeof(union fi_bgq_context));
			assert((((uintptr_t)multi_recv_context) & 0x07) == 0);

			multi_recv_context->flags = FI_RECV | FI_MSG | FI_BGQ_CQ_CONTEXT_MULTIRECV;
			multi_recv_context->buf = recv_buf;
			multi_recv_context->len = xfer_len;
			multi_recv_context->data = immediate_data;
			multi_recv_context->tag = 0;	/* tag is not valid for multi-receives */
			multi_recv_context->multi_recv_context = context;
			multi_recv_context->byte_counter = xfer_len;

			/* the next 'fi_bgq_context' must be 8-byte aligned */
			uint64_t bytes_consumed = ((xfer_len + 8) & (~0x07ull)) + sizeof(union fi_bgq_context);
			context->len -= bytes_consumed;
			context->buf = (void*)((uintptr_t)(context->buf) + bytes_consumed);

			byte_counter_vaddr = (uint64_t)&multi_recv_context->byte_counter;

			/* the original multi-receive context actually uses an
			 * operation counter - not a byte counter - but nevertheless
			 * the same field in the context structure is used */
			context->byte_counter += 1;

			/* post a completion event for the individual receive */
			fi_bgq_cq_enqueue_pending(bgq_ep->recv_cq, context, 0);	/* TODO - IS lock required? */

		} else if (xfer_len <= recv_len) {

#ifdef FI_BGQ_TRACE
        fprintf(stderr,"rendezvous complete_receive_operation xfer_len %lu <= recv_len %lu calling fi_bgq_cq_enqueue_pending\n",xfer_len,recv_len);
#endif
			context->len = xfer_len;
			context->data = immediate_data;
			context->tag = origin_tag;
			/* starts at the full transfer length; the direct-put
			 * descriptors built below target this counter */
			context->byte_counter = xfer_len;

			byte_counter_vaddr = (uint64_t)&context->byte_counter;

			/* post a completion event for the individual receive */
			fi_bgq_cq_enqueue_pending(bgq_ep->recv_cq, context, 0);	/* TODO - IS lock required? */

		} else {
#ifdef FI_BGQ_TRACE
        fprintf(stderr,"rendezvous truncation xfer_len %lu > recv_len %lu posting error\n",xfer_len,recv_len);
#endif

			/* truncation */
			struct fi_bgq_context_ext * ext = NULL;
			if (is_context_ext) {
				ext = (struct fi_bgq_context_ext *)context;
				ext->err_entry.op_context = ext->msg.op_context;
			} else {
				/* posix_memalign() leaves 'ext' unspecified on
				 * failure and this function cannot return an
				 * error, so an allocation failure is fatal */
				if (posix_memalign((void**)&ext, 32, sizeof(struct fi_bgq_context_ext))) {
					fprintf(stderr,"BGQ Provider failed to allocate an extended context to report truncation\n");
					fflush(stderr);
					exit(1);
				}
				ext->bgq_context.flags = FI_BGQ_CQ_CONTEXT_EXT;
				ext->err_entry.op_context = context;
			}

			ext->err_entry.flags = context->flags;
			ext->err_entry.len = recv_len;
			ext->err_entry.buf = recv_buf;
			ext->err_entry.data = immediate_data;
			ext->err_entry.tag = origin_tag;
			ext->err_entry.olen = xfer_len - recv_len;	/* bytes that did not fit */
			ext->err_entry.err = FI_ETRUNC;
			ext->err_entry.prov_errno = 0;
			ext->err_entry.err_data = NULL;

			ext->bgq_context.byte_counter = 0;

			byte_counter_vaddr = (uint64_t)&ext->bgq_context.byte_counter;

			fi_bgq_cq_enqueue_err (bgq_ep->recv_cq, ext,0);

			/* no data descriptors are built for a truncated
			 * message; the remote get injected below then carries
			 * only the origin completion descriptor */
			xfer_len = 0;
			niov = 0;
		}

		/* determine the physical address of the byte counter memory */
		/* NOTE(review): the result of Kernel_CreateMemoryRegion() is not checked */
		uint64_t byte_counter_paddr = 0;
		{
			Kernel_MemoryRegion_t mr;
			Kernel_CreateMemoryRegion(&mr, (void*)byte_counter_vaddr, sizeof(uint64_t));
			byte_counter_paddr = (uint64_t)mr.BasePa + (byte_counter_vaddr - (uint64_t)mr.BaseVa);
		}

		/* determine the physical address of the destination buffer */
		/* NOTE(review): the result of Kernel_CreateMemoryRegion() is not checked */
		uint64_t dst_paddr = 0;
		{
			Kernel_MemoryRegion_t mr;
			Kernel_CreateMemoryRegion(&mr, (void*)recv_buf, recv_len);
			dst_paddr = (uint64_t)mr.BasePa + ((uint64_t)recv_buf - (uint64_t)mr.BaseVa);
		}

		/* 'is_local' (0 or 1) selects which descriptor model to copy */
		const uint64_t fifo_map = fi_bgq_mu_packet_get_fifo_map(pkt);
		const uint64_t is_local = (fifo_map & (MUHWI_DESCRIPTOR_TORUS_FIFO_MAP_LOCAL0 | MUHWI_DESCRIPTOR_TORUS_FIFO_MAP_LOCAL1)) != 0;

		/*
		 * inject a "remote get" descriptor - the payload is composed
		 * of two descriptors:
		 *
		 *   the first is a "direct put" descriptor that will rdma
		 *   transfer the source data from the origin and will
		 *   decrement a reception counter on the target as it
		 *   completes
		 *
		 *   the second is a "direct put" descriptor that will clear
		 *   the byte counter for the send completion entry on the
		 *   origin
		 */

		/* busy-wait until a fifo slot is available .. */
		MUHWI_Descriptor_t * rget_desc =
			fi_bgq_spi_injfifo_tail_wait(&bgq_ep->rx.poll.injfifo);

		assert(rget_desc);
		assert((((uintptr_t)rget_desc)&0x1F) == 0);

		/* locate the payload lookaside slot */
		uint64_t payload_paddr = 0;
		MUHWI_Descriptor_t * payload =
			(MUHWI_Descriptor_t *)fi_bgq_spi_injfifo_immediate_payload(&bgq_ep->rx.poll.injfifo,
				rget_desc, &payload_paddr);

		/* initialize the remote-get descriptor in the injection fifo */
		qpx_memcpy64((void*)rget_desc, (const void*)&bgq_ep->rx.poll.rzv.rget_model[is_local]);

		rget_desc->Pa_Payload = payload_paddr;
		rget_desc->PacketHeader.messageUnitHeader.Packet_Types.Remote_Get.Rget_Inj_FIFO_Id =
			pkt->hdr.pt2pt.rendezvous.rget_inj_fifo_id;	/* TODO - different rget inj fifos for tag vs msg operations? */

		rget_desc->PacketHeader.NetworkHeader.pt2pt.Destination = fi_bgq_uid_get_destination(pkt->hdr.pt2pt.uid.fi);

		/* initialize the direct-put ("data transfer") descriptor(s) in the rget payload */
		unsigned i;
		for (i=0; i<niov; ++i) {
			MUHWI_Descriptor_t * xfer_desc = payload++;

			qpx_memcpy64((void*)xfer_desc, (const void*)&bgq_ep->rx.poll.rzv.dput_model[is_local]);

			xfer_desc->Pa_Payload = pkt->payload.rendezvous.mu_iov[i].src_paddr;
			const uint64_t message_length = pkt->payload.rendezvous.mu_iov[i].message_length;
			xfer_desc->Message_Length = message_length;
			MUSPI_SetRecPayloadBaseAddressInfo(xfer_desc, FI_BGQ_MU_BAT_ID_GLOBAL, dst_paddr);
			/* successive iov entries land back-to-back in the destination buffer */
			dst_paddr += message_length;
			xfer_desc->PacketHeader.messageUnitHeader.Packet_Types.Direct_Put.Counter_Offset =
				MUSPI_GetAtomicAddress(byte_counter_paddr, MUHWI_ATOMIC_OPCODE_STORE_ADD);
			xfer_desc->PacketHeader.messageUnitHeader.Packet_Types.Direct_Put.Rec_Counter_Base_Address_Id =
				FI_BGQ_MU_BAT_ID_GLOBAL;

			/* the rget payload grows by one descriptor per iov */
			rget_desc->Message_Length += sizeof(MUHWI_Descriptor_t);

			if (is_multi_receive) {		/* branch should compile out */
				xfer_desc->Torus_FIFO_Map = fifo_map;
			}
		}

		/* initialize the direct-put ("origin completion") descriptor in the rget payload */
		{
			MUHWI_Descriptor_t * dput_desc = payload;
			qpx_memcpy64((void*)dput_desc, (const void*)&bgq_ep->rx.poll.rzv.dput_completion_model);

			/* the packet carries the counter address right-shifted by 3 bits */
			const uint64_t counter_paddr = ((uint64_t) pkt->payload.rendezvous.cntr_paddr_rsh3b) << 3;
			dput_desc->Pa_Payload =
				MUSPI_GetAtomicAddress(counter_paddr,
					MUHWI_ATOMIC_OPCODE_LOAD_CLEAR);
		}

		/* initialize the memory-fifo ("rendezvous ack") descriptor in the rget payload for multi-receives */
		if (is_multi_receive) {			/* branch should compile out */
			MUHWI_Descriptor_t * ack_desc = ++payload;
			qpx_memcpy64((void*)ack_desc, (const void*)&bgq_ep->rx.poll.rzv.multi_recv_ack_model);

			ack_desc->Torus_FIFO_Map = fifo_map;
			rget_desc->Torus_FIFO_Map = fifo_map;
			rget_desc->Message_Length += sizeof(MUHWI_Descriptor_t);

			union fi_bgq_mu_packet_hdr * hdr = (union fi_bgq_mu_packet_hdr *) &ack_desc->PacketHeader;
			hdr->ack.context = (uintptr_t) context;
		}

		/*
		 * inject the descriptor
		 */
		MUSPI_InjFifoAdvanceDesc(bgq_ep->rx.poll.injfifo.muspi_injfifo);
	}
	return;
}