in src/core/lib/surface/call.cc [1551:1958]
/* Validates a batch of application-level ops and rewrites it into a single
   transport stream op batch, which is then passed to execute_batch().

   Parameters:
     call                  - the call the batch is started on.
     ops / nops            - the ops to start; ops[i].reserved must be null.
     notify_tag            - completion-queue tag, or a grpc_closure* when
                             is_notify_tag_closure is non-zero.
     is_notify_tag_closure - selects how notify_tag is interpreted.

   Returns GRPC_CALL_OK when the batch was accepted (or when nops == 0, in
   which case completion is signalled immediately and nothing is sent to the
   transport). Returns GRPC_CALL_ERROR_TOO_MANY_OPERATIONS when
   reuse_or_allocate_batch_control() yields no batch_control. Any other error
   is a validation failure of an individual op; in that case the per-call
   flags set while processing this batch are reset at done_with_error and
   execute_batch() is never reached. */
static grpc_call_error call_start_batch(grpc_call* call, const grpc_op* ops,
                                        size_t nops, void* notify_tag,
                                        int is_notify_tag_closure) {
  GPR_TIMER_SCOPE("call_start_batch", 0);

  /* All locals are declared up front: the function uses goto for its exit
     paths, and a goto may not jump past an initialized declaration in the
     same scope. */
  size_t i;
  const grpc_op* op;
  batch_control* bctl;
  bool has_send_ops = false;
  int num_recv_ops = 0;
  grpc_call_error error = GRPC_CALL_OK;
  grpc_transport_stream_op_batch* stream_op;
  grpc_transport_stream_op_batch_payload* stream_op_payload;

  GRPC_CALL_LOG_BATCH(GPR_INFO, ops, nops);

  /* An empty batch has nothing to hand to the transport: signal completion
     right away, either through the completion queue or by running the
     closure. `error` still holds GRPC_CALL_OK here. */
  if (nops == 0) {
    if (!is_notify_tag_closure) {
      GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
      grpc_cq_end_op(call->cq, notify_tag, GRPC_ERROR_NONE,
                     free_no_op_completion, nullptr,
                     static_cast<grpc_cq_completion*>(
                         gpr_malloc(sizeof(grpc_cq_completion))));
    } else {
      grpc_core::Closure::Run(DEBUG_LOCATION,
                              static_cast<grpc_closure*>(notify_tag),
                              GRPC_ERROR_NONE);
    }
    goto done;
  }

  bctl = reuse_or_allocate_batch_control(call, ops);
  if (bctl == nullptr) {
    return GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
  }
  bctl->completion_data.notify_tag.tag = notify_tag;
  bctl->completion_data.notify_tag.is_closure =
      static_cast<uint8_t>(is_notify_tag_closure != 0);

  stream_op = &bctl->op;
  stream_op_payload = &call->stream_op_payload;

  /* rewrite batch ops into a transport op */
  for (i = 0; i < nops; i++) {
    op = &ops[i];
    if (op->reserved != nullptr) {
      error = GRPC_CALL_ERROR;
      goto done_with_error;
    }
    /* NOTE(review): there is no default label, so an op type other than the
       eight handled below is skipped without an error -- confirm intended. */
    switch (op->op) {
      case GRPC_OP_SEND_INITIAL_METADATA: {
        /* Flag validation is delegated to are_initial_metadata_flags_valid(),
           which is also told whether this is a client-side call. */
        if (!are_initial_metadata_flags_valid(op->flags, call->is_client)) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->sent_initial_metadata) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        // TODO(juanlishen): If the user has already specified a compression
        // algorithm by setting the initial metadata with key of
        // GRPC_COMPRESSION_REQUEST_ALGORITHM_MD_KEY, we shouldn't override that
        // with the compression algorithm mapped from compression level.
        /* process compression level */
        grpc_metadata& compression_md = call->compression_md;
        compression_md.key = grpc_empty_slice();
        compression_md.value = grpc_empty_slice();
        compression_md.flags = 0;
        size_t additional_metadata_count = 0;
        grpc_compression_level effective_compression_level =
            GRPC_COMPRESS_LEVEL_NONE;
        bool level_set = false;
        /* A level given on the op takes precedence over the channel's default
           level. */
        if (op->data.send_initial_metadata.maybe_compression_level.is_set) {
          effective_compression_level =
              op->data.send_initial_metadata.maybe_compression_level.level;
          level_set = true;
        } else {
          const grpc_compression_options copts =
              grpc_channel_compression_options(call->channel);
          if (copts.default_level.is_set) {
            level_set = true;
            effective_compression_level = copts.default_level.level;
          }
        }
        // Currently, only server side supports compression level setting.
        if (level_set && !call->is_client) {
          const grpc_compression_algorithm calgo =
              compression_algorithm_for_level_locked(
                  call, effective_compression_level);
          // The following metadata will be checked and removed by the message
          // compression filter. It will be used as the call's compression
          // algorithm.
          compression_md.key = GRPC_MDSTR_GRPC_INTERNAL_ENCODING_REQUEST;
          compression_md.value = grpc_compression_algorithm_slice(calgo);
          additional_metadata_count++;
        }
        /* The counts are narrowed to int for prepare_application_metadata()
           below, so reject totals that do not fit. */
        if (op->data.send_initial_metadata.count + additional_metadata_count >
            INT_MAX) {
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }
        /* Mark the op on both stream_op and the call before the fallible
           step, so that done_with_error knows to roll it back. */
        stream_op->send_initial_metadata = true;
        call->sent_initial_metadata = true;
        if (!prepare_application_metadata(
                call, static_cast<int>(op->data.send_initial_metadata.count),
                op->data.send_initial_metadata.metadata, 0, call->is_client,
                &compression_md, static_cast<int>(additional_metadata_count))) {
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }
        /* TODO(ctiller): just make these the same variable? */
        if (call->is_client) {
          call->metadata_batch[0][0].deadline = call->send_deadline;
        }
        stream_op_payload->send_initial_metadata.send_initial_metadata =
            &call->metadata_batch[0 /* is_receiving */][0 /* is_trailing */];
        stream_op_payload->send_initial_metadata.send_initial_metadata_flags =
            op->flags;
        /* On the client, peer_string is wired up on the send side; the
           server does it in GRPC_OP_RECV_INITIAL_METADATA below. */
        if (call->is_client) {
          stream_op_payload->send_initial_metadata.peer_string =
              &call->peer_string;
        }
        has_send_ops = true;
        break;
      }
      case GRPC_OP_SEND_MESSAGE: {
        if (!are_write_flags_valid(op->flags)) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (op->data.send_message.send_message == nullptr) {
          error = GRPC_CALL_ERROR_INVALID_MESSAGE;
          goto done_with_error;
        }
        if (call->sending_message) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        uint32_t flags = op->flags;
        /* If the outgoing buffer is already compressed, mark it as so in the
           flags. These will be picked up by the compression filter and further
           (wasteful) attempts at compression skipped. */
        if (op->data.send_message.send_message->data.raw.compression >
            GRPC_COMPRESS_NONE) {
          flags |= GRPC_WRITE_INTERNAL_COMPRESS;
        }
        stream_op->send_message = true;
        call->sending_message = true;
        call->sending_stream.Init(
            &op->data.send_message.send_message->data.raw.slice_buffer, flags);
        stream_op_payload->send_message.send_message.reset(
            call->sending_stream.get());
        has_send_ops = true;
        break;
      }
      case GRPC_OP_SEND_CLOSE_FROM_CLIENT: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (!call->is_client) {
          error = GRPC_CALL_ERROR_NOT_ON_SERVER;
          goto done_with_error;
        }
        if (call->sent_final_op) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        /* The client's close is passed to the transport as a
           send_trailing_metadata op on the outgoing trailing batch. */
        stream_op->send_trailing_metadata = true;
        call->sent_final_op = true;
        stream_op_payload->send_trailing_metadata.send_trailing_metadata =
            &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
        has_send_ops = true;
        break;
      }
      case GRPC_OP_SEND_STATUS_FROM_SERVER: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->is_client) {
          error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
          goto done_with_error;
        }
        if (call->sent_final_op) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        /* trailing_metadata_count is narrowed to int below. */
        if (op->data.send_status_from_server.trailing_metadata_count >
            INT_MAX) {
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }
        stream_op->send_trailing_metadata = true;
        call->sent_final_op = true;
        /* send_extra_metadata[0] holds the status element;
           send_extra_metadata[1] is filled in only when status_details is
           supplied. */
        GPR_ASSERT(call->send_extra_metadata_count == 0);
        call->send_extra_metadata_count = 1;
        call->send_extra_metadata[0].md = grpc_get_reffed_status_elem(
            op->data.send_status_from_server.status);
        /* Mirror the outgoing status as a grpc_error: none for
           GRPC_STATUS_OK, otherwise an error carrying the status code (and,
           below, the message when one is given). */
        grpc_error* status_error =
            op->data.send_status_from_server.status == GRPC_STATUS_OK
                ? GRPC_ERROR_NONE
                : grpc_error_set_int(
                      GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                          "Server returned error"),
                      GRPC_ERROR_INT_GRPC_STATUS,
                      static_cast<intptr_t>(
                          op->data.send_status_from_server.status));
        if (op->data.send_status_from_server.status_details != nullptr) {
          call->send_extra_metadata[1].md = grpc_mdelem_from_slices(
              GRPC_MDSTR_GRPC_MESSAGE,
              grpc_slice_ref_internal(
                  *op->data.send_status_from_server.status_details));
          call->send_extra_metadata_count++;
          if (status_error != GRPC_ERROR_NONE) {
            char* msg = grpc_slice_to_c_string(
                GRPC_MDVALUE(call->send_extra_metadata[1].md));
            status_error =
                grpc_error_set_str(status_error, GRPC_ERROR_STR_GRPC_MESSAGE,
                                   grpc_slice_from_copied_string(msg));
            gpr_free(msg);
          }
        }

        gpr_atm_rel_store(&call->status_error,
                          reinterpret_cast<gpr_atm>(status_error));

        if (!prepare_application_metadata(
                call,
                static_cast<int>(
                    op->data.send_status_from_server.trailing_metadata_count),
                op->data.send_status_from_server.trailing_metadata, 1, 1,
                nullptr, 0)) {
          /* Drop the refs on the extra elements created above. */
          /* NOTE(review): call->status_error stored just above is not rolled
             back on this path -- confirm that is intended. */
          for (int n = 0; n < call->send_extra_metadata_count; n++) {
            GRPC_MDELEM_UNREF(call->send_extra_metadata[n].md);
          }
          call->send_extra_metadata_count = 0;
          error = GRPC_CALL_ERROR_INVALID_METADATA;
          goto done_with_error;
        }
        stream_op_payload->send_trailing_metadata.send_trailing_metadata =
            &call->metadata_batch[0 /* is_receiving */][1 /* is_trailing */];
        has_send_ops = true;
        break;
      }
      case GRPC_OP_RECV_INITIAL_METADATA: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->received_initial_metadata) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        call->received_initial_metadata = true;
        /* Remember the application's destination array for this op. */
        call->buffered_metadata[0] =
            op->data.recv_initial_metadata.recv_initial_metadata;
        GRPC_CLOSURE_INIT(&call->receiving_initial_metadata_ready,
                          receiving_initial_metadata_ready, bctl,
                          grpc_schedule_on_exec_ctx);
        stream_op->recv_initial_metadata = true;
        stream_op_payload->recv_initial_metadata.recv_initial_metadata =
            &call->metadata_batch[1 /* is_receiving */][0 /* is_trailing */];
        stream_op_payload->recv_initial_metadata.recv_initial_metadata_ready =
            &call->receiving_initial_metadata_ready;
        if (!call->is_client) {
          stream_op_payload->recv_initial_metadata.peer_string =
              &call->peer_string;
        }
        ++num_recv_ops;
        break;
      }
      case GRPC_OP_RECV_MESSAGE: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->receiving_message) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        call->receiving_message = true;
        stream_op->recv_message = true;
        call->receiving_buffer = op->data.recv_message.recv_message;
        stream_op_payload->recv_message.recv_message = &call->receiving_stream;
        GRPC_CLOSURE_INIT(&call->receiving_stream_ready,
                          receiving_stream_ready_in_call_combiner, bctl,
                          grpc_schedule_on_exec_ctx);
        stream_op_payload->recv_message.recv_message_ready =
            &call->receiving_stream_ready;
        ++num_recv_ops;
        break;
      }
      case GRPC_OP_RECV_STATUS_ON_CLIENT: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (!call->is_client) {
          error = GRPC_CALL_ERROR_NOT_ON_SERVER;
          goto done_with_error;
        }
        if (call->requested_final_op) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        call->requested_final_op = true;
        /* Record the application's output locations for the final status. */
        call->buffered_metadata[1] =
            op->data.recv_status_on_client.trailing_metadata;
        call->final_op.client.status = op->data.recv_status_on_client.status;
        call->final_op.client.status_details =
            op->data.recv_status_on_client.status_details;
        call->final_op.client.error_string =
            op->data.recv_status_on_client.error_string;
        stream_op->recv_trailing_metadata = true;
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
            &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
        stream_op_payload->recv_trailing_metadata.collect_stats =
            &call->final_info.stats.transport_stream_stats;
        GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
                          receiving_trailing_metadata_ready, bctl,
                          grpc_schedule_on_exec_ctx);
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
            &call->receiving_trailing_metadata_ready;
        ++num_recv_ops;
        break;
      }
      case GRPC_OP_RECV_CLOSE_ON_SERVER: {
        /* Flag validation: currently allow no flags */
        if (op->flags != 0) {
          error = GRPC_CALL_ERROR_INVALID_FLAGS;
          goto done_with_error;
        }
        if (call->is_client) {
          error = GRPC_CALL_ERROR_NOT_ON_CLIENT;
          goto done_with_error;
        }
        if (call->requested_final_op) {
          error = GRPC_CALL_ERROR_TOO_MANY_OPERATIONS;
          goto done_with_error;
        }
        call->requested_final_op = true;
        call->final_op.server.cancelled =
            op->data.recv_close_on_server.cancelled;
        stream_op->recv_trailing_metadata = true;
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata =
            &call->metadata_batch[1 /* is_receiving */][1 /* is_trailing */];
        stream_op_payload->recv_trailing_metadata.collect_stats =
            &call->final_info.stats.transport_stream_stats;
        GRPC_CLOSURE_INIT(&call->receiving_trailing_metadata_ready,
                          receiving_trailing_metadata_ready, bctl,
                          grpc_schedule_on_exec_ctx);
        stream_op_payload->recv_trailing_metadata.recv_trailing_metadata_ready =
            &call->receiving_trailing_metadata_ready;
        ++num_recv_ops;
        break;
      }
    }
  }

  /* Every op validated: from here on the batch is committed. */
  GRPC_CALL_INTERNAL_REF(call, "completion");
  if (!is_notify_tag_closure) {
    GPR_ASSERT(grpc_cq_begin_op(call->cq, notify_tag));
  }
  /* All send ops share a single completion step (on_complete below); each
     recv op has its own ready closure and counts as one step. */
  bctl->set_num_steps_to_complete((has_send_ops ? 1 : 0) + num_recv_ops);

  if (has_send_ops) {
    GRPC_CLOSURE_INIT(&bctl->finish_batch, finish_batch, bctl,
                      grpc_schedule_on_exec_ctx);
    stream_op->on_complete = &bctl->finish_batch;
  }

  gpr_atm_rel_store(&call->any_ops_sent_atm, 1);
  execute_batch(call, stream_op, &bctl->start_batch);

done:
  return error;

done_with_error:
  /* reverse any mutations that occurred */
  /* Each op kind marked on stream_op has its call-level flag reset, and the
     outgoing metadata batches / message stream that were populated are
     released. */
  /* NOTE(review): bctl itself is not touched on this path -- confirm that
     reuse_or_allocate_batch_control() copes with a batch_control left over
     from a rejected batch. */
  if (stream_op->send_initial_metadata) {
    call->sent_initial_metadata = false;
    grpc_metadata_batch_clear(&call->metadata_batch[0][0]);
  }
  if (stream_op->send_message) {
    call->sending_message = false;
    call->sending_stream->Orphan();
  }
  if (stream_op->send_trailing_metadata) {
    call->sent_final_op = false;
    grpc_metadata_batch_clear(&call->metadata_batch[0][1]);
  }
  if (stream_op->recv_initial_metadata) {
    call->received_initial_metadata = false;
  }
  if (stream_op->recv_message) {
    call->receiving_message = false;
  }
  if (stream_op->recv_trailing_metadata) {
    call->requested_final_op = false;
  }
  goto done;
}