in src/core/ext/filters/client_channel/client_channel.cc [3531:3655]
// Walks pending_batches_ and, for every batch that is eligible given the
// progress recorded in retry_state, adds a subchannel batch to closures.
// A batch is either handed over untouched (when retries are not in play) or
// wrapped in a newly created SubchannelCallBatchData whose ops are filled in
// by the AddRetriable*Op() helpers.
void CallData::AddSubchannelBatchesForPendingBatches(
    grpc_call_element* elem, SubchannelCallRetryState* retry_state,
    CallCombinerClosureList* closures) {
  for (PendingBatch& pending : pending_batches_) {
    grpc_transport_stream_op_batch* batch = pending.batch;
    if (batch == nullptr) continue;
    // A batch is skipped when it either (a) was already started on this
    // subchannel call or (b) cannot be started yet because send ops are
    // still being replayed and have to complete first.
    // TODO(roth): Note that if any one op in the batch can't be sent
    // yet due to ops that we're replaying, we don't start any of the ops
    // in the batch.  This is probably okay, but it could conceivably
    // lead to increased latency in some cases -- e.g., we could delay
    // starting a recv op due to it being in the same batch with a send
    // op.  If/when we revamp the callback protocol in
    // transport_stream_op_batch, we may be able to fix this.
    const bool send_initial_metadata_already_started =
        batch->send_initial_metadata &&
        retry_state->started_send_initial_metadata;
    if (send_initial_metadata_already_started) continue;
    const bool send_message_still_outstanding =
        batch->send_message &&
        retry_state->started_send_message_count >
            retry_state->completed_send_message_count;
    if (send_message_still_outstanding) continue;
    // send_trailing_metadata is only started once there are no further
    // send_message ops left to start, because no send_message op can be
    // sent down after send_trailing_metadata.
    if (batch->send_trailing_metadata) {
      const bool more_messages_to_start =
          retry_state->started_send_message_count + batch->send_message <
          send_messages_.size();
      if (more_messages_to_start ||
          retry_state->started_send_trailing_metadata) {
        continue;
      }
    }
    const bool recv_initial_metadata_already_started =
        batch->recv_initial_metadata &&
        retry_state->started_recv_initial_metadata;
    if (recv_initial_metadata_already_started) continue;
    const bool recv_message_still_outstanding =
        batch->recv_message &&
        retry_state->started_recv_message_count >
            retry_state->completed_recv_message_count;
    if (recv_message_still_outstanding) continue;
    if (batch->recv_trailing_metadata &&
        retry_state->started_recv_trailing_metadata) {
      // A recv_trailing_metadata op may already have been initiated by
      // StartInternalRecvTrailingMetadata(); if so, reuse its result
      // rather than trying to re-start the op.
      if (GPR_UNLIKELY((retry_state->recv_trailing_metadata_internal_batch !=
                        nullptr))) {
        if (!retry_state->completed_recv_trailing_metadata) {
          // Not completed yet: just unref the internally started
          // subchannel batch, since the completion will be propagated
          // when it completes.
          retry_state->recv_trailing_metadata_internal_batch->Unref();
        } else {
          // Already completed: trigger the completion callback directly
          // so that the previously returned results reach the
          // application.
          // Batches containing recv_trailing_metadata always succeed.
          closures->Add(
              &retry_state->recv_trailing_metadata_ready, GRPC_ERROR_NONE,
              "re-executing recv_trailing_metadata_ready to propagate "
              "internally triggered result");
        }
        retry_state->recv_trailing_metadata_internal_batch = nullptr;
      }
      continue;
    }
    // When we are not retrying, the batch is sent down as-is.
    const bool send_batch_unmodified =
        method_params_ == nullptr ||
        method_params_->retry_policy() == nullptr || retry_committed_;
    if (send_batch_unmodified) {
      // TODO(roth) : We should probably call
      // MaybeInjectRecvTrailingMetadataReadyForLoadBalancingPolicy here.
      AddClosureForSubchannelBatch(elem, batch, closures);
      PendingBatchClear(&pending);
      continue;
    }
    // Build a retriable batch with the right number of callbacks: one
    // shared by all send ops plus one per recv op.
    const bool contains_send_op = batch->send_initial_metadata ||
                                  batch->send_message ||
                                  batch->send_trailing_metadata;
    const int callback_count = contains_send_op +
                               batch->recv_initial_metadata +
                               batch->recv_message +
                               batch->recv_trailing_metadata;
    SubchannelCallBatchData* retriable = SubchannelCallBatchData::Create(
        elem, callback_count, contains_send_op /* set_on_complete */);
    // Cache send ops if needed.
    MaybeCacheSendOpsForBatch(&pending);
    // Populate the retriable batch, one op type at a time.
    if (batch->send_initial_metadata) {
      AddRetriableSendInitialMetadataOp(retry_state, retriable);
    }
    if (batch->send_message) {
      AddRetriableSendMessageOp(elem, retry_state, retriable);
    }
    if (batch->send_trailing_metadata) {
      AddRetriableSendTrailingMetadataOp(retry_state, retriable);
    }
    if (batch->recv_initial_metadata) {
      // recv_flags is only used on the server side.
      GPR_ASSERT(batch->payload->recv_initial_metadata.recv_flags == nullptr);
      AddRetriableRecvInitialMetadataOp(retry_state, retriable);
    }
    if (batch->recv_message) {
      AddRetriableRecvMessageOp(retry_state, retriable);
    }
    if (batch->recv_trailing_metadata) {
      AddRetriableRecvTrailingMetadataOp(retry_state, retriable);
    }
    AddClosureForSubchannelBatch(elem, &retriable->batch, closures);
    // Keep count of pending subchannel send batches; the first one takes
    // a ref to the call stack.  The flags are re-read from the batch here
    // rather than reusing the value computed above.
    const bool counts_as_send_batch = batch->send_initial_metadata ||
                                      batch->send_message ||
                                      batch->send_trailing_metadata;
    if (!counts_as_send_batch) continue;
    if (num_pending_retriable_subchannel_send_batches_ == 0) {
      GRPC_CALL_STACK_REF(owning_call_, "subchannel_send_batches");
    }
    ++num_pending_retriable_subchannel_send_batches_;
  }
}