in prov/bgq/include/rdma/bgq/fi_bgq_rx.h [329:642]
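/*
 * Complete a posted receive for an inbound MU packet.  Eager packets carry
 * the payload inline and are copied straight into the receive buffer;
 * rendezvous packets only describe the source data, so a "remote get"
 * descriptor is injected to pull it with rdma.  Both paths handle
 * truncation (FI_ETRUNC) and FI_MULTI_RECV, except that rendezvous with
 * FI_MULTI_RECV is currently unsupported and aborts.
 */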
void complete_receive_operation (struct fi_bgq_ep * bgq_ep,
        struct fi_bgq_mu_packet * pkt,
        const uint64_t origin_tag,
        union fi_bgq_context * context,
        const unsigned is_context_ext,
        const unsigned is_multi_receive,
        const unsigned is_manual_progress) {

#ifdef FI_BGQ_TRACE
    fprintf(stderr,"complete_receive_operation starting\n");
#endif

    const uint64_t recv_len = context->len;
    void * recv_buf = context->buf;

    const uint64_t packet_type = fi_bgq_mu_packet_type_get(pkt);
    const uint64_t immediate_data = pkt->hdr.pt2pt.immediate_data;
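
    /* eager packets carry the message payload inline in the packet, so the
     * receive can be completed immediately; rendezvous packets are handled
     * in the else branch below by injecting a remote get */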
    if (packet_type & FI_BGQ_MU_PACKET_TYPE_EAGER) {
#ifdef FI_BGQ_TRACE
        fprintf(stderr,"complete_receive_operation - packet_type & FI_BGQ_MU_PACKET_TYPE_EAGER\n");
#endif

        const uint64_t send_len = pkt->hdr.pt2pt.send.message_length;

        if (is_multi_receive) {        /* branch should compile out */

            if (send_len) memcpy(recv_buf, (void*)&pkt->payload.byte[0], send_len);

            union fi_bgq_context * original_multi_recv_context = context;
            context = (union fi_bgq_context *)((uintptr_t)recv_buf - sizeof(union fi_bgq_context));
            assert((((uintptr_t)context) & 0x07) == 0);

            context->flags = FI_RECV | FI_MSG | FI_BGQ_CQ_CONTEXT_MULTIRECV;
            context->buf = recv_buf;
            context->len = send_len;
            context->data = immediate_data;
            context->tag = 0;        /* tag is not valid for multi-receives */
            context->multi_recv_context = original_multi_recv_context;
            context->byte_counter = 0;

            /* the next 'fi_bgq_context' must be 8-byte aligned */
            uint64_t bytes_consumed = ((send_len + 8) & (~0x07ull)) + sizeof(union fi_bgq_context);
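
            /* advance the parent multi-receive buffer past this message:
             * the payload rounded up to an 8-byte boundary plus the embedded
             * fi_bgq_context initialized above */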
            original_multi_recv_context->len -= bytes_consumed;
            original_multi_recv_context->buf = (void*)((uintptr_t)(original_multi_recv_context->buf) + bytes_consumed);

#ifdef FI_BGQ_TRACE
            fprintf(stderr,"complete_receive_operation - is_multi_receive - enqueue cq for child context %p of parent context %p\n",context,original_multi_recv_context);
#endif

            /* post a completion event for the individual receive */
            fi_bgq_cq_enqueue_completed(bgq_ep->recv_cq, context, 0);    /* TODO - IS lock required? */

        } else if (send_len <= recv_len) {

            if (send_len) memcpy(recv_buf, (void*)&pkt->payload.byte[0], send_len);

#ifdef FI_BGQ_TRACE
            fprintf(stderr,"EAGER complete_receive_operation send_len %lu <= recv_len %lu calling fi_bgq_cq_enqueue_completed\n",send_len,recv_len);
#endif

            context->buf = NULL;
            context->len = send_len;
            context->data = immediate_data;
            context->tag = origin_tag;
            context->byte_counter = 0;

            /* post a completion event for the individual receive */
            fi_bgq_cq_enqueue_completed(bgq_ep->recv_cq, context, 0);    /* TODO - IS lock required? */

        } else {    /* truncation - unlikely */
#ifdef FI_BGQ_TRACE
            fprintf(stderr,"EAGER complete_receive_operation truncation - send_len %lu > recv_len %lu posting error\n",send_len,recv_len);
#endif
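
            /* the send is larger than the posted receive; build (or reuse) an
             * extended context and post an FI_ETRUNC error completion */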
            struct fi_bgq_context_ext * ext;
            if (is_context_ext) {
                ext = (struct fi_bgq_context_ext *)context;
                ext->err_entry.op_context = ext->msg.op_context;
            } else {
                posix_memalign((void**)&ext, 32, sizeof(struct fi_bgq_context_ext));
                ext->bgq_context.flags = FI_BGQ_CQ_CONTEXT_EXT;
                ext->err_entry.op_context = context;
            }

            ext->err_entry.flags = context->flags;
            ext->err_entry.len = recv_len;
            ext->err_entry.buf = recv_buf;
            ext->err_entry.data = immediate_data;
            ext->err_entry.tag = origin_tag;
            ext->err_entry.olen = send_len - recv_len;
            ext->err_entry.err = FI_ETRUNC;
            ext->err_entry.prov_errno = 0;
            ext->err_entry.err_data = NULL;
            ext->bgq_context.byte_counter = 0;

            fi_bgq_cq_enqueue_err(bgq_ep->recv_cq, ext, 0);
        }

        return;

    } else {    /* rendezvous packet */
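
        /* the rendezvous header describes the source data as a short vector
         * of (physical address, length) pairs; sum the lengths to get the
         * total transfer size */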
        uint64_t niov = pkt->hdr.pt2pt.rendezvous.niov_minus_1 + 1;
        assert(niov <= (7 - is_multi_receive));

        uint64_t xfer_len = pkt->payload.rendezvous.mu_iov[0].message_length;
        {
            uint64_t i;
            for (i = 1; i < niov; ++i) xfer_len += pkt->payload.rendezvous.mu_iov[i].message_length;
        }

        uint64_t byte_counter_vaddr = 0;
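
        /* each branch below selects the byte counter that the direct-put
         * data transfer (built further down) will decrement as the
         * rendezvous data arrives: the child context for a multi-receive,
         * the matched receive context, or the extended error context on
         * truncation */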
        if (is_multi_receive) {        /* branch should compile out */

            /* This code path is unverified - exit with an error message for
             * now; when we have an mpich case for this we will then verify it. */
            fprintf(stderr,"BGQ Provider does not support FI_MULTI_RECV and RENDEZVOUS protocol\n");
            fflush(stderr);
            exit(1);

#ifdef FI_BGQ_TRACE
            fprintf(stderr,"rendezvous multirecv\n");
#endif
            union fi_bgq_context * multi_recv_context =
                (union fi_bgq_context *)((uintptr_t)recv_buf - sizeof(union fi_bgq_context));
            assert((((uintptr_t)multi_recv_context) & 0x07) == 0);

            multi_recv_context->flags = FI_RECV | FI_MSG | FI_BGQ_CQ_CONTEXT_MULTIRECV;
            multi_recv_context->buf = recv_buf;
            multi_recv_context->len = xfer_len;
            multi_recv_context->data = immediate_data;
            multi_recv_context->tag = 0;        /* tag is not valid for multi-receives */
            multi_recv_context->multi_recv_context = context;
            multi_recv_context->byte_counter = xfer_len;

            /* the next 'fi_bgq_context' must be 8-byte aligned */
            uint64_t bytes_consumed = ((xfer_len + 8) & (~0x07ull)) + sizeof(union fi_bgq_context);
            context->len -= bytes_consumed;
            context->buf = (void*)((uintptr_t)(context->buf) + bytes_consumed);

            byte_counter_vaddr = (uint64_t)&multi_recv_context->byte_counter;

            /* the original multi-receive context actually uses an
             * operation counter - not a byte counter - but nevertheless
             * the same field in the context structure is used */
            context->byte_counter += 1;

            /* post a completion event for the individual receive */
            fi_bgq_cq_enqueue_pending(bgq_ep->recv_cq, context, 0);    /* TODO - IS lock required? */
        } else if (xfer_len <= recv_len) {
#ifdef FI_BGQ_TRACE
            fprintf(stderr,"rendezvous complete_receive_operation xfer_len %lu <= recv_len %lu calling fi_bgq_cq_enqueue_pending\n",xfer_len,recv_len);
#endif

            context->len = xfer_len;
            context->data = immediate_data;
            context->tag = origin_tag;
            context->byte_counter = xfer_len;

            byte_counter_vaddr = (uint64_t)&context->byte_counter;

            /* post a completion event for the individual receive */
            fi_bgq_cq_enqueue_pending(bgq_ep->recv_cq, context, 0);    /* TODO - IS lock required? */

        } else {
#ifdef FI_BGQ_TRACE
            fprintf(stderr,"rendezvous truncation xfer_len %lu > recv_len %lu posting error\n",xfer_len,recv_len);
#endif

            /* truncation */
            struct fi_bgq_context_ext * ext;
            if (is_context_ext) {
                ext = (struct fi_bgq_context_ext *)context;
                ext->err_entry.op_context = ext->msg.op_context;
            } else {
                posix_memalign((void**)&ext, 32, sizeof(struct fi_bgq_context_ext));
                ext->bgq_context.flags = FI_BGQ_CQ_CONTEXT_EXT;
                ext->err_entry.op_context = context;
            }

            ext->err_entry.flags = context->flags;
            ext->err_entry.len = recv_len;
            ext->err_entry.buf = recv_buf;
            ext->err_entry.data = immediate_data;
            ext->err_entry.tag = origin_tag;
            ext->err_entry.olen = xfer_len - recv_len;
            ext->err_entry.err = FI_ETRUNC;
            ext->err_entry.prov_errno = 0;
            ext->err_entry.err_data = NULL;
            ext->bgq_context.byte_counter = 0;

            byte_counter_vaddr = (uint64_t)&ext->bgq_context.byte_counter;

            fi_bgq_cq_enqueue_err(bgq_ep->recv_cq, ext, 0);

            xfer_len = 0;
            niov = 0;
        }
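
        /* on truncation xfer_len and niov were zeroed above, so no
         * data-transfer descriptors are generated below; the remote get is
         * still injected so that the origin's completion counter is cleared */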
        /* determine the physical address of the byte counter memory */
        uint64_t byte_counter_paddr = 0;
        {
            Kernel_MemoryRegion_t mr;
            Kernel_CreateMemoryRegion(&mr, (void*)byte_counter_vaddr, sizeof(uint64_t));
            byte_counter_paddr = (uint64_t)mr.BasePa + (byte_counter_vaddr - (uint64_t)mr.BaseVa);
        }

        /* determine the physical address of the destination buffer */
        uint64_t dst_paddr = 0;
        {
            Kernel_MemoryRegion_t mr;
            Kernel_CreateMemoryRegion(&mr, (void*)recv_buf, recv_len);
            dst_paddr = (uint64_t)mr.BasePa + ((uint64_t)recv_buf - (uint64_t)mr.BaseVa);
        }

        const uint64_t fifo_map = fi_bgq_mu_packet_get_fifo_map(pkt);
        const uint64_t is_local = (fifo_map & (MUHWI_DESCRIPTOR_TORUS_FIFO_MAP_LOCAL0 | MUHWI_DESCRIPTOR_TORUS_FIFO_MAP_LOCAL1)) != 0;
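
        /* a fifo map with either 'local' bit set indicates an on-node
         * (loopback) transfer; is_local selects the corresponding rget and
         * dput descriptor models below */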
        /*
         * inject a "remote get" descriptor - the payload is composed
         * of two descriptors:
         *
         * the first is a "direct put" descriptor that will rdma
         * transfer the source data from the origin and will
         * decrement a reception counter on the target as it
         * completes
         *
         * the second is a "direct put" descriptor that will clear
         * the byte counter for the send completion entry on the
         * origin
         */

        /* busy-wait until a fifo slot is available .. */
        MUHWI_Descriptor_t * rget_desc =
            fi_bgq_spi_injfifo_tail_wait(&bgq_ep->rx.poll.injfifo);
        assert(rget_desc);
        assert((((uintptr_t)rget_desc) & 0x1F) == 0);

        /* locate the payload lookaside slot */
        uint64_t payload_paddr = 0;
        MUHWI_Descriptor_t * payload =
            (MUHWI_Descriptor_t *)fi_bgq_spi_injfifo_immediate_payload(&bgq_ep->rx.poll.injfifo,
                rget_desc, &payload_paddr);

        /* initialize the remote-get descriptor in the injection fifo */
        qpx_memcpy64((void*)rget_desc, (const void*)&bgq_ep->rx.poll.rzv.rget_model[is_local]);

        rget_desc->Pa_Payload = payload_paddr;
        rget_desc->PacketHeader.messageUnitHeader.Packet_Types.Remote_Get.Rget_Inj_FIFO_Id =
            pkt->hdr.pt2pt.rendezvous.rget_inj_fifo_id;    /* TODO - different rget inj fifos for tag vs msg operations? */
        rget_desc->PacketHeader.NetworkHeader.pt2pt.Destination = fi_bgq_uid_get_destination(pkt->hdr.pt2pt.uid.fi);
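
        /* the rget payload is the list of descriptors the origin's MU will
         * execute; rget_desc->Message_Length is grown below as data-transfer
         * (and, for multi-receive, ack) descriptors are appended to it */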
        /* initialize the direct-put ("data transfer") descriptor(s) in the rget payload */
        unsigned i;
        for (i = 0; i < niov; ++i) {

            MUHWI_Descriptor_t * xfer_desc = payload++;

            qpx_memcpy64((void*)xfer_desc, (const void*)&bgq_ep->rx.poll.rzv.dput_model[is_local]);

            xfer_desc->Pa_Payload = pkt->payload.rendezvous.mu_iov[i].src_paddr;
            const uint64_t message_length = pkt->payload.rendezvous.mu_iov[i].message_length;
            xfer_desc->Message_Length = message_length;

            MUSPI_SetRecPayloadBaseAddressInfo(xfer_desc, FI_BGQ_MU_BAT_ID_GLOBAL, dst_paddr);
            dst_paddr += message_length;

            xfer_desc->PacketHeader.messageUnitHeader.Packet_Types.Direct_Put.Counter_Offset =
                MUSPI_GetAtomicAddress(byte_counter_paddr, MUHWI_ATOMIC_OPCODE_STORE_ADD);
            xfer_desc->PacketHeader.messageUnitHeader.Packet_Types.Direct_Put.Rec_Counter_Base_Address_Id =
                FI_BGQ_MU_BAT_ID_GLOBAL;

            rget_desc->Message_Length += sizeof(MUHWI_Descriptor_t);

            if (is_multi_receive) {        /* branch should compile out */
                xfer_desc->Torus_FIFO_Map = fifo_map;
            }
        }

        /* initialize the direct-put ("origin completion") descriptor in the rget payload */
        {
            MUHWI_Descriptor_t * dput_desc = payload;

            qpx_memcpy64((void*)dput_desc, (const void*)&bgq_ep->rx.poll.rzv.dput_completion_model);
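
            /* the origin's send-completion counter physical address was
             * packed into the rendezvous header shifted right by 3 bits
             * (cntr_paddr_rsh3b); restore it before forming the
             * load-and-clear atomic address */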
            const uint64_t counter_paddr = ((uint64_t) pkt->payload.rendezvous.cntr_paddr_rsh3b) << 3;
            dput_desc->Pa_Payload =
                MUSPI_GetAtomicAddress(counter_paddr,
                    MUHWI_ATOMIC_OPCODE_LOAD_CLEAR);
        }

        /* initialize the memory-fifo ("rendezvous ack") descriptor in the rget payload for multi-receives */
        if (is_multi_receive) {        /* branch should compile out */

            MUHWI_Descriptor_t * ack_desc = ++payload;

            qpx_memcpy64((void*)ack_desc, (const void*)&bgq_ep->rx.poll.rzv.multi_recv_ack_model);

            ack_desc->Torus_FIFO_Map = fifo_map;
            rget_desc->Torus_FIFO_Map = fifo_map;
            rget_desc->Message_Length += sizeof(MUHWI_Descriptor_t);

            union fi_bgq_mu_packet_hdr * hdr = (union fi_bgq_mu_packet_hdr *) &ack_desc->PacketHeader;
            hdr->ack.context = (uintptr_t) context;
        }

        /*
         * inject the descriptor
         */
        MUSPI_InjFifoAdvanceDesc(bgq_ep->rx.poll.injfifo.muspi_injfifo);
    }

    return;
}