void TelemetryBidiReactor::OnReadDone()

in cpp/source/client/TelemetryBidiReactor.cpp [98:192]


void TelemetryBidiReactor::OnReadDone(bool ok) {
  SPDLOG_DEBUG("OnReadDone: ok={}", ok);
  if (!ok) {
    if (client_.lock()) {
      SPDLOG_WARN("Failed to read telemetry command from {}", peer_address_);
    }

    {
      absl::MutexLock lk(&stream_state_mtx_);
      stream_state_ = StreamState::ReadDone;
    }
    fireClose();
    return;
  }

  SPDLOG_DEBUG("Read a telemetry command from {}: {}", peer_address_, read_.DebugString());
  auto ptr = client_.lock();
  if (!ptr) {
    SPDLOG_INFO("Client for {} has destructed", peer_address_);
    return;
  }

  switch (read_.command_case()) {
    case rmq::TelemetryCommand::kSettings: {
      auto settings = read_.settings();
      SPDLOG_INFO("Received settings from {}: {}", peer_address_, settings.DebugString());
      applySettings(settings);
      {
        absl::MutexLock lk(&server_setting_received_mtx_);
        if (!server_setting_received_) {
          server_setting_received_ = true;
          server_setting_received_cv_.SignalAll();
        }
      }
      break;
    }
    case rmq::TelemetryCommand::kRecoverOrphanedTransactionCommand: {
      auto client = client_.lock();
      if (!client) {
        fireClose();
        return;
      }
      SPDLOG_DEBUG("Receive orphan transaction command: {}", read_.DebugString());
      auto message = client->manager()->wrapMessage(read_.release_verify_message_command()->message());
      auto raw = const_cast<Message*>(message.get());
      raw->mutableExtension().target_endpoint = peer_address_;
      raw->mutableExtension().transaction_id = read_.recover_orphaned_transaction_command().transaction_id();
      client->recoverOrphanedTransaction(message);

      break;
    }

    case rmq::TelemetryCommand::kPrintThreadStackTraceCommand: {
      TelemetryCommand response;
      response.mutable_thread_stack_trace()->set_nonce(read_.print_thread_stack_trace_command().nonce());
      response.mutable_thread_stack_trace()->set_thread_stack_trace("PrintStackTrace is not supported");
      {
        absl::MutexLock lk(&writes_mtx_);
        writes_.push_back(response);
      }
      fireWrite();
      break;
    }

    case rmq::TelemetryCommand::kVerifyMessageCommand: {
      auto client = client_.lock();
      if (!client) {
        fireClose();
        return;
      }

      std::weak_ptr<TelemetryBidiReactor> ptr(shared_from_this());
      auto cb = [ptr](TelemetryCommand command) {
        auto reactor = ptr.lock();
        if (!reactor) {
          return;
        }
        reactor->onVerifyMessageResult(std::move(command));
      };
      auto message = client->manager()->wrapMessage(read_.verify_message_command().message());
      auto raw = const_cast<Message*>(message.get());
      raw->mutableExtension().target_endpoint = peer_address_;
      raw->mutableExtension().nonce = read_.verify_message_command().nonce();
      client->verify(message, cb);
      break;
    }

    default: {
      SPDLOG_WARN("Unsupported command");
      break;
    }
  }

  fireRead();
}