ssize_t fi_bgq_send_generic_flags()

in prov/bgq/include/rdma/fi_direct_endpoint.h [522:781]


/*
 * Build and inject the descriptor(s) for one send operation into the
 * endpoint's transmit injection fifo.
 *
 * Two protocols are selected by the total transfer length:
 *   - eager      (xfer_len <= FI_BGQ_TOTAL_BUFFERED_RECV): the descriptor
 *                payload is the message data itself, optionally followed by a
 *                second "completion" descriptor when the op flags request one.
 *   - rendezvous (otherwise): the descriptor payload is a list of
 *                (src_paddr, message_length) entries describing the source
 *                buffer(s) plus the address of the completion byte counter;
 *                the data itself is not copied here.
 *
 * Parameters:
 *   ep             endpoint fid; must be embedded in a struct fi_bgq_ep
 *   buf            if is_contiguous: pointer to the message data;
 *                  otherwise: pointer to an array of struct iovec
 *   len            if is_contiguous: number of bytes in buf;
 *                  otherwise: number of entries in the iovec array
 *   desc           not referenced by this function
 *   dest_addr      destination address; also reinterpreted as a
 *                  union fi_bgq_addr
 *   tag            written to the packet header (ofi_tag) and to the
 *                  completion context
 *   context        caller storage used as a union fi_bgq_context whenever a
 *                  completion entry is created (always for rendezvous; for
 *                  eager only when the op flags request a completion).
 *                  In those cases it must be non-NULL and 8 byte aligned;
 *                  this is checked by assert() only.
 *   data           written to the packet header (immediate_data)
 *   lock_required  forwarded to the lock/unlock helpers and to
 *                  fi_bgq_cq_enqueue_pending()
 *   is_msg         must be 0 or 1; when 1 the packet type is explicitly set
 *                  to the (untagged) EAGER / RENDEZVOUS type
 *   is_contiguous  must be 0 or 1; selects the interpretation of buf/len
 *   override_flags when zero, tx_op_flags is ignored and the endpoint's
 *                  tx.op_flags is used instead
 *   tx_op_flags    operation flags, only honored when override_flags != 0
 *
 * Returns 0 on success, otherwise the non-zero value returned by
 * fi_bgq_ep_tx_check(), fi_bgq_lock_if_required() or
 * fi_bgq_unlock_if_required().
 */
ssize_t fi_bgq_send_generic_flags(struct fid_ep *ep,
		const void *buf, size_t len, void *desc,
		fi_addr_t dest_addr, uint64_t tag, void *context,
		const uint32_t data, int lock_required,
		const unsigned is_msg, const unsigned is_contiguous,
		const unsigned override_flags, uint64_t tx_op_flags)
{
#ifdef FI_BGQ_TRACE
        fprintf(stderr,"fi_bgq_send_generic_flags starting\n");
#endif
	assert(is_msg == 0 || is_msg == 1);
	assert(is_contiguous == 0 || is_contiguous == 1);

	/* recover the provider endpoint that embeds the generic fid */
	struct fi_bgq_ep *bgq_ep = container_of(ep, struct fi_bgq_ep, ep_fid);

	/* validate the tx side before taking the lock, so that this early
	 * return never leaves the endpoint locked */
	ssize_t ret;
	ret = fi_bgq_ep_tx_check(&bgq_ep->tx, FI_BGQ_FABRIC_DIRECT_AV);
	if (ret) return ret;

	/* TODO - if this is a FI_CLASS_STX_CTX, then the lock is required */
	ret = fi_bgq_lock_if_required(&bgq_ep->lock, lock_required);
	if (ret) return ret;

	/* NOTE: there are no return statements between here and the unlock at
	 * the end of the function; keep it that way or release the lock first */

	/* get the destination bgq torus address */
	const union fi_bgq_addr bgq_dst_addr = {.fi=dest_addr};

	/* total number of payload bytes; for the iovec form 'len' is the number
	 * of iovec entries, not a byte count */
	size_t xfer_len = 0;
	if (is_contiguous) xfer_len = len;
	else {
		size_t i;
		const struct iovec * iov = (const struct iovec *)buf;
		for (i=0; i<len; ++i) xfer_len += iov[i].iov_len;
	}

	/* unless the caller overrides them, use the endpoint's default tx flags */
	if (!override_flags) tx_op_flags = bgq_ep->tx.op_flags;

	/* busy-wait until a fifo slot is available .. */
	MUHWI_Descriptor_t * send_desc = fi_bgq_spi_injfifo_tail_wait(&bgq_ep->tx.injfifo);

	if (xfer_len <= FI_BGQ_TOTAL_BUFFERED_RECV) {
		/* eager */

		/* copy the descriptor model into the injection fifo */
		qpx_memcpy64((void*)send_desc, (const void *)&bgq_ep->tx.send.send_model);

		/* set the destination torus address and fifo map */
		send_desc->PacketHeader.NetworkHeader.pt2pt.Destination = fi_bgq_uid_get_destination(bgq_dst_addr.uid.fi);
		send_desc->Torus_FIFO_Map = fi_bgq_addr_get_fifo_map(dest_addr);

		send_desc->Message_Length = xfer_len;

		send_desc->PacketHeader.messageUnitHeader.Packet_Types.Memory_FIFO.Rec_FIFO_Id =
			fi_bgq_addr_rec_fifo_id(dest_addr);

		if (is_contiguous && ((tx_op_flags & FI_INJECT) == 0)) {
			/* zero-copy: the descriptor points straight at the
			 * caller's buffer */
			fi_bgq_cnk_vaddr2paddr(buf, len, &send_desc->Pa_Payload);
		} else {
			/* FI_INJECT or iovec source: stage the data in the
			 * payload slot associated with this descriptor */

			/* locate the payload lookaside slot */
			uint64_t payload_paddr = 0;
			uintptr_t payload_vaddr =
				(uintptr_t) fi_bgq_spi_injfifo_immediate_payload(&bgq_ep->tx.injfifo,
					send_desc, &payload_paddr);
			send_desc->Pa_Payload = payload_paddr;

			if (is_contiguous) {
				/* skip the copy entirely for a zero-length send */
				if (len) memcpy((void*)payload_vaddr, buf, len);
			} else {
				/* gather every iovec entry back-to-back into
				 * the payload slot */
				unsigned i;
				const struct iovec * iov = (const struct iovec *)buf;
				for (i=0; i<len; ++i) {
					memcpy((void*)payload_vaddr, iov[i].iov_base, iov[i].iov_len);
						payload_vaddr += iov[i].iov_len;
				}
			}
		}

		/* fill in the software portion of the packet header */
		union fi_bgq_mu_packet_hdr * hdr = (union fi_bgq_mu_packet_hdr *) &send_desc->PacketHeader;
		hdr->pt2pt.send.message_length = xfer_len;
		hdr->pt2pt.ofi_tag = tag;
		hdr->pt2pt.immediate_data = data;

#ifdef FI_BGQ_TRACE
		fprintf(stderr,"eager sending to dest:\n");
		FI_BGQ_ADDR_DUMP(&dest_addr);
#endif
		if (is_msg) {
			fi_bgq_mu_packet_type_set(hdr, FI_BGQ_MU_PACKET_TYPE_EAGER);	/* clear the 'TAG' bit in the packet type */
		}

		/* the data descriptor is complete; advance the fifo past it.
		 * All stores to *send_desc must be issued before this call. */
		MUSPI_InjFifoAdvanceDesc(bgq_ep->tx.injfifo.muspi_injfifo);

		/*
		 * NOTE on the preprocessor structure below: when
		 * FI_BGQ_REMOTE_COMPLETION is defined, the braced block after
		 * the #endif is the 'else' branch of the remote-completion
		 * test; when it is not defined, that braced block is compiled
		 * unconditionally.
		 */
#ifdef FI_BGQ_REMOTE_COMPLETION
		if (tx_op_flags & (FI_TRANSMIT_COMPLETE | FI_DELIVERY_COMPLETE)) {

			/*
			 * TODO - this code is buggy and results in a hang at job completion for 'cpi'
			 *
			 * Suspect that remote processes are exiting before the 'request for ack'
			 * remote completion packet is received, then the process that issued the
			 * 'request for ack' message will hang because the ack is never received.
			 *
			 * Alternative implementations:
			 *   1. Do not support remote completions on bgq (current)
			 *   2. Support remote completions via rendezvous protocol
			 */

			/* inject the 'remote completion' descriptor */
			send_desc = fi_bgq_spi_injfifo_tail_wait(&bgq_ep->tx.injfifo);

			/* copy the descriptor model into the injection fifo */
			qpx_memcpy64((void*)send_desc, (const void *)&bgq_ep->tx.send.remote_completion_model);

			/* initialize the completion entry */
			assert(context);
			assert(((uintptr_t)context & 0x07ull) == 0);	/* must be 8 byte aligned */
			union fi_bgq_context * bgq_context = (union fi_bgq_context *)context;
			bgq_context->flags = 0;		/* TODO */
			bgq_context->len = xfer_len;
			bgq_context->buf = NULL;	/* TODO */
			bgq_context->byte_counter = xfer_len;
			bgq_context->tag = tag;

			uint64_t byte_counter_paddr = 0;
			fi_bgq_cnk_vaddr2paddr((const void*)&bgq_context->byte_counter, sizeof(uint64_t), &byte_counter_paddr);

			/* set the destination torus address and fifo map */
			send_desc->PacketHeader.NetworkHeader.pt2pt.Destination = fi_bgq_uid_get_destination(bgq_dst_addr.uid.fi);
			/* NOTE(review): the other paths obtain the fifo map via
			 * fi_bgq_addr_get_fifo_map(); this one reads the union
			 * member directly - confirm the two are equivalent */
			send_desc->Torus_FIFO_Map = (uint64_t) bgq_dst_addr.fifo_map;
			send_desc->PacketHeader.messageUnitHeader.Packet_Types.Memory_FIFO.Rec_FIFO_Id =
				fi_bgq_addr_rec_fifo_id(dest_addr);

			/* 'hdr' now refers to the completion descriptor's header */
			hdr = (union fi_bgq_mu_packet_hdr *) &send_desc->PacketHeader;
			hdr->completion.is_local = fi_bgq_addr_is_local(dest_addr);
			/* the counter address is transmitted without its low 3 bits */
			hdr->completion.cntr_paddr_rsh3b = byte_counter_paddr >> 3;

			fi_bgq_cq_enqueue_pending(bgq_ep->send_cq, bgq_context, lock_required);

			MUSPI_InjFifoAdvanceDesc(bgq_ep->tx.injfifo.muspi_injfifo);

		} else
#endif
		{

			/* if none of these flags is set, no completion entry is
			 * created for an eager send and 'context' is not touched */
			if (tx_op_flags & (FI_INJECT_COMPLETE | FI_TRANSMIT_COMPLETE | FI_DELIVERY_COMPLETE)) {

#ifdef FI_BGQ_TRACE
                fprintf(stderr,"eager injecting local completion dput\n");
#endif

				/* inject the 'local completion' direct put descriptor */
				send_desc = fi_bgq_spi_injfifo_tail_wait(&bgq_ep->tx.injfifo);

				/* copy the descriptor model into the injection fifo */
				qpx_memcpy64((void*)send_desc, (const void *)&bgq_ep->tx.send.local_completion_model);

				/* initialize the completion entry */
				assert(context);
				assert(((uintptr_t)context & 0x07ull) == 0);	/* must be 8 byte aligned */
				union fi_bgq_context * bgq_context = (union fi_bgq_context *)context;
				bgq_context->flags = 0;		/* TODO */
				bgq_context->len = xfer_len;
				bgq_context->buf = NULL;	/* TODO */
				bgq_context->byte_counter = xfer_len;
				bgq_context->tag = tag;

				uint64_t byte_counter_paddr = 0;
				fi_bgq_cnk_vaddr2paddr((const void*)&bgq_context->byte_counter, sizeof(uint64_t), &byte_counter_paddr);

				/* the payload address is the byte counter, encoded
				 * as an atomic LOAD_CLEAR access; presumably this
				 * zeroes the counter once the descriptor is
				 * processed, marking the send locally complete -
				 * TODO confirm against the model's setup */
				send_desc->Pa_Payload =
					MUSPI_GetAtomicAddress(byte_counter_paddr,
						MUHWI_ATOMIC_OPCODE_LOAD_CLEAR);

				/* register the context before the completion
				 * descriptor is handed to the fifo */
				fi_bgq_cq_enqueue_pending(bgq_ep->send_cq, bgq_context, lock_required);

				MUSPI_InjFifoAdvanceDesc(bgq_ep->tx.injfifo.muspi_injfifo);
			}
		}

	} else {
		/* rendezvous */

#ifdef FI_BGQ_TRACE
                fprintf(stderr,"rendezvous sending to dest:\n");
                FI_BGQ_ADDR_DUMP(&dest_addr);
#endif

		/* a transfer this large cannot be an inject; checked in debug
		 * builds only */
		assert((tx_op_flags & FI_INJECT) == 0);

		/* selects which of the two rendezvous models is used */
		const uint64_t is_local = fi_bgq_addr_is_local(dest_addr);

		/* copy the descriptor model into the injection fifo */
		qpx_memcpy64((void*)send_desc, (const void *)&bgq_ep->tx.send.rzv_model[is_local]);

		/* set the destination torus address and fifo map */
		send_desc->PacketHeader.NetworkHeader.pt2pt.Destination = fi_bgq_uid_get_destination(bgq_dst_addr.uid.fi);
		send_desc->Torus_FIFO_Map = fi_bgq_addr_get_fifo_map(dest_addr);

		send_desc->PacketHeader.messageUnitHeader.Packet_Types.Memory_FIFO.Rec_FIFO_Id =
			fi_bgq_addr_rec_fifo_id(dest_addr);

		union fi_bgq_mu_packet_hdr * hdr = (union fi_bgq_mu_packet_hdr *) &send_desc->PacketHeader;

		if (is_msg) {
			fi_bgq_mu_packet_type_set(hdr, FI_BGQ_MU_PACKET_TYPE_RENDEZVOUS);
		}

		/* locate the payload lookaside slot */
		uint64_t payload_paddr = 0;
		union fi_bgq_mu_packet_payload *payload = 
			(union fi_bgq_mu_packet_payload *) fi_bgq_spi_injfifo_immediate_payload(&bgq_ep->tx.injfifo,
				send_desc, &payload_paddr);
		send_desc->Pa_Payload = payload_paddr;

		payload->rendezvous.fifo_map = fi_bgq_addr_get_fifo_map(bgq_dst_addr.fi);

		if (is_contiguous) {
			/* only send one mu iov */
			fi_bgq_cnk_vaddr2paddr(buf, len, &payload->rendezvous.mu_iov[0].src_paddr);
			payload->rendezvous.mu_iov[0].message_length = len;
			hdr->pt2pt.rendezvous.niov_minus_1 = 0;
		} else {
			/* upper bound on the number of iovec entries; enforced
			 * in debug builds only */
			assert(len <= 31);
			size_t i;
			const struct iovec * iov = (const struct iovec *)buf;
			/* grow the message by one mu iov per entry beyond the
			 * first; presumably the model's Message_Length already
			 * accounts for a single mu iov - TODO confirm.
			 * len >= 1 here because a zero-entry iovec has
			 * xfer_len == 0 and takes the eager path. */
			send_desc->Message_Length += (len-1) * sizeof(struct fi_bgq_mu_iov);
			for (i=0; i<len; ++i) {
				fi_bgq_cnk_vaddr2paddr(iov[i].iov_base, iov[i].iov_len, &payload->rendezvous.mu_iov[i].src_paddr);
				payload->rendezvous.mu_iov[i].message_length = iov[i].iov_len;
			}
			hdr->pt2pt.rendezvous.niov_minus_1 = len - 1;
		}

		/* initialize the completion entry */
		assert(context);
		assert(((uintptr_t)context & 0x07ull) == 0);	/* must be 8 byte aligned */
		union fi_bgq_context * bgq_context = (union fi_bgq_context *)context;
		bgq_context->flags = 0;		/* TODO */
		bgq_context->len = xfer_len;
		bgq_context->buf = NULL;	/* TODO */
		bgq_context->byte_counter = xfer_len;
		bgq_context->tag = tag;

		/* the payload carries the byte counter address without its low
		 * 3 bits */
		uint64_t byte_counter_paddr = 0;
		fi_bgq_cnk_vaddr2paddr((const void*)&bgq_context->byte_counter, sizeof(uint64_t), &byte_counter_paddr);
		payload->rendezvous.cntr_paddr_rsh3b = byte_counter_paddr >> 3;

		hdr->pt2pt.ofi_tag = tag;
		hdr->pt2pt.immediate_data = data;

		MUSPI_InjFifoAdvanceDesc(bgq_ep->tx.injfifo.muspi_injfifo);

		/* NOTE(review): here the context is enqueued after the
		 * descriptor is advanced, whereas the eager completion paths
		 * enqueue before advancing - confirm the ordering is
		 * intentional */
		fi_bgq_cq_enqueue_pending(bgq_ep->send_cq, bgq_context, lock_required);
	}

	/* TODO - if this is a FI_CLASS_STX_CTX, then the lock is required */
	ret = fi_bgq_unlock_if_required(&bgq_ep->lock, lock_required);
	if (ret) return ret;

	return 0;
}