in src/core/ext/transport/inproc/inproc_transport.cc [586:854]
// Advances every pending op batch on stream |s| as far as the current state of
// |s| and its peer (s->other_side) allows.
//
// Order of processing:
//   1. Cancellation / error: if s->cancel_self_error, s->cancel_other_error or
//      |error| is set (checked in that order), fail_helper_locked() is called
//      with a new ref of that error and nothing else is processed.
//   2. send_message_op
//   3. send_trailing_md_op
//   4. recv_initial_md_op
//   5. recv_message_op
//   6. trailing metadata waiting in s->to_read_trailing_md
//   7. clean-up of message ops that can never be matched
//   8. if any op pointer is still set, s->ops_needed is set to true
//
// |error| is only forwarded through GRPC_ERROR_REF(); this function never
// unrefs |error| itself.
// NOTE(review): the "_locked" suffix presumably means the caller already holds
// the transport lock -- nothing in this function acquires one; confirm against
// callers.
void op_state_machine_locked(inproc_stream* s, grpc_error* error) {
// This function gets called when we have contents in the unprocessed reads
// Get what we want based on our ops wanted
// Schedule our appropriate closures
// and then return to ops_needed state if still needed
// Locally created/returned error; unref'd exactly once at the `done` label.
grpc_error* new_err = GRPC_ERROR_NONE;
// Set when a recv_trailing_md_op has been completed and this side has also
// sent its trailing metadata; acted upon at the `done` label.
bool needs_close = false;
INPROC_LOG(GPR_INFO, "op_state_machine %p", s);
// cancellation takes precedence
// The peer stream; may be nullptr (checked explicitly below).
inproc_stream* other = s->other_side;
if (s->cancel_self_error != GRPC_ERROR_NONE) {
fail_helper_locked(s, GRPC_ERROR_REF(s->cancel_self_error));
goto done;
} else if (s->cancel_other_error != GRPC_ERROR_NONE) {
fail_helper_locked(s, GRPC_ERROR_REF(s->cancel_other_error));
goto done;
} else if (error != GRPC_ERROR_NONE) {
fail_helper_locked(s, GRPC_ERROR_REF(error));
goto done;
}
// --- send_message ---
// Only attempted when a peer exists.
if (s->send_message_op && other) {
if (other->recv_message_op) {
// The peer is waiting for a message: hand ours over, then give the peer a
// chance to make progress on its own pending ops.
message_transfer_locked(s, other);
maybe_process_ops_locked(other, GRPC_ERROR_NONE);
} else if (!s->t->is_client && s->trailing_md_sent) {
// A server send will never be matched if the server already sent status
// Drop the message and complete the send op without error.
s->send_message_op->payload->send_message.send_message.reset();
complete_if_batch_end_locked(
s, GRPC_ERROR_NONE, s->send_message_op,
"op_state_machine scheduling send-message-on-complete");
s->send_message_op = nullptr;
}
}
// --- send_trailing_metadata ---
// Pause a send trailing metadata if there is still an outstanding
// send message unless we know that the send message will never get
// matched to a receive. This happens on the client if the server has
// already sent status or on the server if the client has requested
// status
if (s->send_trailing_md_op &&
(!s->send_message_op ||
(s->t->is_client &&
(s->trailing_md_recvd || s->to_read_trailing_md_filled)) ||
(!s->t->is_client && other &&
(other->trailing_md_recvd || other->to_read_trailing_md_filled ||
other->recv_trailing_md_op)))) {
// With no peer, the metadata goes into this stream's own write buffer;
// otherwise it goes straight into the peer's to-read buffer.
grpc_metadata_batch* dest = (other == nullptr)
? &s->write_buffer_trailing_md
: &other->to_read_trailing_md;
bool* destfilled = (other == nullptr) ? &s->write_buffer_trailing_md_filled
: &other->to_read_trailing_md_filled;
if (*destfilled || s->trailing_md_sent) {
// The buffer is already in use; that's an error!
INPROC_LOG(GPR_INFO, "Extra trailing metadata %p", s);
new_err = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Extra trailing metadata");
// fail_helper_locked gets its own ref; ours is released at `done`.
fail_helper_locked(s, GRPC_ERROR_REF(new_err));
goto done;
} else {
// Skip the copy when the peer exists but is already closed.
if (!other || !other->closed) {
fill_in_metadata(s,
s->send_trailing_md_op->payload->send_trailing_metadata
.send_trailing_metadata,
0, dest, nullptr, destfilled);
}
s->trailing_md_sent = true;
// Server side only: a recv_trailing_md_op whose metadata has already been
// received was held back until the server sent its own trailing metadata
// (see the delayed branch further down). That has now happened, so
// schedule both of its closures and request a close.
if (!s->t->is_client && s->trailing_md_recvd && s->recv_trailing_md_op) {
INPROC_LOG(GPR_INFO,
"op_state_machine %p scheduling trailing-metadata-ready", s);
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
s->recv_trailing_md_op->payload->recv_trailing_metadata
.recv_trailing_metadata_ready,
GRPC_ERROR_NONE);
INPROC_LOG(GPR_INFO,
"op_state_machine %p scheduling trailing-md-on-complete", s);
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
s->recv_trailing_md_op->on_complete,
GRPC_ERROR_NONE);
s->recv_trailing_md_op = nullptr;
needs_close = true;
}
}
// Let the peer react to the newly available trailing metadata.
// NOTE(review): `other` can be nullptr on this path; presumably
// maybe_process_ops_locked tolerates a null stream -- confirm.
maybe_process_ops_locked(other, GRPC_ERROR_NONE);
complete_if_batch_end_locked(
s, GRPC_ERROR_NONE, s->send_trailing_md_op,
"op_state_machine scheduling send-trailing-metadata-on-complete");
s->send_trailing_md_op = nullptr;
}
// --- recv_initial_metadata ---
if (s->recv_initial_md_op) {
// Asking for initial metadata a second time is an error.
if (s->initial_md_recvd) {
new_err =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd initial md");
INPROC_LOG(
GPR_INFO,
"op_state_machine %p scheduling on_complete errors for already "
"recvd initial md %p",
s, new_err);
fail_helper_locked(s, GRPC_ERROR_REF(new_err));
goto done;
}
// Initial metadata has been deposited for us: copy it (and its flags) into
// the op's output batch.
if (s->to_read_initial_md_filled) {
s->initial_md_recvd = true;
new_err = fill_in_metadata(
s, &s->to_read_initial_md, s->to_read_initial_md_flags,
s->recv_initial_md_op->payload->recv_initial_metadata
.recv_initial_metadata,
s->recv_initial_md_op->payload->recv_initial_metadata.recv_flags,
nullptr);
// Propagate this stream's deadline into the delivered metadata batch.
s->recv_initial_md_op->payload->recv_initial_metadata
.recv_initial_metadata->deadline = s->deadline;
// If the caller asked, report whether the peer currently has a pending
// send_trailing_md_op.
if (s->recv_initial_md_op->payload->recv_initial_metadata
.trailing_metadata_available != nullptr) {
*s->recv_initial_md_op->payload->recv_initial_metadata
.trailing_metadata_available =
(other != nullptr && other->send_trailing_md_op != nullptr);
}
// The staging buffer has been consumed.
grpc_metadata_batch_clear(&s->to_read_initial_md);
s->to_read_initial_md_filled = false;
INPROC_LOG(GPR_INFO,
"op_state_machine %p scheduling initial-metadata-ready %p", s,
new_err);
// The ready closure gets its own ref of new_err (which may be
// GRPC_ERROR_NONE).
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
s->recv_initial_md_op->payload->recv_initial_metadata
.recv_initial_metadata_ready,
GRPC_ERROR_REF(new_err));
complete_if_batch_end_locked(
s, new_err, s->recv_initial_md_op,
"op_state_machine scheduling recv-initial-metadata-on-complete");
s->recv_initial_md_op = nullptr;
// A failure from fill_in_metadata fails the rest of the stream's ops.
if (new_err != GRPC_ERROR_NONE) {
INPROC_LOG(GPR_INFO,
"op_state_machine %p scheduling on_complete errors2 %p", s,
new_err);
fail_helper_locked(s, GRPC_ERROR_REF(new_err));
goto done;
}
}
}
// --- recv_message ---
// If the peer has a message waiting to be sent, pull it across and let the
// peer continue with its remaining ops.
if (s->recv_message_op) {
if (other && other->send_message_op) {
message_transfer_locked(other, s);
maybe_process_ops_locked(other, GRPC_ERROR_NONE);
}
}
// --- trailing metadata waiting to be read ---
if (s->to_read_trailing_md_filled) {
// Trailing metadata arriving after it was already consumed is an error.
if (s->trailing_md_recvd) {
new_err =
GRPC_ERROR_CREATE_FROM_STATIC_STRING("Already recvd trailing md");
INPROC_LOG(
GPR_INFO,
"op_state_machine %p scheduling on_complete errors for already "
"recvd trailing md %p",
s, new_err);
fail_helper_locked(s, GRPC_ERROR_REF(new_err));
goto done;
}
if (s->recv_message_op != nullptr) {
// This message needs to be wrapped up because it will never be
// satisfied
// A null message is delivered and the ready closure runs without error.
*s->recv_message_op->payload->recv_message.recv_message = nullptr;
INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s);
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
s->recv_message_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
complete_if_batch_end_locked(
s, new_err, s->recv_message_op,
"op_state_machine scheduling recv-message-on-complete");
s->recv_message_op = nullptr;
}
if ((s->trailing_md_sent || s->t->is_client) && s->send_message_op) {
// Nothing further will try to receive from this stream, so finish off
// any outstanding send_message op
s->send_message_op->payload->send_message.send_message.reset();
complete_if_batch_end_locked(
s, new_err, s->send_message_op,
"op_state_machine scheduling send-message-on-complete");
s->send_message_op = nullptr;
}
if (s->recv_trailing_md_op != nullptr) {
// We wanted trailing metadata and we got it
s->trailing_md_recvd = true;
new_err =
fill_in_metadata(s, &s->to_read_trailing_md, 0,
s->recv_trailing_md_op->payload
->recv_trailing_metadata.recv_trailing_metadata,
nullptr, nullptr);
// The staging buffer has been consumed.
grpc_metadata_batch_clear(&s->to_read_trailing_md);
s->to_read_trailing_md_filled = false;
// We should schedule the recv_trailing_md_op completion if
// 1. this stream is the client-side
// 2. this stream is the server-side AND has already sent its trailing md
// (If the server hasn't already sent its trailing md, it doesn't have
// a final status, so don't mark this op complete)
if (s->t->is_client || s->trailing_md_sent) {
INPROC_LOG(GPR_INFO,
"op_state_machine %p scheduling trailing-md-on-complete %p",
s, new_err);
// Each closure receives its own ref of new_err.
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
s->recv_trailing_md_op->payload->recv_trailing_metadata
.recv_trailing_metadata_ready,
GRPC_ERROR_REF(new_err));
grpc_core::ExecCtx::Run(DEBUG_LOCATION,
s->recv_trailing_md_op->on_complete,
GRPC_ERROR_REF(new_err));
s->recv_trailing_md_op = nullptr;
// Close only once this side has also sent its trailing metadata.
needs_close = s->trailing_md_sent;
} else {
// The op stays pending; it is completed by the send_trailing_md branch
// above on a later pass, once the server sends its trailing metadata.
INPROC_LOG(GPR_INFO,
"op_state_machine %p server needs to delay handling "
"trailing-md-on-complete %p",
s, new_err);
}
} else {
// Leave the metadata staged until a recv_trailing_md_op shows up.
INPROC_LOG(
GPR_INFO,
"op_state_machine %p has trailing md but not yet waiting for it", s);
}
}
// --- clean-up of message ops that can no longer be matched ---
if (s->trailing_md_recvd && s->recv_message_op) {
// No further message will come on this stream, so finish off the
// recv_message_op
INPROC_LOG(GPR_INFO, "op_state_machine %p scheduling message-ready", s);
*s->recv_message_op->payload->recv_message.recv_message = nullptr;
grpc_core::ExecCtx::Run(
DEBUG_LOCATION,
s->recv_message_op->payload->recv_message.recv_message_ready,
GRPC_ERROR_NONE);
complete_if_batch_end_locked(
s, new_err, s->recv_message_op,
"op_state_machine scheduling recv-message-on-complete");
s->recv_message_op = nullptr;
}
if (s->trailing_md_recvd && (s->trailing_md_sent || s->t->is_client) &&
s->send_message_op) {
// Nothing further will try to receive from this stream, so finish off
// any outstanding send_message op
s->send_message_op->payload->send_message.send_message.reset();
complete_if_batch_end_locked(
s, new_err, s->send_message_op,
"op_state_machine scheduling send-message-on-complete");
s->send_message_op = nullptr;
}
// --- anything left over? ---
if (s->send_message_op || s->send_trailing_md_op || s->recv_initial_md_op ||
s->recv_message_op || s->recv_trailing_md_op) {
// Didn't get the item we wanted so we still need to get
// rescheduled
INPROC_LOG(
GPR_INFO, "op_state_machine %p still needs closure %p %p %p %p %p", s,
s->send_message_op, s->send_trailing_md_op, s->recv_initial_md_op,
s->recv_message_op, s->recv_trailing_md_op);
s->ops_needed = true;
}
// Common exit: perform any requested close, then drop our ref on new_err.
done:
if (needs_close) {
close_other_side_locked(s, "op_state_machine");
close_stream_locked(s);
}
GRPC_ERROR_UNREF(new_err);
}