in cpp/source/client/TelemetryBidiReactor.cpp [98:192]
void TelemetryBidiReactor::OnReadDone(bool ok) {
SPDLOG_DEBUG("OnReadDone: ok={}", ok);
if (!ok) {
if (client_.lock()) {
SPDLOG_WARN("Failed to read telemetry command from {}", peer_address_);
}
{
absl::MutexLock lk(&stream_state_mtx_);
stream_state_ = StreamState::ReadDone;
}
fireClose();
return;
}
SPDLOG_DEBUG("Read a telemetry command from {}: {}", peer_address_, read_.DebugString());
auto ptr = client_.lock();
if (!ptr) {
SPDLOG_INFO("Client for {} has destructed", peer_address_);
return;
}
switch (read_.command_case()) {
case rmq::TelemetryCommand::kSettings: {
auto settings = read_.settings();
SPDLOG_INFO("Received settings from {}: {}", peer_address_, settings.DebugString());
applySettings(settings);
{
absl::MutexLock lk(&server_setting_received_mtx_);
if (!server_setting_received_) {
server_setting_received_ = true;
server_setting_received_cv_.SignalAll();
}
}
break;
}
case rmq::TelemetryCommand::kRecoverOrphanedTransactionCommand: {
auto client = client_.lock();
if (!client) {
fireClose();
return;
}
SPDLOG_DEBUG("Receive orphan transaction command: {}", read_.DebugString());
auto message = client->manager()->wrapMessage(read_.release_verify_message_command()->message());
auto raw = const_cast<Message*>(message.get());
raw->mutableExtension().target_endpoint = peer_address_;
raw->mutableExtension().transaction_id = read_.recover_orphaned_transaction_command().transaction_id();
client->recoverOrphanedTransaction(message);
break;
}
case rmq::TelemetryCommand::kPrintThreadStackTraceCommand: {
TelemetryCommand response;
response.mutable_thread_stack_trace()->set_nonce(read_.print_thread_stack_trace_command().nonce());
response.mutable_thread_stack_trace()->set_thread_stack_trace("PrintStackTrace is not supported");
{
absl::MutexLock lk(&writes_mtx_);
writes_.push_back(response);
}
fireWrite();
break;
}
case rmq::TelemetryCommand::kVerifyMessageCommand: {
auto client = client_.lock();
if (!client) {
fireClose();
return;
}
std::weak_ptr<TelemetryBidiReactor> ptr(shared_from_this());
auto cb = [ptr](TelemetryCommand command) {
auto reactor = ptr.lock();
if (!reactor) {
return;
}
reactor->onVerifyMessageResult(std::move(command));
};
auto message = client->manager()->wrapMessage(read_.verify_message_command().message());
auto raw = const_cast<Message*>(message.get());
raw->mutableExtension().target_endpoint = peer_address_;
raw->mutableExtension().nonce = read_.verify_message_command().nonce();
client->verify(message, cb);
break;
}
default: {
SPDLOG_WARN("Unsupported command");
break;
}
}
fireRead();
}