void TelemetryBidiReactor::OnReadDone(bool ok)

in cpp/source/client/TelemetryBidiReactor.cpp [86:169]


/**
 * Read-completion callback for the telemetry stream.
 *
 * When the read failed (`ok == false`) the hold is released and the stream is
 * asked to close. Otherwise, provided the reactor is still in the Ready state
 * and the owning client is alive, the command stored in `read_` is dispatched
 * by its type and, if the state is still Ready afterwards, the next read is
 * started.
 *
 * @param ok false when the read did not complete successfully.
 */
void TelemetryBidiReactor::OnReadDone(bool ok) {
  SPDLOG_DEBUG("{}#OnReadDone", peer_address_);
  if (!ok) {
    // for read stream
    RemoveHold();
    // SPDLOG_WARN("Failed to read from telemetry stream from {}", peer_address_);
    signalClose();
    return;
  }

  {
    absl::MutexLock lk(&state_mtx_);
    if (StreamState::Ready != state_) {
      // NOTE(review): this path neither releases the hold nor starts another
      // read — confirm the close path accounts for that.
      return;
    }
  }

  SPDLOG_DEBUG("Read a telemetry command from {}: {}", peer_address_, read_.ShortDebugString());
  auto client = client_.lock();
  if (!client) {
    SPDLOG_INFO("Client for {} has destructed", peer_address_);
    signalClose();
    return;
  }

  switch (read_.command_case()) {
    case rmq::TelemetryCommand::kSettings: {
      // Bind by reference: the settings are only logged and forwarded, so a
      // deep copy of the message is unnecessary.
      const auto& settings = read_.settings();
      SPDLOG_INFO("Receive settings from {}: {}", peer_address_, settings.ShortDebugString());
      applySettings(settings);
      // Settings may arrive more than once on the same stream, but a promise
      // can be satisfied only once; a repeated set_value() throws
      // std::future_error, which must not escape this callback.
      try {
        sync_settings_promise_.set_value(true);
      } catch (const std::future_error& e) {
        SPDLOG_DEBUG("Settings promise for {} has already been fulfilled: {}", peer_address_, e.what());
      }
      break;
    }

    case rmq::TelemetryCommand::kRecoverOrphanedTransactionCommand: {
      SPDLOG_INFO("Receive orphan transaction command: {}", read_.ShortDebugString());
      const auto& recover_cmd = read_.recover_orphaned_transaction_command();
      auto message = client->manager()->wrapMessage(recover_cmd.message());
      // The wrapped message is exposed as const; the extension fields are
      // filled in here before it is handed over to the client.
      auto raw = const_cast<Message*>(message.get());
      raw->mutableExtension().target_endpoint = peer_address_;
      raw->mutableExtension().transaction_id = recover_cmd.transaction_id();
      client->recoverOrphanedTransaction(message);
      break;
    }

    case rmq::TelemetryCommand::kPrintThreadStackTraceCommand: {
      // Stack trace printing is not implemented; reply with the same nonce and
      // a fixed explanatory text.
      TelemetryCommand response;
      response.mutable_thread_stack_trace()->set_nonce(read_.print_thread_stack_trace_command().nonce());
      response.mutable_thread_stack_trace()->set_thread_stack_trace("PrintStackTrace is not supported");
      write(std::move(response));
      break;
    }

    case rmq::TelemetryCommand::kVerifyMessageCommand: {
      // The callback holds only a weak reference, so it becomes a no-op if the
      // reactor has been destroyed by the time it is invoked.
      std::weak_ptr<TelemetryBidiReactor> ptr(shared_from_this());
      auto cb = [ptr](TelemetryCommand command) {
        auto reactor = ptr.lock();
        if (!reactor) {
          return;
        }
        reactor->write(std::move(command));
      };
      const auto& verify_cmd = read_.verify_message_command();
      auto message = client->manager()->wrapMessage(verify_cmd.message());
      auto raw = const_cast<Message*>(message.get());
      raw->mutableExtension().target_endpoint = peer_address_;
      raw->mutableExtension().nonce = verify_cmd.nonce();
      client->verify(message, cb);
      break;
    }

    default: {
      SPDLOG_WARN("Telemetry command receive unsupported command");
      break;
    }
  }

  {
    // Re-check the state under the lock: it may have changed while the command
    // was being handled. Only a Ready stream issues the next read.
    absl::MutexLock lk(&state_mtx_);
    if (StreamState::Ready == state_) {
      SPDLOG_DEBUG("Spawn new read op, state={}", static_cast<std::uint8_t>(state_));
      StartRead(&read_);
    }
  }
}