in src/core/ext/transport/cronet/transport/cronet_transport.cc [838:1020]
static bool op_can_be_run(grpc_transport_stream_op_batch* curr_op,
struct stream_obj* s, struct op_state* op_state,
enum e_op_id op_id) {
struct op_state* stream_state = &s->state;
grpc_cronet_transport* t = s->curr_ct;
bool result = true;
/* When call is canceled, every op can be run, except under following
conditions
*/
bool is_canceled_or_failed = stream_state->state_op_done[OP_CANCEL_ERROR] ||
stream_state->state_callback_received[OP_FAILED];
if (is_canceled_or_failed) {
if (op_id == OP_SEND_INITIAL_METADATA) {
CRONET_LOG(GPR_DEBUG, "Because");
result = false;
}
if (op_id == OP_SEND_MESSAGE) {
CRONET_LOG(GPR_DEBUG, "Because");
result = false;
}
if (op_id == OP_SEND_TRAILING_METADATA) {
CRONET_LOG(GPR_DEBUG, "Because");
result = false;
}
if (op_id == OP_CANCEL_ERROR) {
CRONET_LOG(GPR_DEBUG, "Because");
result = false;
}
/* already executed */
if (op_id == OP_RECV_INITIAL_METADATA &&
stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
CRONET_LOG(GPR_DEBUG, "Because");
result = false;
}
if (op_id == OP_RECV_MESSAGE && op_state->state_op_done[OP_RECV_MESSAGE]) {
CRONET_LOG(GPR_DEBUG, "Because");
result = false;
}
if (op_id == OP_RECV_TRAILING_METADATA &&
stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
CRONET_LOG(GPR_DEBUG, "Because");
result = false;
}
/* ON_COMPLETE can be processed if one of the following conditions is met:
* 1. the stream failed
* 2. the stream is cancelled, and the callback is received
* 3. the stream succeeded before cancel is effective
* 4. the stream is cancelled, and the stream is never started */
if (op_id == OP_ON_COMPLETE &&
!(stream_state->state_callback_received[OP_FAILED] ||
stream_state->state_callback_received[OP_CANCELED] ||
stream_state->state_callback_received[OP_SUCCEEDED] ||
!stream_state->state_op_done[OP_SEND_INITIAL_METADATA])) {
CRONET_LOG(GPR_DEBUG, "Because");
result = false;
}
} else if (op_id == OP_SEND_INITIAL_METADATA) {
/* already executed */
if (stream_state->state_op_done[OP_SEND_INITIAL_METADATA]) result = false;
} else if (op_id == OP_RECV_INITIAL_METADATA) {
/* already executed */
if (stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) result = false;
/* we haven't sent headers yet. */
else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
result = false;
/* we haven't received headers yet. */
else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA] &&
!stream_state->state_op_done[OP_RECV_TRAILING_METADATA])
result = false;
} else if (op_id == OP_SEND_MESSAGE) {
/* already executed (note we're checking op specific state, not stream
state) */
if (op_state->state_op_done[OP_SEND_MESSAGE]) result = false;
/* we haven't sent headers yet. */
else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
result = false;
} else if (op_id == OP_RECV_MESSAGE) {
/* already executed */
if (op_state->state_op_done[OP_RECV_MESSAGE]) result = false;
/* we haven't received headers yet. */
else if (!stream_state->state_callback_received[OP_RECV_INITIAL_METADATA] &&
!stream_state->state_op_done[OP_RECV_TRAILING_METADATA])
result = false;
} else if (op_id == OP_RECV_TRAILING_METADATA) {
/* already executed */
if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) result = false;
/* we have asked for but haven't received message yet. */
else if (stream_state->state_op_done[OP_READ_REQ_MADE] &&
!stream_state->state_op_done[OP_RECV_MESSAGE])
result = false;
/* we haven't received trailers yet. */
else if (!stream_state->state_callback_received[OP_RECV_TRAILING_METADATA])
result = false;
/* we haven't received on_succeeded yet. */
else if (!stream_state->state_callback_received[OP_SUCCEEDED])
result = false;
} else if (op_id == OP_SEND_TRAILING_METADATA) {
/* already executed */
if (stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) result = false;
/* we haven't sent initial metadata yet */
else if (!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA])
result = false;
/* we haven't sent message yet */
else if (stream_state->pending_send_message &&
!stream_state->state_op_done[OP_SEND_MESSAGE])
result = false;
/* we haven't got on_write_completed for the send yet */
else if (stream_state->state_op_done[OP_SEND_MESSAGE] &&
!stream_state->state_callback_received[OP_SEND_MESSAGE] &&
!(t->use_packet_coalescing &&
stream_state->pending_write_for_trailer))
result = false;
} else if (op_id == OP_CANCEL_ERROR) {
/* already executed */
if (stream_state->state_op_done[OP_CANCEL_ERROR]) result = false;
} else if (op_id == OP_ON_COMPLETE) {
/* already executed (note we're checking op specific state, not stream
state) */
if (op_state->state_op_done[OP_ON_COMPLETE]) {
CRONET_LOG(GPR_DEBUG, "Because");
result = false;
}
/* Check if every op that was asked for is done. */
/* TODO(muxi): We should not consider the recv ops here, since they
* have their own callbacks. We should invoke a batch's on_complete
* as soon as all of the batch's send ops are complete, even if
* there are still recv ops pending. */
else if (curr_op->send_initial_metadata &&
!stream_state->state_callback_received[OP_SEND_INITIAL_METADATA]) {
CRONET_LOG(GPR_DEBUG, "Because");
result = false;
} else if (curr_op->send_message &&
!op_state->state_op_done[OP_SEND_MESSAGE]) {
CRONET_LOG(GPR_DEBUG, "Because");
result = false;
} else if (curr_op->send_message &&
!stream_state->state_callback_received[OP_SEND_MESSAGE]) {
CRONET_LOG(GPR_DEBUG, "Because");
result = false;
} else if (curr_op->send_trailing_metadata &&
!stream_state->state_op_done[OP_SEND_TRAILING_METADATA]) {
CRONET_LOG(GPR_DEBUG, "Because");
result = false;
} else if (curr_op->recv_initial_metadata &&
!stream_state->state_op_done[OP_RECV_INITIAL_METADATA]) {
CRONET_LOG(GPR_DEBUG, "Because");
result = false;
} else if (curr_op->recv_message &&
!op_state->state_op_done[OP_RECV_MESSAGE]) {
CRONET_LOG(GPR_DEBUG, "Because");
result = false;
} else if (curr_op->cancel_stream &&
!stream_state->state_callback_received[OP_CANCELED]) {
CRONET_LOG(GPR_DEBUG, "Because");
result = false;
} else if (curr_op->recv_trailing_metadata) {
/* We aren't done with trailing metadata yet */
if (!stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
CRONET_LOG(GPR_DEBUG, "Because");
result = false;
}
/* We've asked for actual message in an earlier op, and it hasn't been
delivered yet. */
else if (stream_state->state_op_done[OP_READ_REQ_MADE]) {
/* If this op is not the one asking for read, (which means some earlier
op has asked), and the read hasn't been delivered. */
if (!curr_op->recv_message &&
!stream_state->state_callback_received[OP_SUCCEEDED]) {
CRONET_LOG(GPR_DEBUG, "Because");
result = false;
}
}
}
/* We should see at least one on_write_completed for the trailers that we
sent */
else if (curr_op->send_trailing_metadata &&
!stream_state->state_callback_received[OP_SEND_MESSAGE])
result = false;
}
CRONET_LOG(GPR_DEBUG, "op_can_be_run %s : %s", op_id_string(op_id),
result ? "YES" : "NO");
return result;
}