in prov/bgq/include/rdma/fi_direct_endpoint.h [522:781]
/*
 * Inject a send (message or tagged message) towards 'dest_addr'.
 *
 * The total transfer length selects the protocol: transfers of at most
 * FI_BGQ_TOTAL_BUFFERED_RECV bytes are sent "eager" (payload travels with
 * the descriptor), larger transfers are sent "rendezvous" (only the source
 * physical addresses and lengths travel with the descriptor).
 *
 * ep              endpoint; converted to its enclosing struct fi_bgq_ep
 * buf             source buffer when is_contiguous is 1, otherwise an array
 *                 of struct iovec
 * len             byte count when is_contiguous is 1, otherwise the number
 *                 of iovec entries in buf
 * desc            not referenced by this function
 * dest_addr       destination address, reinterpreted as union fi_bgq_addr
 * tag             stored in the packet header 'ofi_tag' field
 * context         completion context; must be non-NULL and 8 byte aligned
 *                 (asserted) on every path that queues a completion: all
 *                 rendezvous sends, and eager sends whose flags request a
 *                 completion
 * data            stored in the packet header 'immediate_data' field
 * lock_required   forwarded to the lock/unlock helpers and to the
 *                 completion queue enqueue
 * is_msg          0 or 1; when 1 the packet type is explicitly set to the
 *                 EAGER / RENDEZVOUS type, when 0 the type copied from the
 *                 descriptor model is left in place
 * is_contiguous   0 or 1; selects how buf/len are interpreted
 * override_flags  when 0, tx_op_flags is replaced by the endpoint's
 *                 tx.op_flags
 * tx_op_flags     operation flags used when override_flags is non-zero
 *
 * Returns 0 on success, otherwise the non-zero value reported by
 * fi_bgq_ep_tx_check(), fi_bgq_lock_if_required() or
 * fi_bgq_unlock_if_required().
 */
ssize_t fi_bgq_send_generic_flags(struct fid_ep *ep,
		const void *buf, size_t len, void *desc,
		fi_addr_t dest_addr, uint64_t tag, void *context,
		const uint32_t data, int lock_required,
		const unsigned is_msg, const unsigned is_contiguous,
		const unsigned override_flags, uint64_t tx_op_flags)
{
#ifdef FI_BGQ_TRACE
	fprintf(stderr,"fi_bgq_send_generic_flags starting\n");
#endif
	assert(is_msg == 0 || is_msg == 1);
	assert(is_contiguous == 0 || is_contiguous == 1);

	struct fi_bgq_ep *bgq_ep = container_of(ep, struct fi_bgq_ep, ep_fid);

	ssize_t ret;
	ret = fi_bgq_ep_tx_check(&bgq_ep->tx, FI_BGQ_FABRIC_DIRECT_AV);
	if (ret) return ret;

	/* TODO - if this is a FI_CLASS_STX_CTX, then the lock is required */
	ret = fi_bgq_lock_if_required(&bgq_ep->lock, lock_required);
	if (ret) return ret;

	/* no early returns below this point: the lock is released at the end */

	/* get the destination bgq torus address */
	const union fi_bgq_addr bgq_dst_addr = {.fi=dest_addr};

	/* total number of payload bytes, summed over the iovec if needed */
	size_t xfer_len = 0;
	if (is_contiguous) xfer_len = len;
	else {
		size_t i;
		const struct iovec * iov = (const struct iovec *)buf;
		for (i=0; i<len; ++i) xfer_len += iov[i].iov_len;
	}

	/* caller supplied flags are only honored when explicitly requested */
	if (!override_flags) tx_op_flags = bgq_ep->tx.op_flags;

	/* busy-wait until a fifo slot is available .. */
	MUHWI_Descriptor_t * send_desc = fi_bgq_spi_injfifo_tail_wait(&bgq_ep->tx.injfifo);

	if (xfer_len <= FI_BGQ_TOTAL_BUFFERED_RECV) {
		/* eager */

		/* copy the descriptor model into the injection fifo */
		qpx_memcpy64((void*)send_desc, (const void *)&bgq_ep->tx.send.send_model);

		/* set the destination torus address and fifo map */
		send_desc->PacketHeader.NetworkHeader.pt2pt.Destination = fi_bgq_uid_get_destination(bgq_dst_addr.uid.fi);
		send_desc->Torus_FIFO_Map = fi_bgq_addr_get_fifo_map(dest_addr);
		send_desc->Message_Length = xfer_len;
		send_desc->PacketHeader.messageUnitHeader.Packet_Types.Memory_FIFO.Rec_FIFO_Id =
			fi_bgq_addr_rec_fifo_id(dest_addr);

		if (is_contiguous && ((tx_op_flags & FI_INJECT) == 0)) {
			/* send directly out of the user buffer */
			fi_bgq_cnk_vaddr2paddr(buf, len, &send_desc->Pa_Payload);
		} else {
			/*
			 * FI_INJECT and/or iovec source: stage the payload in
			 * the lookaside slot and send from there
			 */

			/* locate the payload lookaside slot */
			uint64_t payload_paddr = 0;
			uintptr_t payload_vaddr =
				(uintptr_t) fi_bgq_spi_injfifo_immediate_payload(&bgq_ep->tx.injfifo,
					send_desc, &payload_paddr);
			send_desc->Pa_Payload = payload_paddr;

			if (is_contiguous) {
				if (len) memcpy((void*)payload_vaddr, buf, len);
			} else {
				size_t i;
				const struct iovec * iov = (const struct iovec *)buf;
				for (i=0; i<len; ++i) {
					/*
					 * skip empty entries, mirroring the 'if (len)'
					 * guard above: memcpy() must not be handed an
					 * invalid (e.g. NULL) iov_base even for 0 bytes
					 */
					if (iov[i].iov_len == 0) continue;
					memcpy((void*)payload_vaddr, iov[i].iov_base, iov[i].iov_len);
					payload_vaddr += iov[i].iov_len;
				}
			}
		}

		union fi_bgq_mu_packet_hdr * hdr = (union fi_bgq_mu_packet_hdr *) &send_desc->PacketHeader;
		hdr->pt2pt.send.message_length = xfer_len;
		hdr->pt2pt.ofi_tag = tag;
		hdr->pt2pt.immediate_data = data;

#ifdef FI_BGQ_TRACE
		fprintf(stderr,"eager sending to dest:\n");
		FI_BGQ_ADDR_DUMP(&dest_addr);
#endif
		if (is_msg) {
			fi_bgq_mu_packet_type_set(hdr, FI_BGQ_MU_PACKET_TYPE_EAGER);	/* clear the 'TAG' bit in the packet type */
		}

		/* hand the eager descriptor to the hardware */
		MUSPI_InjFifoAdvanceDesc(bgq_ep->tx.injfifo.muspi_injfifo);

#ifdef FI_BGQ_REMOTE_COMPLETION
		if (tx_op_flags & (FI_TRANSMIT_COMPLETE | FI_DELIVERY_COMPLETE)) {
			/*
			 * TODO - this code is buggy and results in a hang at job completion for 'cpi'
			 *
			 * Suspect that remote processes are exiting before the 'request for ack'
			 * remote completion packet is received, then the process that issued the
			 * 'request for ack' message will hang because the ack is never received.
			 *
			 * Alternative implementations:
			 *   1. Do not support remote completions on bgq (current)
			 *   2. Support remote completions via rendezvous protocol
			 */

			/* inject the 'remote completion' descriptor */
			send_desc = fi_bgq_spi_injfifo_tail_wait(&bgq_ep->tx.injfifo);

			/* copy the descriptor model into the injection fifo */
			qpx_memcpy64((void*)send_desc, (const void *)&bgq_ep->tx.send.remote_completion_model);

			/* initialize the completion entry */
			assert(context);
			assert(((uintptr_t)context & 0x07ull) == 0);	/* must be 8 byte aligned */
			union fi_bgq_context * bgq_context = (union fi_bgq_context *)context;
			bgq_context->flags = 0;		/* TODO */
			bgq_context->len = xfer_len;
			bgq_context->buf = NULL;	/* TODO */
			bgq_context->byte_counter = xfer_len;
			bgq_context->tag = tag;

			uint64_t byte_counter_paddr = 0;
			fi_bgq_cnk_vaddr2paddr((const void*)&bgq_context->byte_counter, sizeof(uint64_t), &byte_counter_paddr);

			/* set the destination torus address and fifo map */
			send_desc->PacketHeader.NetworkHeader.pt2pt.Destination = fi_bgq_uid_get_destination(bgq_dst_addr.uid.fi);
			/*
			 * NOTE(review): the other descriptors in this function use
			 * fi_bgq_addr_get_fifo_map(dest_addr) here - confirm that
			 * reading the union field directly is equivalent
			 */
			send_desc->Torus_FIFO_Map = (uint64_t) bgq_dst_addr.fifo_map;
			send_desc->PacketHeader.messageUnitHeader.Packet_Types.Memory_FIFO.Rec_FIFO_Id =
				fi_bgq_addr_rec_fifo_id(dest_addr);

			hdr = (union fi_bgq_mu_packet_hdr *) &send_desc->PacketHeader;
			hdr->completion.is_local = fi_bgq_addr_is_local(dest_addr);
			/* the counter address is carried shifted right by 3 bits */
			hdr->completion.cntr_paddr_rsh3b = byte_counter_paddr >> 3;

			fi_bgq_cq_enqueue_pending(bgq_ep->send_cq, bgq_context, lock_required);

			MUSPI_InjFifoAdvanceDesc(bgq_ep->tx.injfifo.muspi_injfifo);
		} else
#endif
		{
			if (tx_op_flags & (FI_INJECT_COMPLETE | FI_TRANSMIT_COMPLETE | FI_DELIVERY_COMPLETE)) {
#ifdef FI_BGQ_TRACE
				fprintf(stderr,"eager injecting local completion dput\n");
#endif
				/* inject the 'local completion' direct put descriptor */
				send_desc = fi_bgq_spi_injfifo_tail_wait(&bgq_ep->tx.injfifo);

				/* copy the descriptor model into the injection fifo */
				qpx_memcpy64((void*)send_desc, (const void *)&bgq_ep->tx.send.local_completion_model);

				/* initialize the completion entry */
				assert(context);
				assert(((uintptr_t)context & 0x07ull) == 0);	/* must be 8 byte aligned */
				union fi_bgq_context * bgq_context = (union fi_bgq_context *)context;
				bgq_context->flags = 0;		/* TODO */
				bgq_context->len = xfer_len;
				bgq_context->buf = NULL;	/* TODO */
				bgq_context->byte_counter = xfer_len;
				bgq_context->tag = tag;

				/* the descriptor payload address refers to the context byte counter */
				uint64_t byte_counter_paddr = 0;
				fi_bgq_cnk_vaddr2paddr((const void*)&bgq_context->byte_counter, sizeof(uint64_t), &byte_counter_paddr);
				send_desc->Pa_Payload =
					MUSPI_GetAtomicAddress(byte_counter_paddr,
						MUHWI_ATOMIC_OPCODE_LOAD_CLEAR);

				fi_bgq_cq_enqueue_pending(bgq_ep->send_cq, bgq_context, lock_required);

				MUSPI_InjFifoAdvanceDesc(bgq_ep->tx.injfifo.muspi_injfifo);
			}
		}
	} else {
		/* rendezvous */
#ifdef FI_BGQ_TRACE
		fprintf(stderr,"rendezvous sending to dest:\n");
		FI_BGQ_ADDR_DUMP(&dest_addr);
#endif
		/* FI_INJECT is not supported for transfers above the eager limit */
		assert((tx_op_flags & FI_INJECT) == 0);

		/* the descriptor model is selected by destination locality */
		const uint64_t is_local = fi_bgq_addr_is_local(dest_addr);

		/* copy the descriptor model into the injection fifo */
		qpx_memcpy64((void*)send_desc, (const void *)&bgq_ep->tx.send.rzv_model[is_local]);

		/* set the destination torus address and fifo map */
		send_desc->PacketHeader.NetworkHeader.pt2pt.Destination = fi_bgq_uid_get_destination(bgq_dst_addr.uid.fi);
		send_desc->Torus_FIFO_Map = fi_bgq_addr_get_fifo_map(dest_addr);
		send_desc->PacketHeader.messageUnitHeader.Packet_Types.Memory_FIFO.Rec_FIFO_Id =
			fi_bgq_addr_rec_fifo_id(dest_addr);

		union fi_bgq_mu_packet_hdr * hdr = (union fi_bgq_mu_packet_hdr *) &send_desc->PacketHeader;
		if (is_msg) {
			fi_bgq_mu_packet_type_set(hdr, FI_BGQ_MU_PACKET_TYPE_RENDEZVOUS);
		}

		/* locate the payload lookaside slot */
		uint64_t payload_paddr = 0;
		union fi_bgq_mu_packet_payload *payload =
			(union fi_bgq_mu_packet_payload *) fi_bgq_spi_injfifo_immediate_payload(&bgq_ep->tx.injfifo,
				send_desc, &payload_paddr);
		send_desc->Pa_Payload = payload_paddr;

		payload->rendezvous.fifo_map = fi_bgq_addr_get_fifo_map(bgq_dst_addr.fi);

		if (is_contiguous) {
			/* only send one mu iov */
			fi_bgq_cnk_vaddr2paddr(buf, len, &payload->rendezvous.mu_iov[0].src_paddr);
			payload->rendezvous.mu_iov[0].message_length = len;
			hdr->pt2pt.rendezvous.niov_minus_1 = 0;
		} else {
			/*
			 * NOTE(review): this bound is only enforced when asserts
			 * are enabled; presumably it matches the capacity of
			 * rendezvous.mu_iov[] - confirm against the payload type
			 */
			assert(len <= 31);
			size_t i;
			const struct iovec * iov = (const struct iovec *)buf;
			/* grow the message by one mu iov per entry beyond the first */
			send_desc->Message_Length += (len-1) * sizeof(struct fi_bgq_mu_iov);
			for (i=0; i<len; ++i) {
				fi_bgq_cnk_vaddr2paddr(iov[i].iov_base, iov[i].iov_len, &payload->rendezvous.mu_iov[i].src_paddr);
				payload->rendezvous.mu_iov[i].message_length = iov[i].iov_len;
			}
			hdr->pt2pt.rendezvous.niov_minus_1 = len - 1;
		}

		/* initialize the completion entry */
		assert(context);
		assert(((uintptr_t)context & 0x07ull) == 0);	/* must be 8 byte aligned */
		union fi_bgq_context * bgq_context = (union fi_bgq_context *)context;
		bgq_context->flags = 0;		/* TODO */
		bgq_context->len = xfer_len;
		bgq_context->buf = NULL;	/* TODO */
		bgq_context->byte_counter = xfer_len;
		bgq_context->tag = tag;

		/* the counter address is carried in the payload shifted right by 3 bits */
		uint64_t byte_counter_paddr = 0;
		fi_bgq_cnk_vaddr2paddr((const void*)&bgq_context->byte_counter, sizeof(uint64_t), &byte_counter_paddr);
		payload->rendezvous.cntr_paddr_rsh3b = byte_counter_paddr >> 3;

		hdr->pt2pt.ofi_tag = tag;
		hdr->pt2pt.immediate_data = data;

		MUSPI_InjFifoAdvanceDesc(bgq_ep->tx.injfifo.muspi_injfifo);

		fi_bgq_cq_enqueue_pending(bgq_ep->send_cq, bgq_context, lock_required);
	}

	/* TODO - if this is a FI_CLASS_STX_CTX, then the lock is required */
	ret = fi_bgq_unlock_if_required(&bgq_ep->lock, lock_required);

	/* zero on success, otherwise the unlock failure code */
	return ret;
}