static enum e_op_result execute_stream_op()

in src/core/ext/transport/cronet/transport/cronet_transport.cc [1025:1401]


static enum e_op_result execute_stream_op(struct op_and_state* oas) {
  grpc_transport_stream_op_batch* stream_op = &oas->op;
  struct stream_obj* s = oas->s;
  grpc_cronet_transport* t = s->curr_ct;
  struct op_state* stream_state = &s->state;
  enum e_op_result result = NO_ACTION_POSSIBLE;
  if (stream_op->send_initial_metadata &&
      op_can_be_run(stream_op, s, &oas->state, OP_SEND_INITIAL_METADATA)) {
    CRONET_LOG(GPR_DEBUG, "running: %p OP_SEND_INITIAL_METADATA", oas);
    /* Start new cronet stream. It is destroyed in on_succeeded, on_canceled,
     * on_failed */
    GPR_ASSERT(s->cbs == nullptr);
    GPR_ASSERT(!stream_state->state_op_done[OP_SEND_INITIAL_METADATA]);
    s->cbs =
        bidirectional_stream_create(t->engine, s->curr_gs, &cronet_callbacks);
    CRONET_LOG(GPR_DEBUG, "%p = bidirectional_stream_create()", s->cbs);
    if (t->use_packet_coalescing) {
      bidirectional_stream_disable_auto_flush(s->cbs, true);
      bidirectional_stream_delay_request_headers_until_flush(s->cbs, true);
    }
    char* url = nullptr;
    const char* method = "POST";
    s->header_array.headers = nullptr;
    convert_metadata_to_cronet_headers(
        stream_op->payload->send_initial_metadata.send_initial_metadata,
        t->host, &url, &s->header_array.headers, &s->header_array.count,
        &method);
    s->header_array.capacity = s->header_array.count;
    CRONET_LOG(GPR_DEBUG, "bidirectional_stream_start(%p, %s)", s->cbs, url);
    bidirectional_stream_start(s->cbs, url, 0, method, &s->header_array, false);
    if (url) {
      gpr_free(url);
    }
    unsigned int header_index;
    for (header_index = 0; header_index < s->header_array.count;
         header_index++) {
      gpr_free((void*)s->header_array.headers[header_index].key);
      gpr_free((void*)s->header_array.headers[header_index].value);
    }
    stream_state->state_op_done[OP_SEND_INITIAL_METADATA] = true;
    if (t->use_packet_coalescing) {
      if (!stream_op->send_message && !stream_op->send_trailing_metadata) {
        s->state.flush_cronet_when_ready = true;
      }
    }
    result = ACTION_TAKEN_WITH_CALLBACK;
  } else if (stream_op->send_message &&
             op_can_be_run(stream_op, s, &oas->state, OP_SEND_MESSAGE)) {
    CRONET_LOG(GPR_DEBUG, "running: %p  OP_SEND_MESSAGE", oas);
    stream_state->pending_send_message = false;
    if (stream_state->state_callback_received[OP_FAILED]) {
      result = NO_ACTION_POSSIBLE;
      CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed.");
    } else {
      grpc_slice_buffer write_slice_buffer;
      grpc_slice slice;
      grpc_slice_buffer_init(&write_slice_buffer);
      if (1 != stream_op->payload->send_message.send_message->Next(
                   stream_op->payload->send_message.send_message->length(),
                   nullptr)) {
        /* Should never reach here */
        GPR_ASSERT(false);
      }
      if (GRPC_ERROR_NONE !=
          stream_op->payload->send_message.send_message->Pull(&slice)) {
        /* Should never reach here */
        GPR_ASSERT(false);
      }
      grpc_slice_buffer_add(&write_slice_buffer, slice);
      if (GPR_UNLIKELY(write_slice_buffer.count != 1)) {
        /* Empty request not handled yet */
        gpr_log(GPR_ERROR, "Empty request is not supported");
        GPR_ASSERT(write_slice_buffer.count == 1);
      }
      if (write_slice_buffer.count > 0) {
        size_t write_buffer_size;
        create_grpc_frame(
            &write_slice_buffer, &stream_state->ws.write_buffer,
            &write_buffer_size,
            stream_op->payload->send_message.send_message->flags());
        CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, %p)", s->cbs,
                   stream_state->ws.write_buffer);
        stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
        bidirectional_stream_write(s->cbs, stream_state->ws.write_buffer,
                                   static_cast<int>(write_buffer_size), false);
        grpc_slice_buffer_destroy_internal(&write_slice_buffer);
        if (t->use_packet_coalescing) {
          if (!stream_op->send_trailing_metadata) {
            CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
            bidirectional_stream_flush(s->cbs);
            result = ACTION_TAKEN_WITH_CALLBACK;
          } else {
            stream_state->pending_write_for_trailer = true;
            result = ACTION_TAKEN_NO_CALLBACK;
          }
        } else {
          result = ACTION_TAKEN_WITH_CALLBACK;
        }
      } else {
        result = NO_ACTION_POSSIBLE;
      }
    }
    stream_state->state_op_done[OP_SEND_MESSAGE] = true;
    oas->state.state_op_done[OP_SEND_MESSAGE] = true;
    stream_op->payload->send_message.send_message.reset();
  } else if (stream_op->send_trailing_metadata &&
             op_can_be_run(stream_op, s, &oas->state,
                           OP_SEND_TRAILING_METADATA)) {
    CRONET_LOG(GPR_DEBUG, "running: %p  OP_SEND_TRAILING_METADATA", oas);
    if (stream_state->state_callback_received[OP_FAILED]) {
      result = NO_ACTION_POSSIBLE;
      CRONET_LOG(GPR_DEBUG, "Stream is either cancelled or failed.");
    } else {
      CRONET_LOG(GPR_DEBUG, "bidirectional_stream_write (%p, 0)", s->cbs);
      stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
      bidirectional_stream_write(s->cbs, "", 0, true);
      if (t->use_packet_coalescing) {
        CRONET_LOG(GPR_DEBUG, "bidirectional_stream_flush (%p)", s->cbs);
        bidirectional_stream_flush(s->cbs);
      }
      result = ACTION_TAKEN_WITH_CALLBACK;
    }
    stream_state->state_op_done[OP_SEND_TRAILING_METADATA] = true;
  } else if (stream_op->recv_initial_metadata &&
             op_can_be_run(stream_op, s, &oas->state,
                           OP_RECV_INITIAL_METADATA)) {
    CRONET_LOG(GPR_DEBUG, "running: %p  OP_RECV_INITIAL_METADATA", oas);
    if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
      grpc_core::ExecCtx::Run(
          DEBUG_LOCATION,
          stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
          GRPC_ERROR_NONE);
    } else if (stream_state->state_callback_received[OP_FAILED]) {
      grpc_core::ExecCtx::Run(
          DEBUG_LOCATION,
          stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
          GRPC_ERROR_NONE);
    } else if (stream_state->state_op_done[OP_RECV_TRAILING_METADATA]) {
      grpc_core::ExecCtx::Run(
          DEBUG_LOCATION,
          stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
          GRPC_ERROR_NONE);
    } else {
      grpc_chttp2_incoming_metadata_buffer_publish(
          &oas->s->state.rs.initial_metadata,
          stream_op->payload->recv_initial_metadata.recv_initial_metadata);
      grpc_core::ExecCtx::Run(
          DEBUG_LOCATION,
          stream_op->payload->recv_initial_metadata.recv_initial_metadata_ready,
          GRPC_ERROR_NONE);
    }
    stream_state->state_op_done[OP_RECV_INITIAL_METADATA] = true;
    result = ACTION_TAKEN_NO_CALLBACK;
  } else if (stream_op->recv_message &&
             op_can_be_run(stream_op, s, &oas->state, OP_RECV_MESSAGE)) {
    CRONET_LOG(GPR_DEBUG, "running: %p  OP_RECV_MESSAGE", oas);
    if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
      CRONET_LOG(GPR_DEBUG, "Stream is cancelled.");
      grpc_core::ExecCtx::Run(
          DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
          GRPC_ERROR_NONE);
      stream_state->state_op_done[OP_RECV_MESSAGE] = true;
      oas->state.state_op_done[OP_RECV_MESSAGE] = true;
      result = ACTION_TAKEN_NO_CALLBACK;
    } else if (stream_state->state_callback_received[OP_FAILED]) {
      CRONET_LOG(GPR_DEBUG, "Stream failed.");
      grpc_core::ExecCtx::Run(
          DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
          GRPC_ERROR_NONE);
      stream_state->state_op_done[OP_RECV_MESSAGE] = true;
      oas->state.state_op_done[OP_RECV_MESSAGE] = true;
      result = ACTION_TAKEN_NO_CALLBACK;
    } else if (stream_state->rs.read_stream_closed == true) {
      /* No more data will be received */
      CRONET_LOG(GPR_DEBUG, "read stream closed");
      grpc_core::ExecCtx::Run(
          DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
          GRPC_ERROR_NONE);
      stream_state->state_op_done[OP_RECV_MESSAGE] = true;
      oas->state.state_op_done[OP_RECV_MESSAGE] = true;
      result = ACTION_TAKEN_NO_CALLBACK;
    } else if (stream_state->flush_read) {
      CRONET_LOG(GPR_DEBUG, "flush read");
      grpc_core::ExecCtx::Run(
          DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
          GRPC_ERROR_NONE);
      stream_state->state_op_done[OP_RECV_MESSAGE] = true;
      oas->state.state_op_done[OP_RECV_MESSAGE] = true;
      result = ACTION_TAKEN_NO_CALLBACK;
    } else if (stream_state->rs.length_field_received == false) {
      if (stream_state->rs.received_bytes == GRPC_HEADER_SIZE_IN_BYTES &&
          stream_state->rs.remaining_bytes == 0) {
        /* Start a read operation for data */
        stream_state->rs.length_field_received = true;
        parse_grpc_header(
            reinterpret_cast<const uint8_t*>(stream_state->rs.read_buffer),
            &stream_state->rs.length_field, &stream_state->rs.compressed);
        CRONET_LOG(GPR_DEBUG, "length field = %d",
                   stream_state->rs.length_field);
        if (stream_state->rs.length_field > 0) {
          stream_state->rs.read_buffer = static_cast<char*>(
              gpr_malloc(static_cast<size_t>(stream_state->rs.length_field)));
          GPR_ASSERT(stream_state->rs.read_buffer);
          stream_state->rs.remaining_bytes = stream_state->rs.length_field;
          stream_state->rs.received_bytes = 0;
          CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
          stream_state->state_op_done[OP_READ_REQ_MADE] =
              true; /* Indicates that at least one read request has been made */
          bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
                                    stream_state->rs.remaining_bytes);
          stream_state->pending_read_from_cronet = true;
          result = ACTION_TAKEN_WITH_CALLBACK;
        } else {
          stream_state->rs.remaining_bytes = 0;
          CRONET_LOG(GPR_DEBUG, "read operation complete. Empty response.");
          /* Clean up read_slice_buffer in case there is unread data. */
          grpc_slice_buffer_destroy_internal(
              &stream_state->rs.read_slice_buffer);
          grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
          uint32_t flags = 0;
          if (stream_state->rs.compressed) {
            flags |= GRPC_WRITE_INTERNAL_COMPRESS;
          }
          stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
          stream_op->payload->recv_message.recv_message->reset(
              stream_state->rs.sbs.get());
          grpc_core::ExecCtx::Run(
              DEBUG_LOCATION,
              stream_op->payload->recv_message.recv_message_ready,
              GRPC_ERROR_NONE);
          stream_state->state_op_done[OP_RECV_MESSAGE] = true;
          oas->state.state_op_done[OP_RECV_MESSAGE] = true;

          /* Extra read to trigger on_succeed */
          stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
          stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
          stream_state->rs.received_bytes = 0;
          stream_state->rs.compressed = false;
          stream_state->rs.length_field_received = false;
          CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
          stream_state->state_op_done[OP_READ_REQ_MADE] =
              true; /* Indicates that at least one read request has been made */
          bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
                                    stream_state->rs.remaining_bytes);
          stream_state->pending_read_from_cronet = true;
          result = ACTION_TAKEN_NO_CALLBACK;
        }
      } else if (stream_state->rs.remaining_bytes == 0) {
        /* Start a read operation for first 5 bytes (GRPC header) */
        stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
        stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
        stream_state->rs.received_bytes = 0;
        stream_state->rs.compressed = false;
        CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
        stream_state->state_op_done[OP_READ_REQ_MADE] =
            true; /* Indicates that at least one read request has been made */
        bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
                                  stream_state->rs.remaining_bytes);
        stream_state->pending_read_from_cronet = true;
        result = ACTION_TAKEN_WITH_CALLBACK;
      } else {
        result = NO_ACTION_POSSIBLE;
      }
    } else if (stream_state->rs.remaining_bytes == 0) {
      CRONET_LOG(GPR_DEBUG, "read operation complete");
      grpc_slice read_data_slice =
          GRPC_SLICE_MALLOC((uint32_t)stream_state->rs.length_field);
      uint8_t* dst_p = GRPC_SLICE_START_PTR(read_data_slice);
      memcpy(dst_p, stream_state->rs.read_buffer,
             static_cast<size_t>(stream_state->rs.length_field));
      null_and_maybe_free_read_buffer(s);
      /* Clean up read_slice_buffer in case there is unread data. */
      grpc_slice_buffer_destroy_internal(&stream_state->rs.read_slice_buffer);
      grpc_slice_buffer_init(&stream_state->rs.read_slice_buffer);
      grpc_slice_buffer_add(&stream_state->rs.read_slice_buffer,
                            read_data_slice);
      uint32_t flags = 0;
      if (stream_state->rs.compressed) {
        flags = GRPC_WRITE_INTERNAL_COMPRESS;
      }
      stream_state->rs.sbs.Init(&stream_state->rs.read_slice_buffer, flags);
      stream_op->payload->recv_message.recv_message->reset(
          stream_state->rs.sbs.get());
      grpc_core::ExecCtx::Run(
          DEBUG_LOCATION, stream_op->payload->recv_message.recv_message_ready,
          GRPC_ERROR_NONE);
      stream_state->state_op_done[OP_RECV_MESSAGE] = true;
      oas->state.state_op_done[OP_RECV_MESSAGE] = true;
      /* Do an extra read to trigger on_succeeded() callback in case connection
         is closed */
      stream_state->rs.read_buffer = stream_state->rs.grpc_header_bytes;
      stream_state->rs.compressed = false;
      stream_state->rs.received_bytes = 0;
      stream_state->rs.remaining_bytes = GRPC_HEADER_SIZE_IN_BYTES;
      stream_state->rs.length_field_received = false;
      CRONET_LOG(GPR_DEBUG, "bidirectional_stream_read(%p)", s->cbs);
      bidirectional_stream_read(s->cbs, stream_state->rs.read_buffer,
                                stream_state->rs.remaining_bytes);
      stream_state->pending_read_from_cronet = true;
      result = ACTION_TAKEN_NO_CALLBACK;
    }
  } else if (stream_op->recv_trailing_metadata &&
             op_can_be_run(stream_op, s, &oas->state,
                           OP_RECV_TRAILING_METADATA)) {
    CRONET_LOG(GPR_DEBUG, "running: %p  OP_RECV_TRAILING_METADATA", oas);
    grpc_error* error = GRPC_ERROR_NONE;
    if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
      error = GRPC_ERROR_REF(stream_state->cancel_error);
    } else if (stream_state->state_callback_received[OP_FAILED]) {
      error = make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable.");
    } else if (oas->s->state.rs.trailing_metadata_valid) {
      grpc_chttp2_incoming_metadata_buffer_publish(
          &oas->s->state.rs.trailing_metadata,
          stream_op->payload->recv_trailing_metadata.recv_trailing_metadata);
      stream_state->rs.trailing_metadata_valid = false;
    }
    grpc_core::ExecCtx::Run(
        DEBUG_LOCATION,
        stream_op->payload->recv_trailing_metadata.recv_trailing_metadata_ready,
        error);
    stream_state->state_op_done[OP_RECV_TRAILING_METADATA] = true;
    result = ACTION_TAKEN_NO_CALLBACK;
  } else if (stream_op->cancel_stream &&
             op_can_be_run(stream_op, s, &oas->state, OP_CANCEL_ERROR)) {
    CRONET_LOG(GPR_DEBUG, "running: %p  OP_CANCEL_ERROR", oas);
    if (s->cbs) {
      CRONET_LOG(GPR_DEBUG, "W: bidirectional_stream_cancel(%p)", s->cbs);
      bidirectional_stream_cancel(s->cbs);
      result = ACTION_TAKEN_WITH_CALLBACK;
    } else {
      result = ACTION_TAKEN_NO_CALLBACK;
    }
    stream_state->state_op_done[OP_CANCEL_ERROR] = true;
    if (!stream_state->cancel_error) {
      stream_state->cancel_error =
          GRPC_ERROR_REF(stream_op->payload->cancel_stream.cancel_error);
    }
  } else if (op_can_be_run(stream_op, s, &oas->state, OP_ON_COMPLETE)) {
    CRONET_LOG(GPR_DEBUG, "running: %p  OP_ON_COMPLETE", oas);
    if (stream_state->state_op_done[OP_CANCEL_ERROR]) {
      if (stream_op->on_complete) {
        grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_op->on_complete,
                                GRPC_ERROR_REF(stream_state->cancel_error));
      }
    } else if (stream_state->state_callback_received[OP_FAILED]) {
      if (stream_op->on_complete) {
        grpc_core::ExecCtx::Run(
            DEBUG_LOCATION, stream_op->on_complete,
            make_error_with_desc(GRPC_STATUS_UNAVAILABLE, "Unavailable."));
      }
    } else {
      /* All actions in this stream_op are complete. Call the on_complete
       * callback
       */
      if (stream_op->on_complete) {
        grpc_core::ExecCtx::Run(DEBUG_LOCATION, stream_op->on_complete,
                                GRPC_ERROR_NONE);
      }
    }
    oas->state.state_op_done[OP_ON_COMPLETE] = true;
    oas->done = true;
    /* reset any send message state, only if this ON_COMPLETE is about a send.
     */
    if (stream_op->send_message) {
      stream_state->state_callback_received[OP_SEND_MESSAGE] = false;
      stream_state->state_op_done[OP_SEND_MESSAGE] = false;
    }
    result = ACTION_TAKEN_NO_CALLBACK;
    /* If this is the on_complete callback being called for a received message -
      make a note */
    if (stream_op->recv_message)
      stream_state->state_op_done[OP_RECV_MESSAGE_AND_ON_COMPLETE] = true;
  } else {
    result = NO_ACTION_POSSIBLE;
  }
  return result;
}