in src/core/ext/transport/inproc/inproc_transport.cc [914:1092]
void perform_stream_op(grpc_transport* gt, grpc_stream* gs,
grpc_transport_stream_op_batch* op) {
INPROC_LOG(GPR_INFO, "perform_stream_op %p %p %p", gt, gs, op);
inproc_stream* s = reinterpret_cast<inproc_stream*>(gs);
gpr_mu* mu = &s->t->mu->mu; // save aside in case s gets closed
gpr_mu_lock(mu);
if (GRPC_TRACE_FLAG_ENABLED(grpc_inproc_trace)) {
if (op->send_initial_metadata) {
log_metadata(op->payload->send_initial_metadata.send_initial_metadata,
s->t->is_client, true);
}
if (op->send_trailing_metadata) {
log_metadata(op->payload->send_trailing_metadata.send_trailing_metadata,
s->t->is_client, false);
}
}
grpc_error* error = GRPC_ERROR_NONE;
grpc_closure* on_complete = op->on_complete;
// TODO(roth): This is a hack needed because we use data inside of the
// closure itself to do the barrier calculation (i.e., to ensure that
// we don't schedule the closure until all ops in the batch have been
// completed). This can go away once we move to a new C++ closure API
// that provides the ability to create a barrier closure.
if (on_complete == nullptr) {
on_complete = GRPC_CLOSURE_INIT(&op->handler_private.closure, do_nothing,
nullptr, grpc_schedule_on_exec_ctx);
}
if (op->cancel_stream) {
// Call cancel_stream_locked without ref'ing the cancel_error because
// this function is responsible to make sure that that field gets unref'ed
cancel_stream_locked(s, op->payload->cancel_stream.cancel_error);
// this op can complete without an error
} else if (s->cancel_self_error != GRPC_ERROR_NONE) {
// already self-canceled so still give it an error
error = GRPC_ERROR_REF(s->cancel_self_error);
} else {
INPROC_LOG(GPR_INFO, "perform_stream_op %p %s%s%s%s%s%s%s", s,
s->t->is_client ? "client" : "server",
op->send_initial_metadata ? " send_initial_metadata" : "",
op->send_message ? " send_message" : "",
op->send_trailing_metadata ? " send_trailing_metadata" : "",
op->recv_initial_metadata ? " recv_initial_metadata" : "",
op->recv_message ? " recv_message" : "",
op->recv_trailing_metadata ? " recv_trailing_metadata" : "");
}
inproc_stream* other = s->other_side;
if (error == GRPC_ERROR_NONE &&
(op->send_initial_metadata || op->send_trailing_metadata)) {
if (s->t->is_closed) {
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Endpoint already shutdown");
}
if (error == GRPC_ERROR_NONE && op->send_initial_metadata) {
grpc_metadata_batch* dest = (other == nullptr)
? &s->write_buffer_initial_md
: &other->to_read_initial_md;
uint32_t* destflags = (other == nullptr)
? &s->write_buffer_initial_md_flags
: &other->to_read_initial_md_flags;
bool* destfilled = (other == nullptr) ? &s->write_buffer_initial_md_filled
: &other->to_read_initial_md_filled;
if (*destfilled || s->initial_md_sent) {
// The buffer is already in use; that's an error!
INPROC_LOG(GPR_INFO, "Extra initial metadata %p", s);
error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra initial metadata");
} else {
if (!s->other_side_closed) {
fill_in_metadata(
s, op->payload->send_initial_metadata.send_initial_metadata,
op->payload->send_initial_metadata.send_initial_metadata_flags,
dest, destflags, destfilled);
}
if (s->t->is_client) {
grpc_millis* dl =
(other == nullptr) ? &s->write_buffer_deadline : &other->deadline;
*dl = GPR_MIN(*dl, op->payload->send_initial_metadata
.send_initial_metadata->deadline);
s->initial_md_sent = true;
}
}
maybe_process_ops_locked(other, error);
}
}
if (error == GRPC_ERROR_NONE &&
(op->send_message || op->send_trailing_metadata ||
op->recv_initial_metadata || op->recv_message ||
op->recv_trailing_metadata)) {
// Mark ops that need to be processed by the state machine
if (op->send_message) {
s->send_message_op = op;
}
if (op->send_trailing_metadata) {
s->send_trailing_md_op = op;
}
if (op->recv_initial_metadata) {
s->recv_initial_md_op = op;
}
if (op->recv_message) {
s->recv_message_op = op;
}
if (op->recv_trailing_metadata) {
s->recv_trailing_md_op = op;
}
// We want to initiate the state machine if:
// 1. We want to send a message and the other side wants to receive
// 2. We want to send trailing metadata and there isn't an unmatched send
// or the other side wants trailing metadata
// 3. We want initial metadata and the other side has sent it
// 4. We want to receive a message and there is a message ready
// 5. There is trailing metadata, even if nothing specifically wants
// that because that can shut down the receive message as well
if ((op->send_message && other && other->recv_message_op != nullptr) ||
(op->send_trailing_metadata &&
(!s->send_message_op || (other && other->recv_trailing_md_op))) ||
(op->recv_initial_metadata && s->to_read_initial_md_filled) ||
(op->recv_message && other && other->send_message_op != nullptr) ||
(s->to_read_trailing_md_filled || s->trailing_md_recvd)) {
op_state_machine_locked(s, error);
} else {
s->ops_needed = true;
}
} else {
if (error != GRPC_ERROR_NONE) {
// Consume any send message that was sent here but that we are not pushing
// to the other side
if (op->send_message) {
op->payload->send_message.send_message.reset();
}
// Schedule op's closures that we didn't push to op state machine
if (op->recv_initial_metadata) {
if (op->payload->recv_initial_metadata.trailing_metadata_available !=
nullptr) {
// Set to true unconditionally, because we're failing the call, so
// even if we haven't actually seen the send_trailing_metadata op
// from the other side, we're going to return trailing metadata
// anyway.
*op->payload->recv_initial_metadata.trailing_metadata_available =
true;
}
INPROC_LOG(
GPR_INFO,
"perform_stream_op error %p scheduling initial-metadata-ready %p",
s, error);
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
op->payload->recv_initial_metadata.recv_initial_metadata_ready,
GRPC_ERROR_REF(error));
}
if (op->recv_message) {
INPROC_LOG(
GPR_INFO,
"perform_stream_op error %p scheduling recv message-ready %p", s,
error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
op->payload->recv_message.recv_message_ready,
GRPC_ERROR_REF(error));
}
if (op->recv_trailing_metadata) {
INPROC_LOG(
GPR_INFO,
"perform_stream_op error %p scheduling trailing-metadata-ready %p",
s, error);
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
GRPC_ERROR_REF(error));
}
}
INPROC_LOG(GPR_INFO, "perform_stream_op %p scheduling on_complete %p", s,
error);
grpc_core::ExecCtx::Run(DEBUG_LOCATION, on_complete, GRPC_ERROR_REF(error));
}
gpr_mu_unlock(mu);
GRPC_ERROR_UNREF(error);
}