in prov/sockets/src/sock_progress.c [2105:2276]
static int sock_pe_new_tx_entry(struct sock_pe *pe, struct sock_tx_ctx *tx_ctx)
{
int i, datatype_sz;
struct sock_msg_hdr *msg_hdr;
struct sock_pe_entry *pe_entry;
struct sock_ep_attr *ep_attr;
pe_entry = sock_pe_acquire_entry(pe);
memset(&pe_entry->pe.tx, 0, sizeof(pe_entry->pe.tx));
memset(&pe_entry->msg_hdr, 0, sizeof(pe_entry->msg_hdr));
pe_entry->type = SOCK_PE_TX;
pe_entry->is_complete = 0;
pe_entry->done_len = 0;
pe_entry->conn = NULL;
pe_entry->ep_attr = tx_ctx->ep_attr;
pe_entry->pe.tx.tx_ctx = tx_ctx;
pe_entry->completion_reported = 0;
dlist_insert_tail(&pe_entry->ctx_entry, &tx_ctx->pe_entry_list);
/* fill in PE tx entry */
msg_hdr = &pe_entry->msg_hdr;
msg_hdr->msg_len = sizeof(*msg_hdr);
msg_hdr->pe_entry_id = PE_INDEX(pe, pe_entry);
SOCK_LOG_DBG("New TX on PE entry %p (%d)\n",
pe_entry, msg_hdr->pe_entry_id);
sock_tx_ctx_read_op_send(tx_ctx, &pe_entry->pe.tx.tx_op,
&pe_entry->flags, &pe_entry->context, &pe_entry->addr,
&pe_entry->buf, &ep_attr, &pe_entry->conn);
if (pe_entry->pe.tx.tx_op.op == SOCK_OP_TSEND) {
ofi_rbread(&tx_ctx->rb, &pe_entry->tag, sizeof(pe_entry->tag));
msg_hdr->msg_len += sizeof(pe_entry->tag);
}
if (ep_attr && tx_ctx->fclass == FI_CLASS_STX_CTX)
pe_entry->comp = &ep_attr->tx_ctx->comp;
else
pe_entry->comp = &tx_ctx->comp;
if (pe_entry->flags & FI_REMOTE_CQ_DATA) {
ofi_rbread(&tx_ctx->rb, &pe_entry->data, sizeof(pe_entry->data));
msg_hdr->msg_len += sizeof(pe_entry->data);
}
msg_hdr->op_type = pe_entry->pe.tx.tx_op.op;
switch (pe_entry->pe.tx.tx_op.op) {
case SOCK_OP_SEND:
case SOCK_OP_TSEND:
if (pe_entry->flags & FI_INJECT) {
ofi_rbread(&tx_ctx->rb, &pe_entry->pe.tx.inject[0],
pe_entry->pe.tx.tx_op.src_iov_len);
msg_hdr->msg_len += pe_entry->pe.tx.tx_op.src_iov_len;
} else {
for (i = 0; i < pe_entry->pe.tx.tx_op.src_iov_len; i++) {
ofi_rbread(&tx_ctx->rb, &pe_entry->pe.tx.tx_iov[i].src,
sizeof(pe_entry->pe.tx.tx_iov[i].src));
msg_hdr->msg_len += pe_entry->pe.tx.tx_iov[i].src.iov.len;
}
}
msg_hdr->dest_iov_len = pe_entry->pe.tx.tx_op.dest_iov_len;
if (pe_entry->flags & SOCK_NO_COMPLETION)
pe_entry->flags |= FI_INJECT_COMPLETE;
break;
case SOCK_OP_WRITE:
if (pe_entry->flags & FI_INJECT) {
ofi_rbread(&tx_ctx->rb, &pe_entry->pe.tx.inject[0],
pe_entry->pe.tx.tx_op.src_iov_len);
msg_hdr->msg_len += pe_entry->pe.tx.tx_op.src_iov_len;
} else {
for (i = 0; i < pe_entry->pe.tx.tx_op.src_iov_len; i++) {
ofi_rbread(&tx_ctx->rb, &pe_entry->pe.tx.tx_iov[i].src,
sizeof(pe_entry->pe.tx.tx_iov[i].src));
msg_hdr->msg_len += pe_entry->pe.tx.tx_iov[i].src.iov.len;
}
}
for (i = 0; i < pe_entry->pe.tx.tx_op.dest_iov_len; i++) {
ofi_rbread(&tx_ctx->rb, &pe_entry->pe.tx.tx_iov[i].dst,
sizeof(pe_entry->pe.tx.tx_iov[i].dst));
}
msg_hdr->msg_len += sizeof(union sock_iov) * i;
msg_hdr->dest_iov_len = pe_entry->pe.tx.tx_op.dest_iov_len;
break;
case SOCK_OP_READ:
for (i = 0; i < pe_entry->pe.tx.tx_op.src_iov_len; i++) {
ofi_rbread(&tx_ctx->rb, &pe_entry->pe.tx.tx_iov[i].src,
sizeof(pe_entry->pe.tx.tx_iov[i].src));
}
msg_hdr->msg_len += sizeof(union sock_iov) * i;
for (i = 0; i < pe_entry->pe.tx.tx_op.dest_iov_len; i++) {
ofi_rbread(&tx_ctx->rb, &pe_entry->pe.tx.tx_iov[i].dst,
sizeof(pe_entry->pe.tx.tx_iov[i].dst));
}
msg_hdr->dest_iov_len = pe_entry->pe.tx.tx_op.src_iov_len;
break;
case SOCK_OP_ATOMIC:
msg_hdr->msg_len += sizeof(struct sock_op);
datatype_sz = ofi_datatype_size(pe_entry->pe.tx.tx_op.atomic.datatype);
if (pe_entry->flags & FI_INJECT) {
ofi_rbread(&tx_ctx->rb, &pe_entry->pe.tx.inject[0],
pe_entry->pe.tx.tx_op.src_iov_len);
ofi_rbread(&tx_ctx->rb, &pe_entry->pe.tx.inject[0] +
pe_entry->pe.tx.tx_op.src_iov_len,
pe_entry->pe.tx.tx_op.atomic.cmp_iov_len);
msg_hdr->msg_len += pe_entry->pe.tx.tx_op.src_iov_len +
pe_entry->pe.tx.tx_op.atomic.cmp_iov_len;
} else {
for (i = 0; i < pe_entry->pe.tx.tx_op.src_iov_len; i++) {
ofi_rbread(&tx_ctx->rb, &pe_entry->pe.tx.tx_iov[i].src,
sizeof(pe_entry->pe.tx.tx_iov[i].src));
if (pe_entry->pe.tx.tx_op.atomic.op != FI_ATOMIC_READ)
msg_hdr->msg_len += datatype_sz *
pe_entry->pe.tx.tx_iov[i].src.ioc.count;
}
for (i = 0; i < pe_entry->pe.tx.tx_op.atomic.cmp_iov_len; i++) {
ofi_rbread(&tx_ctx->rb, &pe_entry->pe.tx.tx_iov[i].cmp,
sizeof(pe_entry->pe.tx.tx_iov[i].cmp));
msg_hdr->msg_len += datatype_sz *
pe_entry->pe.tx.tx_iov[i].cmp.ioc.count;
}
}
for (i = 0; i < pe_entry->pe.tx.tx_op.dest_iov_len; i++) {
ofi_rbread(&tx_ctx->rb, &pe_entry->pe.tx.tx_iov[i].dst,
sizeof(pe_entry->pe.tx.tx_iov[i].dst));
}
msg_hdr->msg_len += sizeof(union sock_iov) * i;
for (i = 0; i < pe_entry->pe.tx.tx_op.atomic.res_iov_len; i++) {
ofi_rbread(&tx_ctx->rb, &pe_entry->pe.tx.tx_iov[i].res,
sizeof(pe_entry->pe.tx.tx_iov[i].res));
}
msg_hdr->dest_iov_len = pe_entry->pe.tx.tx_op.dest_iov_len;
break;
case SOCK_OP_CONN_MSG:
ofi_rbread(&tx_ctx->rb, &pe_entry->pe.tx.inject[0],
pe_entry->pe.tx.tx_op.src_iov_len);
msg_hdr->msg_len += pe_entry->pe.tx.tx_op.src_iov_len;
break;
default:
SOCK_LOG_ERROR("Invalid operation type\n");
return -FI_EINVAL;
}
SOCK_LOG_DBG("Inserting TX-entry to PE entry %p, conn: %p\n",
pe_entry, pe_entry->conn);
/* prepare message header */
msg_hdr->version = SOCK_WIRE_PROTO_VERSION;
if (tx_ctx->av)
msg_hdr->rx_id = (uint16_t) SOCK_GET_RX_ID(pe_entry->addr,
tx_ctx->av->rx_ctx_bits);
else
msg_hdr->rx_id = 0;
if (pe_entry->flags & FI_INJECT_COMPLETE)
pe_entry->flags &= ~FI_TRANSMIT_COMPLETE;
msg_hdr->flags = htonll(pe_entry->flags);
pe_entry->total_len = msg_hdr->msg_len;
msg_hdr->msg_len = htonll(msg_hdr->msg_len);
msg_hdr->pe_entry_id = htons(msg_hdr->pe_entry_id);
return sock_pe_progress_tx_entry(pe, tx_ctx, pe_entry);
}