static int sock_pe_process_rx_send()

in prov/sockets/src/sock_progress.c [1318:1464]


/*
 * Progress the receive side of an incoming SEND / TSEND message.
 *
 * The routine is written so it can be entered again for the same pe_entry:
 * it bails out early whenever a header field or a payload chunk could not
 * be read completely and resumes from pe_entry->done_len on the next call.
 *
 * Sequence:
 *   1. Read the optional tag (SOCK_OP_TSEND) and optional CQ data
 *      (FI_REMOTE_CQ_DATA) that follow the fixed message header.
 *   2. When exactly the header fields have been consumed and no rx entry
 *      is attached yet, obtain one under rx_ctx->lock: first from
 *      sock_rx_get_entry(), otherwise a new buffered entry sized for the
 *      payload.
 *   3. Read the payload into the entry's iov list, skipping the bytes
 *      already accounted for in rx_entry->used.
 *   4. Mark both entries complete, unlink the rx entry where applicable,
 *      report either a completion or an FI_ETRUNC error, send a
 *      SOCK_OP_SEND_COMPLETE response if FI_TRANSMIT_COMPLETE was requested,
 *      and release the rx entry unless it is buffered or is a multi-recv
 *      entry that was not flagged as finished.
 *
 * Returns:
 *   0            - sock_pe_recv_field() returned non-zero, or a payload
 *                  read was shorter than requested;
 *   -FI_ENOMEM   - a buffered rx entry could not be allocated;
 *   <= 0         - whatever sock_comm_recv() returned when it yielded a
 *                  non-positive value;
 *   otherwise    - the result of the last sock_comm_recv() call (0 when no
 *                  payload read was issued in this invocation).
 */
static int sock_pe_process_rx_send(struct sock_pe *pe,
				struct sock_rx_ctx *rx_ctx,
				struct sock_pe_entry *pe_entry)
{
	struct sock_rx_entry *rx_entry;
	uint64_t hdr_len, payload_len, remaining, skip, seg_len, chunk;
	ssize_t idx, ret = 0;

	/* hdr_len tracks how many leading bytes belong to header fields. */
	hdr_len = sizeof(struct sock_msg_hdr);

	if (pe_entry->msg_hdr.op_type == SOCK_OP_TSEND) {
		/* non-zero presumably means "tag not fully received yet" */
		if (sock_pe_recv_field(pe_entry, &pe_entry->tag,
				       SOCK_TAG_SIZE, hdr_len))
			return 0;
		hdr_len += SOCK_TAG_SIZE;
	}

	if (pe_entry->msg_hdr.flags & FI_REMOTE_CQ_DATA) {
		if (sock_pe_recv_field(pe_entry, &pe_entry->data,
				       SOCK_CQ_DATA_SIZE, hdr_len))
			return 0;
		hdr_len += SOCK_CQ_DATA_SIZE;
	}

	payload_len = pe_entry->msg_hdr.msg_len - hdr_len;

	/*
	 * Header fields just finished and nothing is attached yet: pick the
	 * rx entry that will take the payload.
	 */
	if (!pe_entry->pe.rx.rx_entry && pe_entry->done_len == hdr_len) {
		fastlock_acquire(&rx_ctx->lock);
		rx_ctx->progress_start = &rx_ctx->rx_buffered_list;
		sock_pe_progress_buffered_rx(rx_ctx, false);

		rx_entry = sock_rx_get_entry(rx_ctx, pe_entry->addr,
					     pe_entry->tag,
					     pe_entry->msg_hdr.op_type == SOCK_OP_TSEND);
		SOCK_LOG_DBG("Consuming posted entry: %p\n", rx_entry);

		if (!rx_entry) {
			SOCK_LOG_DBG("%p: No matching recv, buffering recv (len = %llu)\n",
				      pe_entry, (long long unsigned int)payload_len);

			rx_entry = sock_rx_new_buffered_entry(rx_ctx, payload_len);
			if (!rx_entry) {
				fastlock_release(&rx_ctx->lock);
				return -FI_ENOMEM;
			}

			/* Copy the matching information into the new entry. */
			rx_entry->addr = pe_entry->addr;
			rx_entry->tag = pe_entry->tag;
			rx_entry->data = pe_entry->data;
			rx_entry->ignore = 0;
			rx_entry->comp = pe_entry->comp;

			if (pe_entry->msg_hdr.flags & FI_REMOTE_CQ_DATA)
				rx_entry->flags |= FI_REMOTE_CQ_DATA;

			if (pe_entry->msg_hdr.op_type == SOCK_OP_TSEND)
				rx_entry->is_tagged = 1;
		}
		fastlock_release(&rx_ctx->lock);

		pe_entry->context = rx_entry->context;
		pe_entry->pe.rx.rx_entry = rx_entry;
	}

	rx_entry = pe_entry->pe.rx.rx_entry;
	pe_entry->data_len = payload_len;

	/* Payload bytes still outstanding = payload - (received - header). */
	remaining = pe_entry->data_len - (pe_entry->done_len - hdr_len);

	/* Bytes of the iov list that are already consumed. */
	skip = rx_entry->used;

	for (idx = 0;
	     remaining > 0 && idx < rx_entry->rx_op.dest_iov_len;
	     idx++) {
		seg_len = rx_entry->iov[idx].iov.len;

		/* This segment is entirely consumed; move past it. */
		if (skip >= seg_len) {
			skip -= seg_len;
			continue;
		}

		/* Read at most what fits in the rest of this segment. */
		chunk = seg_len - skip;
		if (chunk > remaining)
			chunk = remaining;

		ret = sock_comm_recv(pe_entry,
				     (char *) (uintptr_t) rx_entry->iov[idx].iov.addr + skip,
				     chunk);
		if (ret <= 0)
			return ret;

		/* Record where the first payload byte landed. */
		if (!pe_entry->buf)
			pe_entry->buf = rx_entry->iov[idx].iov.addr + skip;

		remaining -= ret;
		skip = 0;
		pe_entry->done_len += ret;
		rx_entry->used += ret;

		/* Short read: come back later for the rest. */
		if (ret != chunk)
			return 0;
	}

	pe_entry->is_complete = 1;
	rx_entry->is_complete = 1;

	/* Build the completion flags; FI_MULTI_RECV is re-added below only
	 * when the multi-recv entry is being retired. */
	pe_entry->flags = rx_entry->flags;
	if (pe_entry->msg_hdr.op_type == SOCK_OP_TSEND)
		pe_entry->flags |= FI_TAGGED;
	pe_entry->flags |= (FI_MSG | FI_RECV);
	if (pe_entry->msg_hdr.flags & FI_REMOTE_CQ_DATA)
		pe_entry->flags |= FI_REMOTE_CQ_DATA;
	pe_entry->flags &= ~FI_MULTI_RECV;

	fastlock_acquire(&rx_ctx->lock);
	if (rx_entry->flags & FI_MULTI_RECV) {
		/* Retire the multi-recv entry once too little room is left. */
		if (sock_rx_avail_len(rx_entry) < rx_ctx->min_multi_recv) {
			pe_entry->flags |= FI_MULTI_RECV;
			dlist_remove(&rx_entry->entry);
		}
	} else if (!rx_entry->is_buffered) {
		dlist_remove(&rx_entry->entry);
	}
	rx_entry->is_busy = 0;
	fastlock_release(&rx_ctx->lock);

	if (remaining) {
		/* The iov list ran out before the whole payload was stored. */
		SOCK_LOG_ERROR("Not enough space in posted recv buffer\n");
		sock_pe_report_rx_error(pe_entry, remaining, FI_ETRUNC);
		pe_entry->is_error = 1;
		pe_entry->rem = pe_entry->total_len - pe_entry->done_len;
	} else if (!rx_entry->is_buffered) {
		sock_pe_report_recv_completion(pe_entry);
	}

	if (pe_entry->msg_hdr.flags & FI_TRANSMIT_COMPLETE) {
		sock_pe_send_response(pe, rx_ctx, pe_entry, 0,
				      SOCK_OP_SEND_COMPLETE, 0);
	}

	/* Buffered entries are kept. */
	if (rx_entry->is_buffered)
		return ret;

	/* A multi-recv entry is kept unless it was flagged as retired. */
	if ((rx_entry->flags & FI_MULTI_RECV) &&
	    !(pe_entry->flags & FI_MULTI_RECV))
		return ret;

	fastlock_acquire(&rx_ctx->lock);
	sock_rx_release_entry(rx_entry);
	rx_ctx->num_left++;
	fastlock_release(&rx_ctx->lock);

	return ret;
}