cpp/source/client/TelemetryBidiReactor.cpp (284 lines of code) (raw):

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "TelemetryBidiReactor.h"

#include <atomic>
#include <chrono>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>

#include "ClientManager.h"
#include "rocketmq/Logger.h"
#include "spdlog/spdlog.h"
#include "MessageExt.h"
#include "Metadata.h"
#include "RpcClient.h"
#include "Signature.h"
#include "google/protobuf/util/time_util.h"

ROCKETMQ_NAMESPACE_BEGIN

/// Creates the reactor and immediately starts the Telemetry bidirectional RPC.
///
/// The call context gets a deadline of one hour from now and the metadata
/// produced by Signature::sign() for the owning client's configuration.
///
/// \param client       Weak reference to the owning client.
/// \param stub         Stub used to start the Telemetry call.
/// \param peer_address Address of the remote peer; used for logging, for
///                     tagging wrapped messages and for session re-creation.
TelemetryBidiReactor::TelemetryBidiReactor(std::weak_ptr<Client> client,
                                           rmq::MessagingService::Stub* stub,
                                           std::string peer_address)
    : client_(client), peer_address_(std::move(peer_address)), stream_state_(StreamState::Created) {
  auto ptr = client.lock();

  auto deadline = std::chrono::system_clock::now() + std::chrono::hours(1);
  context_.set_deadline(deadline);

  if (ptr) {
    Metadata metadata;
    Signature::sign(ptr->config(), metadata);
    for (const auto& entry : metadata) {
      context_.AddMetadata(entry.first, entry.second);
    }
  } else {
    // The owner is already gone, so there is no configuration to sign with.
    // The call is still started so that the reactor terminates through OnDone.
    SPDLOG_WARN("Client for {} has destructed before its telemetry stream was signed", peer_address_);
  }

  stub->async()->Telemetry(&context_, this);
  StartCall();
}

/// Logs the final stream state.
TelemetryBidiReactor::~TelemetryBidiReactor() {
  SPDLOG_INFO("Telemetry stream for {} destructed. StreamState={}", peer_address_,
              static_cast<std::uint8_t>(stream_state_));
}

/// Blocks the caller until a Settings command has been received from the peer.
///
/// \return true once settings have been received.
bool TelemetryBidiReactor::await() {
  absl::MutexLock lk(&server_setting_received_mtx_);
  // Re-check the flag after every wakeup: a condition variable may wake up
  // without having been signalled.
  while (!server_setting_received_) {
    server_setting_received_cv_.Wait(&server_setting_received_mtx_);
  }
  return server_setting_received_;
}

/// Reaction to the completion of a write started by fireWrite().
///
/// Clears the in-flight flag. On failure the stream is marked WriteDone and
/// fireClose() is invoked. On the first successful write the stream moves from
/// Created to Active and reading starts. Afterwards the next queued command,
/// if any, is written.
///
/// \param ok Whether the write succeeded.
void TelemetryBidiReactor::OnWriteDone(bool ok) {
  SPDLOG_DEBUG("OnWriteDone: {}", ok);
  {
    bool expect = true;
    if (!command_inflight_.compare_exchange_strong(expect, false, std::memory_order_relaxed)) {
      SPDLOG_WARN("Illegal command-inflight state");
    }
  }

  if (!ok) {
    SPDLOG_WARN("Failed to write telemetry command {} to {}", write_.DebugString(), peer_address_);
    {
      absl::MutexLock lk(&stream_state_mtx_);
      stream_state_ = StreamState::WriteDone;
    }
    fireClose();
    return;
  }

  {
    absl::MutexLock lk(&stream_state_mtx_);
    if (StreamState::Created == stream_state_) {
      stream_state_ = StreamState::Active;
      fireRead();
    }
  }

  fireWrite();
}

/// Reaction to the completion of a read started by fireRead().
///
/// On failure the stream is marked ReadDone and fireClose() is invoked.
/// Otherwise the received command is dispatched by type and another read is
/// started. If the owning client has already been destroyed, the command is
/// dropped and no further read is started.
///
/// \param ok Whether a command was read into read_.
void TelemetryBidiReactor::OnReadDone(bool ok) {
  SPDLOG_DEBUG("OnReadDone: ok={}", ok);
  if (!ok) {
    if (client_.lock()) {
      SPDLOG_WARN("Failed to read telemetry command from {}", peer_address_);
    }
    {
      absl::MutexLock lk(&stream_state_mtx_);
      stream_state_ = StreamState::ReadDone;
    }
    fireClose();
    return;
  }

  SPDLOG_DEBUG("Read a telemetry command from {}: {}", peer_address_, read_.DebugString());

  // This strong reference keeps the client alive for the whole dispatch below,
  // so the individual cases do not need to lock client_ again.
  auto client = client_.lock();
  if (!client) {
    SPDLOG_INFO("Client for {} has destructed", peer_address_);
    return;
  }

  switch (read_.command_case()) {
    case rmq::TelemetryCommand::kSettings: {
      const auto& settings = read_.settings();
      SPDLOG_INFO("Received settings from {}: {}", peer_address_, settings.DebugString());
      applySettings(settings);
      {
        absl::MutexLock lk(&server_setting_received_mtx_);
        if (!server_setting_received_) {
          server_setting_received_ = true;
          server_setting_received_cv_.SignalAll();
        }
      }
      break;
    }

    case rmq::TelemetryCommand::kRecoverOrphanedTransactionCommand: {
      SPDLOG_DEBUG("Receive orphan transaction command: {}", read_.DebugString());
      // Read the message from the command that is actually set. Releasing the
      // verify-message member here would hand back a null pointer.
      const auto& recover_command = read_.recover_orphaned_transaction_command();
      auto message = client->manager()->wrapMessage(recover_command.message());
      auto raw = const_cast<Message*>(message.get());
      raw->mutableExtension().target_endpoint = peer_address_;
      raw->mutableExtension().transaction_id = recover_command.transaction_id();
      client->recoverOrphanedTransaction(message);
      break;
    }

    case rmq::TelemetryCommand::kPrintThreadStackTraceCommand: {
      TelemetryCommand response;
      response.mutable_thread_stack_trace()->set_nonce(read_.print_thread_stack_trace_command().nonce());
      response.mutable_thread_stack_trace()->set_thread_stack_trace("PrintStackTrace is not supported");
      write(std::move(response));
      break;
    }

    case rmq::TelemetryCommand::kVerifyMessageCommand: {
      // The callback holds only a weak reference, so a pending verification
      // does not keep the reactor alive.
      std::weak_ptr<TelemetryBidiReactor> weak_reactor(shared_from_this());
      auto cb = [weak_reactor](TelemetryCommand command) {
        auto reactor = weak_reactor.lock();
        if (!reactor) {
          return;
        }
        reactor->onVerifyMessageResult(std::move(command));
      };

      const auto& verify_command = read_.verify_message_command();
      auto message = client->manager()->wrapMessage(verify_command.message());
      auto raw = const_cast<Message*>(message.get());
      raw->mutableExtension().target_endpoint = peer_address_;
      raw->mutableExtension().nonce = verify_command.nonce();
      client->verify(message, cb);
      break;
    }

    default: {
      SPDLOG_WARN("Unsupported command");
      break;
    }
  }

  fireRead();
}

/// Applies settings received from the peer to the owning client's config:
/// backoff policy, metric configuration, and the publishing or subscription
/// section, whichever is set. Does nothing if the client no longer exists.
///
/// \param settings Settings received from the peer.
void TelemetryBidiReactor::applySettings(const rmq::Settings& settings) {
  auto ptr = client_.lock();
  if (!ptr) {
    SPDLOG_INFO("Client for {} has destructed", peer_address_);
    return;
  }

  applyBackoffPolicy(settings, ptr);

  // Sync metrics collector configuration
  if (settings.has_metric()) {
    const auto& metric = settings.metric();
    ptr->config().metric.on = metric.on();
    ptr->config().metric.endpoints.set_scheme(metric.endpoints().scheme());
    ptr->config().metric.endpoints.mutable_addresses()->CopyFrom(metric.endpoints().addresses());
  }

  switch (settings.pub_sub_case()) {
    case rmq::Settings::PubSubCase::kPublishing: {
      applyPublishingConfig(settings, ptr);
      break;
    }
    case rmq::Settings::PubSubCase::kSubscription: {
      applySubscriptionConfig(settings, ptr);
      break;
    }
    default: {
      break;
    }
  }
}

/// Copies the backoff policy carried by `settings`, if any, into the client's
/// configuration.
///
/// \param settings Settings received from the peer.
/// \param ptr      Owning client; must not be null.
void TelemetryBidiReactor::applyBackoffPolicy(const rmq::Settings& settings, std::shared_ptr<Client>& ptr) {
  // Apply backoff policy on throttling
  if (!settings.has_backoff_policy()) {
    return;
  }

  const auto& backoff_policy = settings.backoff_policy();
  auto& target = ptr->config().backoff_policy;
  target.max_attempt = backoff_policy.max_attempts();

  if (backoff_policy.has_customized_backoff()) {
    // Replace the previous steps instead of appending to them; otherwise the
    // list grows every time a Settings command is applied.
    target.next.clear();
    for (const auto& item : backoff_policy.customized_backoff().next()) {
      auto backoff = std::chrono::seconds(item.seconds()) + std::chrono::nanoseconds(item.nanos());
      target.next.push_back(absl::FromChrono(backoff));
    }
  }

  if (backoff_policy.has_exponential_backoff()) {
    const auto& exp_backoff = backoff_policy.exponential_backoff();
    target.initial = absl::FromChrono(std::chrono::seconds(exp_backoff.initial().seconds()) +
                                      std::chrono::nanoseconds(exp_backoff.initial().nanos()));
    target.multiplier = exp_backoff.multiplier();
    target.max = absl::FromChrono(std::chrono::seconds(exp_backoff.max().seconds()) +
                                  std::chrono::nanoseconds(exp_backoff.max().nanos()));
  }
}

/// Copies the publishing section of `settings` into the client's configuration.
void TelemetryBidiReactor::applyPublishingConfig(const rmq::Settings& settings, std::shared_ptr<Client> client) {
  client->config().publisher.max_body_size = settings.publishing().max_body_size();
}

/// Copies the subscription section of `settings` (FIFO flag, long-polling
/// timeout, receive batch size) into the client's configuration.
void TelemetryBidiReactor::applySubscriptionConfig(const rmq::Settings& settings, std::shared_ptr<Client> client) {
  const auto& subscription = settings.subscription();
  client->config().subscriber.fifo = subscription.fifo();

  auto polling_timeout =
      google::protobuf::util::TimeUtil::DurationToMilliseconds(subscription.long_polling_timeout());
  client->config().subscriber.polling_timeout = absl::Milliseconds(polling_timeout);

  client->config().subscriber.receive_batch_size = subscription.receive_batch_size();
}

/// Starts reading the next command from the peer into read_.
void TelemetryBidiReactor::fireRead() {
  SPDLOG_DEBUG("{}#fireRead", peer_address_);
  StartRead(&read_);
}

/// Queues `command` and writes it as soon as no other write is in flight.
///
/// \param command Command to send to the peer.
void TelemetryBidiReactor::write(TelemetryCommand command) {
  {
    absl::MutexLock lk(&writes_mtx_);
    writes_.push_back(std::move(command));
  }
  fireWrite();
}

/// Starts writing the oldest queued command, unless the queue is empty or
/// another write is still in flight.
void TelemetryBidiReactor::fireWrite() {
  SPDLOG_DEBUG("{}#fireWrite", peer_address_);
  {
    absl::MutexLock lk(&writes_mtx_);
    if (writes_.empty()) {
      SPDLOG_DEBUG("No TelemtryCommand to write. Peer={}", peer_address_);
      return;
    }

    // Only the caller that flips the flag from false to true may touch write_.
    bool expect = false;
    if (command_inflight_.compare_exchange_strong(expect, true, std::memory_order_relaxed)) {
      write_ = std::move(*writes_.begin());
      writes_.erase(writes_.begin());
    } else {
      SPDLOG_DEBUG("Another command is already on the wire. Peer={}", peer_address_);
      return;
    }
  }
  SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_, write_.DebugString());
  StartWrite(&write_);
}

/// Half-closes an Active stream and blocks until the stream leaves the Active
/// state. The waiter is signalled from OnDone(). Does nothing when the stream
/// is not Active.
void TelemetryBidiReactor::fireClose() {
  SPDLOG_INFO("{}#fireClose", peer_address_);
  {
    // stream_state_ is written under this mutex by the reaction callbacks, so
    // it must be read under it as well.
    absl::MutexLock lk(&stream_state_mtx_);
    if (StreamState::Active != stream_state_) {
      return;
    }
  }

  StartWritesDone();

  absl::MutexLock lk(&stream_state_mtx_);
  // Loop so that a wakeup without a state change does not end the wait.
  while (StreamState::Active == stream_state_) {
    stream_state_cv_.Wait(&stream_state_mtx_);
  }
}

/// Reaction to the completion of StartWritesDone(); only logs.
void TelemetryBidiReactor::OnWritesDoneDone(bool ok) {
  SPDLOG_DEBUG("{}#OnWritesDoneDone", peer_address_);
}

/// Sends the result of a message verification back to the peer.
///
/// \param command Command carrying the verification result.
void TelemetryBidiReactor::onVerifyMessageResult(TelemetryCommand command) {
  write(std::move(command));
}

/// Notifies the application that all operations associated with this RPC
/// have completed and all Holds have been removed. OnDone provides the RPC
/// status outcome for both successful and failed RPCs and will be called in
/// all cases. If it is not called, it indicates an application-level problem
/// (like failure to remove a hold).
///
/// Marks the stream Closed, wakes any thread blocked in fireClose(), and asks
/// the client to create a new session for this peer if the client is still
/// alive and active.
///
/// \param[in] status The status outcome of this RPC
void TelemetryBidiReactor::OnDone(const grpc::Status& status) {
  SPDLOG_DEBUG("{}#OnDone, status.ok={}", peer_address_, status.ok());
  if (!status.ok()) {
    SPDLOG_WARN("{}#OnDone, status.error_code={}, status.error_message={}, status.error_details={}", peer_address_,
                status.error_code(), status.error_message(), status.error_details());
  }

  {
    SPDLOG_DEBUG("{} notifies awaiting close call", peer_address_);
    absl::MutexLock lk(&stream_state_mtx_);
    stream_state_ = StreamState::Closed;
    stream_state_cv_.SignalAll();
  }

  auto client = client_.lock();
  if (!client) {
    return;
  }

  if (client->active()) {
    client->createSession(peer_address_, true);
  }
}

ROCKETMQ_NAMESPACE_END