static void perform_stream_op_locked()

in src/core/ext/transport/chttp2/transport/chttp2_transport.cc [1341:1633]


static void perform_stream_op_locked(void* stream_op,
                                     grpc_error* /*error_ignored*/) {
  GPR_TIMER_SCOPE("perform_stream_op_locked", 0);

  grpc_transport_stream_op_batch* op =
      static_cast<grpc_transport_stream_op_batch*>(stream_op);
  grpc_chttp2_stream* s =
      static_cast<grpc_chttp2_stream*>(op->handler_private.extra_arg);
  grpc_transport_stream_op_batch_payload* op_payload = op->payload;
  grpc_chttp2_transport* t = s->t;

  GRPC_STATS_INC_HTTP2_OP_BATCHES();

  s->context = op->payload->context;
  s->traced = op->is_traced;
  if (GRPC_TRACE_FLAG_ENABLED(grpc_http_trace)) {
    char* str = grpc_transport_stream_op_batch_string(op);
    gpr_log(GPR_INFO, "perform_stream_op_locked: %s; on_complete = %p", str,
            op->on_complete);
    gpr_free(str);
    if (op->send_initial_metadata) {
      log_metadata(op_payload->send_initial_metadata.send_initial_metadata,
                   s->id, t->is_client, true);
    }
    if (op->send_trailing_metadata) {
      log_metadata(op_payload->send_trailing_metadata.send_trailing_metadata,
                   s->id, t->is_client, false);
    }
  }

  grpc_closure* on_complete = op->on_complete;
  // on_complete will be null if and only if there are no send ops in the batch.
  if (on_complete != nullptr) {
    // This batch has send ops. Use final_data as a barrier until enqueue time;
    // the initial counter is dropped at the end of this function.
    on_complete->next_data.scratch = CLOSURE_BARRIER_FIRST_REF_BIT;
    on_complete->error_data.error = GRPC_ERROR_NONE;
  }

  if (op->cancel_stream) {
    GRPC_STATS_INC_HTTP2_OP_CANCEL();
    grpc_chttp2_cancel_stream(t, s, op_payload->cancel_stream.cancel_error);
  }

  if (op->send_initial_metadata) {
    if (t->is_client && t->channelz_socket != nullptr) {
      t->channelz_socket->RecordStreamStartedFromLocal();
    }
    GRPC_STATS_INC_HTTP2_OP_SEND_INITIAL_METADATA();
    GPR_ASSERT(s->send_initial_metadata_finished == nullptr);
    on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;

    /* Identify stream compression */
    if (op_payload->send_initial_metadata.send_initial_metadata->idx.named
                .content_encoding == nullptr ||
        grpc_stream_compression_method_parse(
            GRPC_MDVALUE(
                op_payload->send_initial_metadata.send_initial_metadata->idx
                    .named.content_encoding->md),
            true, &s->stream_compression_method) == 0) {
      s->stream_compression_method = GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS;
    }
    if (s->stream_compression_method !=
        GRPC_STREAM_COMPRESSION_IDENTITY_COMPRESS) {
      s->uncompressed_data_size = 0;
      s->stream_compression_ctx = nullptr;
      grpc_slice_buffer_init(&s->compressed_data_buffer);
    }
    s->send_initial_metadata_finished = add_closure_barrier(on_complete);
    s->send_initial_metadata =
        op_payload->send_initial_metadata.send_initial_metadata;
    const size_t metadata_size =
        grpc_metadata_batch_size(s->send_initial_metadata);
    const size_t metadata_peer_limit =
        t->settings[GRPC_PEER_SETTINGS]
                   [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
    if (t->is_client) {
      s->deadline = GPR_MIN(s->deadline, s->send_initial_metadata->deadline);
    }
    if (metadata_size > metadata_peer_limit) {
      grpc_chttp2_cancel_stream(
          t, s,
          grpc_error_set_int(
              grpc_error_set_int(
                  grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                                         "to-be-sent initial metadata size "
                                         "exceeds peer limit"),
                                     GRPC_ERROR_INT_SIZE,
                                     static_cast<intptr_t>(metadata_size)),
                  GRPC_ERROR_INT_LIMIT,
                  static_cast<intptr_t>(metadata_peer_limit)),
              GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
    } else {
      if (contains_non_ok_status(s->send_initial_metadata)) {
        s->seen_error = true;
      }
      if (!s->write_closed) {
        if (t->is_client) {
          if (t->closed_with_error == GRPC_ERROR_NONE) {
            GPR_ASSERT(s->id == 0);
            grpc_chttp2_list_add_waiting_for_concurrency(t, s);
            maybe_start_some_streams(t);
          } else {
            grpc_chttp2_cancel_stream(
                t, s,
                grpc_error_set_int(
                    GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                        "Transport closed", &t->closed_with_error, 1),
                    GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE));
          }
        } else {
          GPR_ASSERT(s->id != 0);
          grpc_chttp2_mark_stream_writable(t, s);
          if (!(op->send_message &&
                (op->payload->send_message.send_message->flags() &
                 GRPC_WRITE_BUFFER_HINT))) {
            grpc_chttp2_initiate_write(
                t, GRPC_CHTTP2_INITIATE_WRITE_SEND_INITIAL_METADATA);
          }
        }
      } else {
        s->send_initial_metadata = nullptr;
        grpc_chttp2_complete_closure_step(
            t, s, &s->send_initial_metadata_finished,
            GRPC_ERROR_CREATE_REFERENCING_FROM_STATIC_STRING(
                "Attempt to send initial metadata after stream was closed",
                &s->write_closed_error, 1),
            "send_initial_metadata_finished");
      }
    }
    if (op_payload->send_initial_metadata.peer_string != nullptr) {
      gpr_atm_rel_store(op_payload->send_initial_metadata.peer_string,
                        (gpr_atm)t->peer_string);
    }
  }

  if (op->send_message) {
    GRPC_STATS_INC_HTTP2_OP_SEND_MESSAGE();
    t->num_messages_in_next_write++;
    GRPC_STATS_INC_HTTP2_SEND_MESSAGE_SIZE(
        op->payload->send_message.send_message->length());
    on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
    s->fetching_send_message_finished = add_closure_barrier(op->on_complete);
    if (s->write_closed) {
      op->payload->send_message.stream_write_closed = true;
      // We should NOT return an error here, so as to avoid a cancel OP being
      // started. The surface layer will notice that the stream has been closed
      // for writes and fail the send message op.
      op->payload->send_message.send_message.reset();
      grpc_chttp2_complete_closure_step(
          t, s, &s->fetching_send_message_finished, GRPC_ERROR_NONE,
          "fetching_send_message_finished");
    } else {
      GPR_ASSERT(s->fetching_send_message == nullptr);
      uint8_t* frame_hdr = grpc_slice_buffer_tiny_add(
          &s->flow_controlled_buffer, GRPC_HEADER_SIZE_IN_BYTES);
      uint32_t flags = op_payload->send_message.send_message->flags();
      frame_hdr[0] = (flags & GRPC_WRITE_INTERNAL_COMPRESS) != 0;
      size_t len = op_payload->send_message.send_message->length();
      frame_hdr[1] = static_cast<uint8_t>(len >> 24);
      frame_hdr[2] = static_cast<uint8_t>(len >> 16);
      frame_hdr[3] = static_cast<uint8_t>(len >> 8);
      frame_hdr[4] = static_cast<uint8_t>(len);
      s->fetching_send_message =
          std::move(op_payload->send_message.send_message);
      s->fetched_send_message_length = 0;
      s->next_message_end_offset =
          s->flow_controlled_bytes_written +
          static_cast<int64_t>(s->flow_controlled_buffer.length) +
          static_cast<int64_t>(len);
      if (flags & GRPC_WRITE_BUFFER_HINT) {
        s->next_message_end_offset -= t->write_buffer_size;
        s->write_buffering = true;
      } else {
        s->write_buffering = false;
      }
      continue_fetching_send_locked(t, s);
      maybe_become_writable_due_to_send_msg(t, s);
    }
  }

  if (op->send_trailing_metadata) {
    GRPC_STATS_INC_HTTP2_OP_SEND_TRAILING_METADATA();
    GPR_ASSERT(s->send_trailing_metadata_finished == nullptr);
    on_complete->next_data.scratch |= CLOSURE_BARRIER_MAY_COVER_WRITE;
    s->send_trailing_metadata_finished = add_closure_barrier(on_complete);
    s->send_trailing_metadata =
        op_payload->send_trailing_metadata.send_trailing_metadata;
    s->write_buffering = false;
    const size_t metadata_size =
        grpc_metadata_batch_size(s->send_trailing_metadata);
    const size_t metadata_peer_limit =
        t->settings[GRPC_PEER_SETTINGS]
                   [GRPC_CHTTP2_SETTINGS_MAX_HEADER_LIST_SIZE];
    if (metadata_size > metadata_peer_limit) {
      grpc_chttp2_cancel_stream(
          t, s,
          grpc_error_set_int(
              grpc_error_set_int(
                  grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                                         "to-be-sent trailing metadata size "
                                         "exceeds peer limit"),
                                     GRPC_ERROR_INT_SIZE,
                                     static_cast<intptr_t>(metadata_size)),
                  GRPC_ERROR_INT_LIMIT,
                  static_cast<intptr_t>(metadata_peer_limit)),
              GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_RESOURCE_EXHAUSTED));
    } else {
      if (contains_non_ok_status(s->send_trailing_metadata)) {
        s->seen_error = true;
      }
      if (s->write_closed) {
        s->send_trailing_metadata = nullptr;
        grpc_chttp2_complete_closure_step(
            t, s, &s->send_trailing_metadata_finished,
            grpc_metadata_batch_is_empty(
                op->payload->send_trailing_metadata.send_trailing_metadata)
                ? GRPC_ERROR_NONE
                : GRPC_ERROR_CREATE_FROM_STATIC_STRING(
                      "Attempt to send trailing metadata after "
                      "stream was closed"),
            "send_trailing_metadata_finished");
      } else if (s->id != 0) {
        /* TODO(ctiller): check if there's flow control for any outstanding
           bytes before going writable */
        grpc_chttp2_mark_stream_writable(t, s);
        grpc_chttp2_initiate_write(
            t, GRPC_CHTTP2_INITIATE_WRITE_SEND_TRAILING_METADATA);
      }
    }
  }

  if (op->recv_initial_metadata) {
    GRPC_STATS_INC_HTTP2_OP_RECV_INITIAL_METADATA();
    GPR_ASSERT(s->recv_initial_metadata_ready == nullptr);
    s->recv_initial_metadata_ready =
        op_payload->recv_initial_metadata.recv_initial_metadata_ready;
    s->recv_initial_metadata =
        op_payload->recv_initial_metadata.recv_initial_metadata;
    s->trailing_metadata_available =
        op_payload->recv_initial_metadata.trailing_metadata_available;
    if (op_payload->recv_initial_metadata.peer_string != nullptr) {
      gpr_atm_rel_store(op_payload->recv_initial_metadata.peer_string,
                        (gpr_atm)t->peer_string);
    }
    grpc_chttp2_maybe_complete_recv_initial_metadata(t, s);
  }

  if (op->recv_message) {
    GRPC_STATS_INC_HTTP2_OP_RECV_MESSAGE();
    size_t before = 0;
    GPR_ASSERT(s->recv_message_ready == nullptr);
    GPR_ASSERT(!s->pending_byte_stream);
    s->recv_message_ready = op_payload->recv_message.recv_message_ready;
    s->recv_message = op_payload->recv_message.recv_message;
    if (s->id != 0) {
      if (!s->read_closed) {
        before = s->frame_storage.length +
                 s->unprocessed_incoming_frames_buffer.length;
      }
    }
    grpc_chttp2_maybe_complete_recv_message(t, s);
    if (s->id != 0) {
      if (!s->read_closed && s->frame_storage.length == 0) {
        size_t after = s->frame_storage.length +
                       s->unprocessed_incoming_frames_buffer_cached_length;
        s->flow_control->IncomingByteStreamUpdate(GRPC_HEADER_SIZE_IN_BYTES,
                                                  before - after);
        grpc_chttp2_act_on_flowctl_action(s->flow_control->MakeAction(), t, s);
      }
    }
  }

  if (op->recv_trailing_metadata) {
    GRPC_STATS_INC_HTTP2_OP_RECV_TRAILING_METADATA();
    GPR_ASSERT(s->collecting_stats == nullptr);
    s->collecting_stats = op_payload->recv_trailing_metadata.collect_stats;
    GPR_ASSERT(s->recv_trailing_metadata_finished == nullptr);
    s->recv_trailing_metadata_finished =
        op_payload->recv_trailing_metadata.recv_trailing_metadata_ready;
    s->recv_trailing_metadata =
        op_payload->recv_trailing_metadata.recv_trailing_metadata;
    s->final_metadata_requested = true;
    grpc_chttp2_maybe_complete_recv_trailing_metadata(t, s);
  }

  if (on_complete != nullptr) {
    grpc_chttp2_complete_closure_step(t, s, &on_complete, GRPC_ERROR_NONE,
                                      "op->on_complete");
  }

  GRPC_CHTTP2_STREAM_UNREF(s, "perform_stream_op");
}