cpp/source/client/TelemetryBidiReactor.cpp (294 lines of code) (raw):

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include "TelemetryBidiReactor.h"

#include <chrono>
#include <cstdint>
#include <memory>
#include <utility>

#include "ClientManager.h"
#include "MessageExt.h"
#include "Metadata.h"
#include "Signature.h"
#include "google/protobuf/util/time_util.h"
#include "rocketmq/Logger.h"
#include "spdlog/spdlog.h"

ROCKETMQ_NAMESPACE_BEGIN

/// Starts the bidirectional telemetry RPC against the given stub.
///
/// One hold is added for the read stream (released when a read fails) and one
/// transient hold is added per in-flight write in tryWriteNext().
TelemetryBidiReactor::TelemetryBidiReactor(std::weak_ptr<Client> client,
                                           rmq::MessagingService::Stub* stub,
                                           std::string peer_address)
    : client_(std::move(client)), peer_address_(std::move(peer_address)), state_(StreamState::Ready) {
  // The stream is long-lived; cap it at one hour so a wedged server cannot pin
  // the call forever.
  auto deadline = std::chrono::system_clock::now() + std::chrono::hours(1);
  context_.set_deadline(deadline);
  sync_settings_future_ = sync_settings_promise_.get_future();

  // Fix: guard against the client having been destructed before this reactor
  // is constructed; the original dereferenced the lock() result unchecked.
  if (auto ptr = client_.lock()) {
    Metadata metadata;
    Signature::sign(ptr->config(), metadata);
    for (const auto& entry : metadata) {
      context_.AddMetadata(entry.first, entry.second);
    }
  }

  stub->async()->Telemetry(&context_, this);
  StartRead(&read_);
  // for read stream
  AddHold();
  StartCall();
}

TelemetryBidiReactor::~TelemetryBidiReactor() {
  // Reading state_ without the mutex is acceptable here: no other thread may
  // touch this object once its destructor runs.
  SPDLOG_INFO("Telemetry stream for {} destructed. StreamState={}", peer_address_,
              static_cast<std::uint8_t>(state_));
}

/// Blocks the caller until the first Settings command from the server has been
/// applied. NOTE(review): std::future::get() may only be called once and never
/// times out — callers must invoke this at most once per reactor.
bool TelemetryBidiReactor::awaitApplyingSettings() {
  sync_settings_future_.get();
  return true;
}

void TelemetryBidiReactor::OnWriteDone(bool ok) {
  SPDLOG_DEBUG("{}#OnWriteDone", peer_address_);

  // for write stream
  RemoveHold();

  if (!ok) {
    // Fix: the original read writes_.front() here without holding writes_mtx_
    // and without checking emptiness — a data race and potential UB.
    {
      absl::MutexLock lk(&writes_mtx_);
      if (!writes_.empty()) {
        SPDLOG_WARN("Failed to write telemetry command {} to {}", writes_.front().ShortDebugString(),
                    peer_address_);
      }
    }
    signalClose();
    return;
  }

  // Remove the command that has been written to server.
  {
    absl::MutexLock lk(&writes_mtx_);
    if (!writes_.empty()) {
      writes_.pop_front();
    }
  }

  tryWriteNext();
}

void TelemetryBidiReactor::OnReadDone(bool ok) {
  SPDLOG_DEBUG("{}#OnReadDone", peer_address_);
  if (!ok) {
    // for read stream: release the hold taken in the constructor.
    RemoveHold();
    // SPDLOG_WARN("Failed to read from telemetry stream from {}", peer_address_);
    signalClose();
    return;
  }

  {
    absl::MutexLock lk(&state_mtx_);
    if (StreamState::Ready != state_) {
      return;
    }
  }

  SPDLOG_DEBUG("Read a telemetry command from {}: {}", peer_address_, read_.ShortDebugString());
  auto client = client_.lock();
  if (!client) {
    SPDLOG_INFO("Client for {} has destructed", peer_address_);
    signalClose();
    return;
  }

  switch (read_.command_case()) {
    case rmq::TelemetryCommand::kSettings: {
      // Avoid copying the Settings message; the original took it by value.
      const auto& settings = read_.settings();
      SPDLOG_INFO("Receive settings from {}: {}", peer_address_, settings.ShortDebugString());
      applySettings(settings);
      // NOTE(review): a promise may be fulfilled only once; if the server
      // re-syncs settings over this stream's lifetime, a second set_value
      // would throw std::future_error. Guarding requires a member flag
      // declared in the header — TODO confirm server sends settings once.
      sync_settings_promise_.set_value(true);
      break;
    }

    case rmq::TelemetryCommand::kRecoverOrphanedTransactionCommand: {
      SPDLOG_INFO("Receive orphan transaction command: {}", read_.ShortDebugString());
      auto message = client->manager()->wrapMessage(read_.recover_orphaned_transaction_command().message());
      auto raw = const_cast<Message*>(message.get());
      raw->mutableExtension().target_endpoint = peer_address_;
      raw->mutableExtension().transaction_id = read_.recover_orphaned_transaction_command().transaction_id();
      client->recoverOrphanedTransaction(message);
      break;
    }

    case rmq::TelemetryCommand::kPrintThreadStackTraceCommand: {
      TelemetryCommand response;
      response.mutable_thread_stack_trace()->set_nonce(read_.print_thread_stack_trace_command().nonce());
      response.mutable_thread_stack_trace()->set_thread_stack_trace("PrintStackTrace is not supported");
      write(std::move(response));
      break;
    }

    case rmq::TelemetryCommand::kVerifyMessageCommand: {
      // Capture a weak_ptr so the async verification callback does not extend
      // this reactor's lifetime (and cannot dangle if it is destroyed first).
      std::weak_ptr<TelemetryBidiReactor> ptr(shared_from_this());
      auto cb = [ptr](TelemetryCommand command) {
        auto reactor = ptr.lock();
        if (!reactor) {
          return;
        }
        reactor->write(std::move(command));
      };
      auto message = client->manager()->wrapMessage(read_.verify_message_command().message());
      auto raw = const_cast<Message*>(message.get());
      raw->mutableExtension().target_endpoint = peer_address_;
      raw->mutableExtension().nonce = read_.verify_message_command().nonce();
      client->verify(message, cb);
      break;
    }

    default: {
      SPDLOG_WARN("Telemetry command receive unsupported command");
      break;
    }
  }

  // Keep the read loop alive only while the stream is still usable.
  {
    absl::MutexLock lk(&state_mtx_);
    if (StreamState::Ready == state_) {
      SPDLOG_DEBUG("Spawn new read op, state={}", static_cast<std::uint8_t>(state_));
      StartRead(&read_);
    }
  }
}

/// Applies a server-pushed Settings command to the owning client's config.
void TelemetryBidiReactor::applySettings(const rmq::Settings& settings) {
  auto ptr = client_.lock();
  if (!ptr) {
    SPDLOG_INFO("Client for {} has destructed", peer_address_);
    return;
  }

  applyBackoffPolicy(settings, ptr);

  // Sync metrics collector configuration
  if (settings.has_metric()) {
    const auto& metric = settings.metric();
    ptr->config().metric.on = metric.on();
    ptr->config().metric.endpoints.set_scheme(metric.endpoints().scheme());
    ptr->config().metric.endpoints.mutable_addresses()->CopyFrom(metric.endpoints().addresses());
  }

  switch (settings.pub_sub_case()) {
    case rmq::Settings::PubSubCase::kPublishing: {
      applyPublishingConfig(settings, ptr);
      break;
    }
    case rmq::Settings::PubSubCase::kSubscription: {
      applySubscriptionConfig(settings, ptr);
      break;
    }
    default: {
      break;
    }
  }
}

// Apply backoff policy on throttling
void TelemetryBidiReactor::applyBackoffPolicy(const rmq::Settings& settings, std::shared_ptr<Client>& ptr) {
  if (settings.has_backoff_policy()) {
    const auto& backoff_policy = settings.backoff_policy();
    ptr->config().backoff_policy.max_attempt = backoff_policy.max_attempts();

    if (backoff_policy.has_customized_backoff()) {
      // Fix: rebuild the schedule instead of appending — settings may be
      // re-synced over the stream's lifetime and the original grew the
      // vector on every sync.
      ptr->config().backoff_policy.next.clear();
      for (const auto& item : backoff_policy.customized_backoff().next()) {
        auto backoff = std::chrono::seconds(item.seconds()) + std::chrono::nanoseconds(item.nanos());
        ptr->config().backoff_policy.next.push_back(absl::FromChrono(backoff));
      }
    }

    if (backoff_policy.has_exponential_backoff()) {
      const auto& exp_backoff = backoff_policy.exponential_backoff();
      ptr->config().backoff_policy.initial = absl::FromChrono(
          std::chrono::seconds(exp_backoff.initial().seconds()) +
          std::chrono::nanoseconds(exp_backoff.initial().nanos()));
      ptr->config().backoff_policy.multiplier = exp_backoff.multiplier();
      ptr->config().backoff_policy.max = absl::FromChrono(std::chrono::seconds(exp_backoff.max().seconds()) +
                                                          std::chrono::nanoseconds(exp_backoff.max().nanos()));
    }
  }
}

void TelemetryBidiReactor::applyPublishingConfig(const rmq::Settings& settings, std::shared_ptr<Client> client) {
  // The server may have implicitly assumed a namespace for the client.
  if (!settings.publishing().topics().empty()) {
    for (const auto& topic : settings.publishing().topics()) {
      if (topic.resource_namespace() != client->config().resource_namespace) {
        SPDLOG_INFO("Client namespace is changed from [{}] to [{}]", client->config().resource_namespace,
                    topic.resource_namespace());
        client->config().resource_namespace = topic.resource_namespace();
        break;
      }
    }
  }
  client->config().publisher.max_body_size = settings.publishing().max_body_size();
}

void TelemetryBidiReactor::applySubscriptionConfig(const rmq::Settings& settings, std::shared_ptr<Client> client) {
  // The server may have implicitly assumed a namespace for the client.
  if (!settings.subscription().subscriptions().empty()) {
    for (const auto& subscription : settings.subscription().subscriptions()) {
      if (subscription.topic().resource_namespace() != client->config().resource_namespace) {
        SPDLOG_INFO("Client namespace is changed from [{}] to [{}]", client->config().resource_namespace,
                    subscription.topic().resource_namespace());
        client->config().resource_namespace = subscription.topic().resource_namespace();
        break;
      }
    }
  }
  client->config().subscriber.fifo = settings.subscription().fifo();
  auto polling_timeout =
      google::protobuf::util::TimeUtil::DurationToMilliseconds(settings.subscription().long_polling_timeout());
  client->config().subscriber.polling_timeout = absl::Milliseconds(polling_timeout);
  client->config().subscriber.receive_batch_size = settings.subscription().receive_batch_size();
}

/// Enqueues a telemetry command for transmission; silently dropped if the
/// stream is no longer Ready.
void TelemetryBidiReactor::write(TelemetryCommand command) {
  SPDLOG_DEBUG("{}#write", peer_address_);
  {
    absl::MutexLock lk(&state_mtx_);
    // Reject incoming write commands if the stream state is closing or has witnessed some error.
    if (StreamState::Ready != state_) {
      return;
    }
  }

  {
    absl::MutexLock lk(&writes_mtx_);
    // Fix: move instead of copying the protobuf message into the queue.
    writes_.push_back(std::move(command));
  }
  tryWriteNext();
}

void TelemetryBidiReactor::tryWriteNext() {
  SPDLOG_DEBUG("{}#tryWriteNext", peer_address_);

  // Fix: state_ is guarded by state_mtx_ elsewhere in this file; the original
  // read it under writes_mtx_, which synchronizes nothing.
  {
    absl::MutexLock lk(&state_mtx_);
    if (StreamState::Ready != state_) {
      SPDLOG_WARN("Further write to {} is not allowed due to stream-state={}", peer_address_,
                  static_cast<std::uint8_t>(state_));
      return;
    }
  }

  absl::MutexLock lk(&writes_mtx_);
  if (writes_.empty()) {
    SPDLOG_DEBUG("No TelemetryCommand to write. Peer={}", peer_address_);
    return;
  }

  // NOTE(review): gRPC allows at most one outstanding StartWrite per stream.
  // If write() races with an in-flight write, two StartWrite calls could
  // overlap; preventing that needs an in-flight flag in the header — verify.
  SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_, writes_.front().ShortDebugString());
  AddHold();
  StartWrite(&(writes_.front()));
}

/// Marks the stream as Closing so no further reads/writes are issued.
void TelemetryBidiReactor::signalClose() {
  absl::MutexLock lk(&state_mtx_);
  state_ = StreamState::Closing;
}

/// Cancels the RPC and blocks until OnDone has marked the stream Closed.
void TelemetryBidiReactor::close() {
  SPDLOG_DEBUG("{}#fireClose", peer_address_);
  {
    absl::MutexLock lk(&state_mtx_);
    if (state_ == StreamState::Ready) {
      state_ = StreamState::Closing;
    }
  }

  {
    absl::MutexLock lk(&writes_mtx_);
    // NOTE(review): if a StartWrite is in flight, gRPC still holds a pointer
    // into this deque; clearing here assumes TryCancel() below flushes the
    // write before the buffer is reused — confirm against reactor lifecycle.
    writes_.clear();
  }
  context_.TryCancel();

  // Fix: the original evaluated `Closed != state_` outside the mutex and
  // re-locked on every loop iteration. Hold the lock across the wait;
  // absl::CondVar releases it while blocked.
  absl::MutexLock lk(&state_mtx_);
  while (StreamState::Closed != state_) {
    if (state_cv_.WaitWithTimeout(&state_mtx_, absl::Seconds(1))) {
      SPDLOG_WARN("StreamState CondVar timed out before getting signalled: state={}",
                  static_cast<uint8_t>(state_));
    }
  }
}

/// Notifies the application that all operations associated with this RPC
/// have completed and all Holds have been removed. OnDone provides the RPC
/// status outcome for both successful and failed RPCs and will be called in
/// all cases. If it is not called, it indicates an application-level problem
/// (like failure to remove a hold).
///
/// \param[in] status The status outcome of this RPC
void TelemetryBidiReactor::OnDone(const grpc::Status& status) {
  SPDLOG_DEBUG("{}#OnDone, status.ok={}", peer_address_, status.ok());
  if (!status.ok()) {
    SPDLOG_DEBUG("{}#OnDone, status.error_code={}, status.error_message={}, status.error_details={}", peer_address_,
                 status.error_code(), status.error_message(), status.error_details());
  }

  {
    absl::MutexLock lk(&state_mtx_);
    state_ = StreamState::Closed;
    // Unblock any thread parked in close().
    state_cv_.SignalAll();
  }

  auto client = client_.lock();
  if (!client) {
    return;
  }

  // Re-establish the telemetry session if the client is still active.
  if (client->active()) {
    client->createSession(peer_address_, true);
  }
}

void TelemetryBidiReactor::OnReadInitialMetadataDone(bool ok) {
  SPDLOG_DEBUG("{}#OnReadInitialMetadataDone", peer_address_);
  if (!ok) {
    // for read stream
    // Remove the hold corresponding to AddHold in TelemetryBidiReactor::TelemetryBidiReactor.
    RemoveHold();
    SPDLOG_DEBUG("Change state {} --> {}", static_cast<std::uint8_t>(state_),
                 static_cast<std::uint8_t>(StreamState::Closing));
    SPDLOG_WARN("Read of initial-metadata failed from {}", peer_address_);
    signalClose();
    return;
  }
  SPDLOG_DEBUG("Received initial metadata from {}", peer_address_);
}

ROCKETMQ_NAMESPACE_END