in cpp/source/client/TelemetryBidiReactor.cpp [86:169]
void TelemetryBidiReactor::OnReadDone(bool ok) {
SPDLOG_DEBUG("{}#OnReadDone", peer_address_);
if (!ok) {
// for read stream
RemoveHold();
// SPDLOG_WARN("Failed to read from telemetry stream from {}", peer_address_);
signalClose();
return;
}
{
absl::MutexLock lk(&state_mtx_);
if (StreamState::Ready != state_) {
return;
}
}
SPDLOG_DEBUG("Read a telemetry command from {}: {}", peer_address_, read_.ShortDebugString());
auto client = client_.lock();
if (!client) {
SPDLOG_INFO("Client for {} has destructed", peer_address_);
signalClose();
return;
}
switch (read_.command_case()) {
case rmq::TelemetryCommand::kSettings: {
auto settings = read_.settings();
SPDLOG_INFO("Receive settings from {}: {}", peer_address_, settings.ShortDebugString());
applySettings(settings);
sync_settings_promise_.set_value(true);
break;
}
case rmq::TelemetryCommand::kRecoverOrphanedTransactionCommand: {
SPDLOG_INFO("Receive orphan transaction command: {}", read_.ShortDebugString());
auto message = client->manager()->wrapMessage(
read_.recover_orphaned_transaction_command().message());
auto raw = const_cast<Message*>(message.get());
raw->mutableExtension().target_endpoint = peer_address_;
raw->mutableExtension().transaction_id = read_.recover_orphaned_transaction_command().transaction_id();
client->recoverOrphanedTransaction(message);
break;
}
case rmq::TelemetryCommand::kPrintThreadStackTraceCommand: {
TelemetryCommand response;
response.mutable_thread_stack_trace()->set_nonce(read_.print_thread_stack_trace_command().nonce());
response.mutable_thread_stack_trace()->set_thread_stack_trace("PrintStackTrace is not supported");
write(std::move(response));
break;
}
case rmq::TelemetryCommand::kVerifyMessageCommand: {
std::weak_ptr<TelemetryBidiReactor> ptr(shared_from_this());
auto cb = [ptr](TelemetryCommand command) {
auto reactor = ptr.lock();
if (!reactor) {
return;
}
reactor->write(std::move(command));
};
auto message = client->manager()->wrapMessage(read_.verify_message_command().message());
auto raw = const_cast<Message*>(message.get());
raw->mutableExtension().target_endpoint = peer_address_;
raw->mutableExtension().nonce = read_.verify_message_command().nonce();
client->verify(message, cb);
break;
}
default: {
SPDLOG_WARN("Telemetry command receive unsupported command");
break;
}
}
{
absl::MutexLock lk(&state_mtx_);
if (StreamState::Ready == state_) {
SPDLOG_DEBUG("Spawn new read op, state={}", static_cast<std::uint8_t>(state_));
StartRead(&read_);
}
}
}