in prov/bgq/include/rdma/bgq/fi_bgq_rx.h [1006:1386]
/*
 * Process one receive context: cancel it, satisfy it from the unexpected
 * packet queue, or append it to the match queue.
 *
 * bgq_ep             endpoint whose rx.poll.rfifo[poll_msg] queues are used
 * poll_msg           index into rx.poll.rfifo[]; when non-zero the tag passed
 *                    on / reported is 0, otherwise the packet's ofi_tag is
 *                    used (per the original comments, zero is the tagged case)
 * cancel_context     when non-zero, a context whose identity equals this
 *                    value is completed with FI_ECANCELED instead of posted
 * context            the receive context to process
 * rx_op_flags        FI_PEEK / FI_CLAIM / FI_MULTI_RECV select the path taken
 * is_context_ext     non-zero when 'context' is really a
 *                    struct fi_bgq_context_ext
 * is_manual_progress forwarded unchanged to complete_receive_operation()
 *
 * Returns 0 normally, FI_ECANCELED when the context matched 'cancel_context'
 * and a cancel error entry was enqueued, or the negated posix_memalign()
 * error code (i.e. -ENOMEM) when an extended context could not be allocated.
 */
int process_mfifo_context (struct fi_bgq_ep * bgq_ep, const unsigned poll_msg,
		const uint64_t cancel_context, union fi_bgq_context * context,
		const uint64_t rx_op_flags, const uint64_t is_context_ext,
		const unsigned is_manual_progress) {

#ifdef FI_BGQ_TRACE
	/* NOTE(review): '%d' assumes context->tag is int-sized - confirm its type */
	fprintf(stderr,"process_mfifo_context starting - context->tag is %d\n",context->tag);
	if (rx_op_flags & FI_PEEK)
		fprintf(stderr,"just peeking\n");
	fflush(stderr);
#endif

	if (cancel_context) {	/* branch should compile out */
		const uint64_t compare_context = is_context_ext ?
			(uint64_t)(((struct fi_bgq_context_ext *)context)->msg.op_context) :
			(uint64_t)context;

		if (compare_context == cancel_context) {

			struct fi_bgq_context_ext * ext;
			if (is_context_ext) {
				ext = (struct fi_bgq_context_ext *)context;
			} else {
				/* a plain context has no room for an error entry;
				 * allocate an extended context to carry it */
				void * mem = NULL;
				const int rc = posix_memalign(&mem, 32, sizeof(struct fi_bgq_context_ext));
				if (rc != 0)
					return -rc;
				ext = (struct fi_bgq_context_ext *)mem;
				ext->bgq_context.flags = FI_BGQ_CQ_CONTEXT_EXT;
			}

			ext->bgq_context.byte_counter = 0;
			ext->err_entry.op_context = (void *)cancel_context;
			ext->err_entry.flags = rx_op_flags;
			ext->err_entry.len = 0;
			ext->err_entry.buf = 0;
			ext->err_entry.data = 0;
			ext->err_entry.tag = context->tag;
			ext->err_entry.olen = 0;
			ext->err_entry.err = FI_ECANCELED;
			ext->err_entry.prov_errno = 0;
			ext->err_entry.err_data = NULL;

			fi_bgq_cq_enqueue_err (bgq_ep->recv_cq, ext,0);

			return FI_ECANCELED;
		}
	}

	if ((rx_op_flags & (FI_PEEK | FI_CLAIM | FI_MULTI_RECV)) == 0) {	/* likely */

		/* search the unexpected packet queue */
		struct fi_bgq_mu_packet * prev = NULL;
		struct fi_bgq_mu_packet * uepkt = bgq_ep->rx.poll.rfifo[poll_msg].ue.head;

		unsigned found_match = 0;
		while (uepkt != NULL) {

#ifdef FI_BGQ_TRACE
			fprintf(stderr,"process_mfifo_context - searching unexpected queue\n");
			fflush(stderr);
#endif
			if (is_match(uepkt, context, poll_msg)) {
#ifdef FI_BGQ_TRACE
				fprintf(stderr,"process_mfifo_context - found match on unexpected queue\n");
				fflush(stderr);
#endif

				/* branch will compile out */
				if (poll_msg) {
					complete_receive_operation(bgq_ep, uepkt,
						0, context, 0, 0, is_manual_progress);
				} else {
					complete_receive_operation(bgq_ep, uepkt,
						uepkt->hdr.pt2pt.ofi_tag, context, 0, 0, is_manual_progress);
				}

				/* remove the uepkt from the ue queue */
				if (bgq_ep->rx.poll.rfifo[poll_msg].ue.tail == uepkt) {
					bgq_ep->rx.poll.rfifo[poll_msg].ue.tail = prev;
					if (prev == NULL)
						bgq_ep->rx.poll.rfifo[poll_msg].ue.head = NULL;
					else
						prev->next = NULL;
				} else if (prev == NULL) {
					bgq_ep->rx.poll.rfifo[poll_msg].ue.head = uepkt->next;
				} else {
					prev->next = uepkt->next;
				}

				/* ... and prepend the uehdr to the ue free list. */
				uepkt->next = bgq_ep->rx.poll.rfifo[poll_msg].ue.free;
				bgq_ep->rx.poll.rfifo[poll_msg].ue.free = uepkt;

				/* found a match; break from the loop */
				uepkt = NULL;
				found_match = 1;

			} else {

				/* a match was not found; advance to the next ue header */
				prev = uepkt;
				uepkt = uepkt->next;
			}
		}

		if (!found_match) {

#ifdef FI_BGQ_TRACE
			fprintf(stderr,"process_mfifo_context - nothing found on unexpected queue adding to match queue for poll_msg %u context->tag is %d context is %p mq addr is %p\n",poll_msg,context->tag,context,&(bgq_ep->rx.poll.rfifo[poll_msg].mq));
			fflush(stderr);
#endif
			/*
			 * no unexpected headers were matched; add this match
			 * information to the appropriate match queue
			 */
			union fi_bgq_context * mq_tail = bgq_ep->rx.poll.rfifo[poll_msg].mq.tail;

			context->next = NULL;
			if (mq_tail == NULL) {
				bgq_ep->rx.poll.rfifo[poll_msg].mq.head = context;
			} else {
				mq_tail->next = context;
			}
			bgq_ep->rx.poll.rfifo[poll_msg].mq.tail = context;
		}

	} else if (rx_op_flags & FI_PEEK) {	/* unlikely */

		/* search the unexpected packet queue */
		struct fi_bgq_mu_packet * prev = NULL;
		struct fi_bgq_mu_packet * uepkt = bgq_ep->rx.poll.rfifo[poll_msg].ue.head;

#ifdef FI_BGQ_TRACE
		fprintf(stderr,"process_mfifo_context - rx_op_flags & FI_PEEK searching unexpected queue\n");
		if (uepkt == NULL)
			fprintf(stderr,"uepkt == NULL\n");
		else
			fprintf(stderr,"uepkt != NULL\n");
		fflush(stderr);
#endif

		unsigned found_match = 0;
		while (uepkt != NULL) {

#ifdef FI_BGQ_TRACE
			fprintf(stderr,"process_mfifo_context uepkt != NULL - rx_op_flags & FI_PEEK searching unexpected queue\n");
			fflush(stderr);
#endif
			if (is_match(uepkt, context, poll_msg)) {

				/* report the total message length of the matched packet */
				const uint64_t packet_type = fi_bgq_mu_packet_type_get(uepkt);
				if (packet_type & FI_BGQ_MU_PACKET_TYPE_RENDEZVOUS) {
					const uint64_t niov = uepkt->hdr.pt2pt.rendezvous.niov_minus_1 + 1;
					uint64_t len = 0;
					uint64_t i;
					for (i=0; i<niov; ++i) {
						len += uepkt->payload.rendezvous.mu_iov[i].message_length;
					}
					context->len = len;
				} else {	/* "eager" or "eager with completion" packet type */
					context->len = uepkt->hdr.pt2pt.send.message_length;
				}
				context->tag = poll_msg ? 0 : uepkt->hdr.pt2pt.ofi_tag;
				context->byte_counter = 0;

				if (rx_op_flags & FI_CLAIM) {	/* both FI_PEEK and FI_CLAIM were specified */
					assert((rx_op_flags & FI_BGQ_CQ_CONTEXT_EXT) == 0);

					/* remember the packet so a later FI_CLAIM-only
					 * operation can complete it */
					context->claim = uepkt;

					/* remove the uepkt from the ue queue */
					if (bgq_ep->rx.poll.rfifo[poll_msg].ue.tail == uepkt) {
						bgq_ep->rx.poll.rfifo[poll_msg].ue.tail = prev;
						if (prev == NULL)
							bgq_ep->rx.poll.rfifo[poll_msg].ue.head = NULL;
						else
							prev->next = NULL;
					} else if (prev == NULL) {
						bgq_ep->rx.poll.rfifo[poll_msg].ue.head = uepkt->next;
					} else {
						prev->next = uepkt->next;
					}
				}

				/* transfer immediate data from pkt to context for matching FI_PEEK */
				context->data = uepkt->hdr.pt2pt.immediate_data;

				/* post a completion event for the receive */
				fi_bgq_cq_enqueue_completed(bgq_ep->recv_cq, context, 0);	/* TODO - IS lock required? */

				found_match = 1;
				uepkt = NULL;

			} else {

				/* a match was not found; advance to the next ue header */
				prev = uepkt;
				uepkt = uepkt->next;
			}
		}

		if (!found_match) {

#ifdef FI_BGQ_TRACE
			fprintf(stderr,"didn't find a match for this FI_PEEK\n");
			fflush(stderr);
#endif
			/* did not find a match for this "peek" */
			struct fi_bgq_context_ext * ext;
			if (is_context_ext) {
				ext = (struct fi_bgq_context_ext *)context;
			} else {
				/* a plain context has no room for an error entry;
				 * allocate an extended context to carry it */
				void * mem = NULL;
				const int rc = posix_memalign(&mem, 32, sizeof(struct fi_bgq_context_ext));
				if (rc != 0)
					return -rc;
				ext = (struct fi_bgq_context_ext *)mem;
				ext->bgq_context.flags = rx_op_flags | FI_BGQ_CQ_CONTEXT_EXT;
			}

			/* NOTE(review): the cancel path above reports msg.op_context for
			 * extended contexts, while this path reports 'context' itself -
			 * confirm which one the completion queue reader expects */
			ext->err_entry.op_context = context;
			ext->err_entry.flags = rx_op_flags;
			ext->err_entry.len = 0;
			ext->err_entry.buf = 0;
			ext->err_entry.data = 0;
			ext->err_entry.tag = 0;
			ext->err_entry.olen = 0;
			ext->err_entry.err = FI_ENOMSG;
			ext->err_entry.prov_errno = 0;
			ext->err_entry.err_data = NULL;
			ext->bgq_context.byte_counter = 0;

#ifdef FI_BGQ_TRACE
			fprintf(stderr,"process_mfifo_context - no match found on unexpected queue posting error\n");
			fflush(stderr);
#endif
			fi_bgq_cq_enqueue_err (bgq_ep->recv_cq, ext,0);
		}

	} else if (rx_op_flags & FI_CLAIM) {	/* unlikely */
		assert((rx_op_flags & FI_BGQ_CQ_CONTEXT_EXT) == 0);

#ifdef FI_BGQ_TRACE
		fprintf(stderr,"process_mfifo_context - rx_op_flags & FI_CLAIM complete receive operation\n");
#endif
		/* only FI_CLAIM was specified
		 *
		 * this occurs after a previous FI_PEEK + FI_CLAIM
		 * operation has removed an unexpected packet from
		 * the queue and saved a pointer to it in the context
		 *
		 * complete the receive for this "claimed" message ... */
		struct fi_bgq_mu_packet * claimed_pkt = context->claim;
		if (poll_msg) {
			complete_receive_operation(bgq_ep, claimed_pkt,
				0, context, 0, 0, is_manual_progress);
		} else {
			complete_receive_operation(bgq_ep, claimed_pkt,
				claimed_pkt->hdr.pt2pt.ofi_tag, context, 0, 0, is_manual_progress);
		}

		/* ... and prepend the uehdr to the ue free list. */
		claimed_pkt->next = bgq_ep->rx.poll.rfifo[poll_msg].ue.free;
		bgq_ep->rx.poll.rfifo[poll_msg].ue.free = claimed_pkt;

	} else if (poll_msg && (rx_op_flags & FI_MULTI_RECV)) {	/* unlikely - branch should compile out for tagged receives */

		/* search the unexpected packet queue
		 *
		 * several packets may be consumed in one pass, so the queue's
		 * head/tail are always read live rather than from a snapshot
		 * taken before the loop */
		struct fi_bgq_mu_packet * prev = NULL;
		struct fi_bgq_mu_packet * uepkt = bgq_ep->rx.poll.rfifo[poll_msg].ue.head;

		unsigned full_multirecv_buffer = 0;
		while (uepkt != NULL) {

			if (is_match(uepkt, context, poll_msg)) {

				/* verify that there is enough space available in
				 * the multi-receive buffer for the incoming data */
				const uint64_t recv_len = context->len;
				const uint64_t packet_type = fi_bgq_mu_packet_type_get(uepkt);
				uint64_t send_len = 0;

				if (packet_type & FI_BGQ_MU_PACKET_TYPE_EAGER) {
					send_len = uepkt->hdr.pt2pt.send.message_length;
				} else if (packet_type & FI_BGQ_MU_PACKET_TYPE_RENDEZVOUS) {
					/* This code functionality is unverified - exit with an error mesg for now
					 * when we have an mpich case for this we will then verify.
					 */
					fprintf(stderr,"BGQ Provider does not support FI_MULTI_RECV and RENDEZVOUS protocol\n");
					fflush(stderr);
					exit(1);

					/* unreachable until the exit() above is removed */
					const uint64_t niov = uepkt->hdr.pt2pt.rendezvous.niov_minus_1 + 1;
					send_len = uepkt->payload.rendezvous.mu_iov[0].message_length;
					uint64_t i;
					for (i=1; i<niov; ++i) send_len += uepkt->payload.rendezvous.mu_iov[i].message_length;
				}

				if (send_len > recv_len) {
					/* There is not enough room for the next subcontext multirec.
					 * to preserve the ordering just break off here with whatever
					 * matches are in the buffer and hopefully the next multirecv
					 * has space.
					 */
					uepkt = NULL;
					full_multirecv_buffer = 1;
					context->byte_counter = 0;
					fi_bgq_cq_enqueue_completed(bgq_ep->recv_cq, context, 0);

				} else {
					complete_receive_operation(bgq_ep, uepkt,
						0, context, 0, 1, is_manual_progress);

					/* remove the uepkt from the ue queue */
					if (bgq_ep->rx.poll.rfifo[poll_msg].ue.tail == uepkt) {
						bgq_ep->rx.poll.rfifo[poll_msg].ue.tail = prev;
						if (prev == NULL)
							bgq_ep->rx.poll.rfifo[poll_msg].ue.head = NULL;
						else
							prev->next = NULL;
					} else if (prev == NULL) {
						bgq_ep->rx.poll.rfifo[poll_msg].ue.head = uepkt->next;
					} else {
						prev->next = uepkt->next;
					}

					/* save the successor before 'next' is reused for the free list */
					struct fi_bgq_mu_packet *matched_uepkt_next = uepkt->next;

					/* ... and prepend the uehdr to the ue free list. */
					uepkt->next = bgq_ep->rx.poll.rfifo[poll_msg].ue.free;
					bgq_ep->rx.poll.rfifo[poll_msg].ue.free = uepkt;

					if (context->len < bgq_ep->rx.poll.min_multi_recv) {
						/* after processing this message there is not
						 * enough space available in the multi-receive
						 * buffer to receive the next message; break
						 * from the loop and post a 'FI_MULTI_RECV'
						 * event to the completion queue. */
						uepkt = NULL;
						full_multirecv_buffer = 1;

						/* post a completion event for the multi-receive */
						context->byte_counter = 0;
						fi_bgq_cq_enqueue_completed(bgq_ep->recv_cq, context, 0);	/* TODO - IS lock required? */

					} else {
						/* 'prev' is unchanged: it still precedes the successor */
						uepkt = matched_uepkt_next;
					}
				}

			} else {

				/* a match was not found; advance to the next ue header */
				prev = uepkt;
				uepkt = uepkt->next;
			}
		}

		if (!full_multirecv_buffer) {
			/* The multirecv context has room in its buffer.
			 * Post to match queue for further filling.
			 */
			union fi_bgq_context * mq_tail = bgq_ep->rx.poll.rfifo[poll_msg].mq.tail;

			context->next = NULL;
			if (mq_tail == NULL) {
				bgq_ep->rx.poll.rfifo[poll_msg].mq.head = context;
			} else {
				mq_tail->next = context;
			}
			bgq_ep->rx.poll.rfifo[poll_msg].mq.tail = context;
		}
	}

	return 0;
}