in prov/sockets/src/sock_progress.c [1318:1464]
static int sock_pe_process_rx_send(struct sock_pe *pe,
struct sock_rx_ctx *rx_ctx,
struct sock_pe_entry *pe_entry)
{
ssize_t i, ret = 0;
struct sock_rx_entry *rx_entry;
uint64_t len, rem, offset, data_len, done_data, used;
offset = 0;
len = sizeof(struct sock_msg_hdr);
if (pe_entry->msg_hdr.op_type == SOCK_OP_TSEND) {
if (sock_pe_recv_field(pe_entry, &pe_entry->tag,
SOCK_TAG_SIZE, len))
return 0;
len += SOCK_TAG_SIZE;
}
if (pe_entry->msg_hdr.flags & FI_REMOTE_CQ_DATA) {
if (sock_pe_recv_field(pe_entry, &pe_entry->data,
SOCK_CQ_DATA_SIZE, len))
return 0;
len += SOCK_CQ_DATA_SIZE;
}
data_len = pe_entry->msg_hdr.msg_len - len;
if (pe_entry->done_len == len && !pe_entry->pe.rx.rx_entry) {
fastlock_acquire(&rx_ctx->lock);
rx_ctx->progress_start = &rx_ctx->rx_buffered_list;
sock_pe_progress_buffered_rx(rx_ctx, false);
rx_entry = sock_rx_get_entry(rx_ctx, pe_entry->addr, pe_entry->tag,
pe_entry->msg_hdr.op_type == SOCK_OP_TSEND ? 1 : 0);
SOCK_LOG_DBG("Consuming posted entry: %p\n", rx_entry);
if (!rx_entry) {
SOCK_LOG_DBG("%p: No matching recv, buffering recv (len = %llu)\n",
pe_entry, (long long unsigned int)data_len);
rx_entry = sock_rx_new_buffered_entry(rx_ctx, data_len);
if (!rx_entry) {
fastlock_release(&rx_ctx->lock);
return -FI_ENOMEM;
}
rx_entry->addr = pe_entry->addr;
rx_entry->tag = pe_entry->tag;
rx_entry->data = pe_entry->data;
rx_entry->ignore = 0;
rx_entry->comp = pe_entry->comp;
if (pe_entry->msg_hdr.flags & FI_REMOTE_CQ_DATA)
rx_entry->flags |= FI_REMOTE_CQ_DATA;
if (pe_entry->msg_hdr.op_type == SOCK_OP_TSEND)
rx_entry->is_tagged = 1;
}
fastlock_release(&rx_ctx->lock);
pe_entry->context = rx_entry->context;
pe_entry->pe.rx.rx_entry = rx_entry;
}
rx_entry = pe_entry->pe.rx.rx_entry;
done_data = pe_entry->done_len - len;
pe_entry->data_len = data_len;
rem = pe_entry->data_len - done_data;
used = rx_entry->used;
for (i = 0; rem > 0 && i < rx_entry->rx_op.dest_iov_len; i++) {
/* skip used contents in rx_entry */
if (used >= rx_entry->iov[i].iov.len) {
used -= rx_entry->iov[i].iov.len;
continue;
}
offset = used;
data_len = MIN(rx_entry->iov[i].iov.len - used, rem);
ret = sock_comm_recv(pe_entry,
(char *) (uintptr_t) rx_entry->iov[i].iov.addr + offset,
data_len);
if (ret <= 0)
return ret;
if (!pe_entry->buf)
pe_entry->buf = rx_entry->iov[i].iov.addr + offset;
rem -= ret;
used = 0;
pe_entry->done_len += ret;
rx_entry->used += ret;
if (ret != data_len)
return 0;
}
pe_entry->is_complete = 1;
rx_entry->is_complete = 1;
pe_entry->flags = rx_entry->flags;
if (pe_entry->msg_hdr.op_type == SOCK_OP_TSEND)
pe_entry->flags |= FI_TAGGED;
pe_entry->flags |= (FI_MSG | FI_RECV);
if (pe_entry->msg_hdr.flags & FI_REMOTE_CQ_DATA)
pe_entry->flags |= FI_REMOTE_CQ_DATA;
pe_entry->flags &= ~FI_MULTI_RECV;
fastlock_acquire(&rx_ctx->lock);
if (rx_entry->flags & FI_MULTI_RECV) {
if (sock_rx_avail_len(rx_entry) < rx_ctx->min_multi_recv) {
pe_entry->flags |= FI_MULTI_RECV;
dlist_remove(&rx_entry->entry);
}
} else {
if (!rx_entry->is_buffered)
dlist_remove(&rx_entry->entry);
}
rx_entry->is_busy = 0;
fastlock_release(&rx_ctx->lock);
/* report error, if any */
if (rem) {
SOCK_LOG_ERROR("Not enough space in posted recv buffer\n");
sock_pe_report_rx_error(pe_entry, rem, FI_ETRUNC);
pe_entry->is_error = 1;
pe_entry->rem = pe_entry->total_len - pe_entry->done_len;
goto out;
} else {
if (!rx_entry->is_buffered)
sock_pe_report_recv_completion(pe_entry);
}
out:
if (pe_entry->msg_hdr.flags & FI_TRANSMIT_COMPLETE) {
sock_pe_send_response(pe, rx_ctx, pe_entry, 0,
SOCK_OP_SEND_COMPLETE, 0);
}
if (!rx_entry->is_buffered &&
(!(rx_entry->flags & FI_MULTI_RECV) ||
(pe_entry->flags & FI_MULTI_RECV))) {
fastlock_acquire(&rx_ctx->lock);
sock_rx_release_entry(rx_entry);
rx_ctx->num_left++;
fastlock_release(&rx_ctx->lock);
}
return ret;
}