static int sock_pe_new_tx_entry()

in prov/sockets/src/sock_progress.c [2105:2276]


static int sock_pe_new_tx_entry(struct sock_pe *pe, struct sock_tx_ctx *tx_ctx)
{
	int i, datatype_sz;
	struct sock_msg_hdr *msg_hdr;
	struct sock_pe_entry *pe_entry;
	struct sock_ep_attr *ep_attr;

	pe_entry = sock_pe_acquire_entry(pe);
	memset(&pe_entry->pe.tx, 0, sizeof(pe_entry->pe.tx));
	memset(&pe_entry->msg_hdr, 0, sizeof(pe_entry->msg_hdr));

	pe_entry->type = SOCK_PE_TX;
	pe_entry->is_complete = 0;
	pe_entry->done_len = 0;
	pe_entry->conn = NULL;
	pe_entry->ep_attr = tx_ctx->ep_attr;
	pe_entry->pe.tx.tx_ctx = tx_ctx;
	pe_entry->completion_reported = 0;

	dlist_insert_tail(&pe_entry->ctx_entry, &tx_ctx->pe_entry_list);

	/* fill in PE tx entry */
	msg_hdr = &pe_entry->msg_hdr;
	msg_hdr->msg_len = sizeof(*msg_hdr);

	msg_hdr->pe_entry_id = PE_INDEX(pe, pe_entry);
	SOCK_LOG_DBG("New TX on PE entry %p (%d)\n",
		      pe_entry, msg_hdr->pe_entry_id);

	sock_tx_ctx_read_op_send(tx_ctx, &pe_entry->pe.tx.tx_op,
			&pe_entry->flags, &pe_entry->context, &pe_entry->addr,
			&pe_entry->buf, &ep_attr, &pe_entry->conn);

	if (pe_entry->pe.tx.tx_op.op == SOCK_OP_TSEND) {
		ofi_rbread(&tx_ctx->rb, &pe_entry->tag, sizeof(pe_entry->tag));
		msg_hdr->msg_len += sizeof(pe_entry->tag);
	}

	if (ep_attr && tx_ctx->fclass == FI_CLASS_STX_CTX)
		pe_entry->comp = &ep_attr->tx_ctx->comp;
	else
		pe_entry->comp = &tx_ctx->comp;

	if (pe_entry->flags & FI_REMOTE_CQ_DATA) {
		ofi_rbread(&tx_ctx->rb, &pe_entry->data, sizeof(pe_entry->data));
		msg_hdr->msg_len += sizeof(pe_entry->data);
	}

	msg_hdr->op_type = pe_entry->pe.tx.tx_op.op;
	switch (pe_entry->pe.tx.tx_op.op) {
	case SOCK_OP_SEND:
	case SOCK_OP_TSEND:
		if (pe_entry->flags & FI_INJECT) {
			ofi_rbread(&tx_ctx->rb, &pe_entry->pe.tx.inject[0],
				 pe_entry->pe.tx.tx_op.src_iov_len);
			msg_hdr->msg_len += pe_entry->pe.tx.tx_op.src_iov_len;
		} else {
			for (i = 0; i < pe_entry->pe.tx.tx_op.src_iov_len; i++) {
				ofi_rbread(&tx_ctx->rb, &pe_entry->pe.tx.tx_iov[i].src,
					 sizeof(pe_entry->pe.tx.tx_iov[i].src));
				msg_hdr->msg_len += pe_entry->pe.tx.tx_iov[i].src.iov.len;
			}
		}
		msg_hdr->dest_iov_len = pe_entry->pe.tx.tx_op.dest_iov_len;
		if (pe_entry->flags & SOCK_NO_COMPLETION)
			pe_entry->flags |= FI_INJECT_COMPLETE;
		break;
	case SOCK_OP_WRITE:
		if (pe_entry->flags & FI_INJECT) {
			ofi_rbread(&tx_ctx->rb, &pe_entry->pe.tx.inject[0],
				 pe_entry->pe.tx.tx_op.src_iov_len);
			msg_hdr->msg_len += pe_entry->pe.tx.tx_op.src_iov_len;
		} else {
			for (i = 0; i < pe_entry->pe.tx.tx_op.src_iov_len; i++) {
				ofi_rbread(&tx_ctx->rb, &pe_entry->pe.tx.tx_iov[i].src,
					 sizeof(pe_entry->pe.tx.tx_iov[i].src));
				msg_hdr->msg_len += pe_entry->pe.tx.tx_iov[i].src.iov.len;
			}
		}

		for (i = 0; i < pe_entry->pe.tx.tx_op.dest_iov_len; i++) {
			ofi_rbread(&tx_ctx->rb, &pe_entry->pe.tx.tx_iov[i].dst,
				 sizeof(pe_entry->pe.tx.tx_iov[i].dst));
		}
		msg_hdr->msg_len += sizeof(union sock_iov) * i;
		msg_hdr->dest_iov_len = pe_entry->pe.tx.tx_op.dest_iov_len;
		break;
	case SOCK_OP_READ:
		for (i = 0; i < pe_entry->pe.tx.tx_op.src_iov_len; i++) {
			ofi_rbread(&tx_ctx->rb, &pe_entry->pe.tx.tx_iov[i].src,
				 sizeof(pe_entry->pe.tx.tx_iov[i].src));
		}
		msg_hdr->msg_len += sizeof(union sock_iov) * i;

		for (i = 0;  i < pe_entry->pe.tx.tx_op.dest_iov_len; i++) {
			ofi_rbread(&tx_ctx->rb, &pe_entry->pe.tx.tx_iov[i].dst,
				 sizeof(pe_entry->pe.tx.tx_iov[i].dst));
		}
		msg_hdr->dest_iov_len = pe_entry->pe.tx.tx_op.src_iov_len;
		break;
	case SOCK_OP_ATOMIC:
		msg_hdr->msg_len += sizeof(struct sock_op);
		datatype_sz = ofi_datatype_size(pe_entry->pe.tx.tx_op.atomic.datatype);
		if (pe_entry->flags & FI_INJECT) {
			ofi_rbread(&tx_ctx->rb, &pe_entry->pe.tx.inject[0],
				 pe_entry->pe.tx.tx_op.src_iov_len);
			ofi_rbread(&tx_ctx->rb, &pe_entry->pe.tx.inject[0] +
				pe_entry->pe.tx.tx_op.src_iov_len,
				pe_entry->pe.tx.tx_op.atomic.cmp_iov_len);
			msg_hdr->msg_len += pe_entry->pe.tx.tx_op.src_iov_len +
						pe_entry->pe.tx.tx_op.atomic.cmp_iov_len;
		} else {
			for (i = 0; i < pe_entry->pe.tx.tx_op.src_iov_len; i++) {
				ofi_rbread(&tx_ctx->rb, &pe_entry->pe.tx.tx_iov[i].src,
					 sizeof(pe_entry->pe.tx.tx_iov[i].src));

				if (pe_entry->pe.tx.tx_op.atomic.op != FI_ATOMIC_READ)
					msg_hdr->msg_len += datatype_sz *
						pe_entry->pe.tx.tx_iov[i].src.ioc.count;
			}
			for (i = 0; i < pe_entry->pe.tx.tx_op.atomic.cmp_iov_len; i++) {
				ofi_rbread(&tx_ctx->rb, &pe_entry->pe.tx.tx_iov[i].cmp,
				 	sizeof(pe_entry->pe.tx.tx_iov[i].cmp));
				msg_hdr->msg_len += datatype_sz *
					pe_entry->pe.tx.tx_iov[i].cmp.ioc.count;
			}
		}

		for (i = 0; i < pe_entry->pe.tx.tx_op.dest_iov_len; i++) {
			ofi_rbread(&tx_ctx->rb, &pe_entry->pe.tx.tx_iov[i].dst,
				 sizeof(pe_entry->pe.tx.tx_iov[i].dst));
		}
		msg_hdr->msg_len += sizeof(union sock_iov) * i;

		for (i = 0; i < pe_entry->pe.tx.tx_op.atomic.res_iov_len; i++) {
			ofi_rbread(&tx_ctx->rb, &pe_entry->pe.tx.tx_iov[i].res,
				 sizeof(pe_entry->pe.tx.tx_iov[i].res));
		}

		msg_hdr->dest_iov_len = pe_entry->pe.tx.tx_op.dest_iov_len;
		break;
	case SOCK_OP_CONN_MSG:
		ofi_rbread(&tx_ctx->rb, &pe_entry->pe.tx.inject[0],
			pe_entry->pe.tx.tx_op.src_iov_len);
		msg_hdr->msg_len += pe_entry->pe.tx.tx_op.src_iov_len;
		break;
	default:
		SOCK_LOG_ERROR("Invalid operation type\n");
		return -FI_EINVAL;
	}
	SOCK_LOG_DBG("Inserting TX-entry to PE entry %p, conn: %p\n",
		      pe_entry, pe_entry->conn);

	/* prepare message header */
	msg_hdr->version = SOCK_WIRE_PROTO_VERSION;

	if (tx_ctx->av)
		msg_hdr->rx_id = (uint16_t) SOCK_GET_RX_ID(pe_entry->addr,
							   tx_ctx->av->rx_ctx_bits);
	else
		msg_hdr->rx_id = 0;

	if (pe_entry->flags & FI_INJECT_COMPLETE)
		pe_entry->flags &= ~FI_TRANSMIT_COMPLETE;

	msg_hdr->flags = htonll(pe_entry->flags);
	pe_entry->total_len = msg_hdr->msg_len;
	msg_hdr->msg_len = htonll(msg_hdr->msg_len);
	msg_hdr->pe_entry_id = htons(msg_hdr->pe_entry_id);

	return sock_pe_progress_tx_entry(pe, tx_ctx, pe_entry);
}