cpp/source/client/TelemetryBidiReactor.cpp (284 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "TelemetryBidiReactor.h"
#include <atomic>
#include <cstdint>
#include <memory>
#include <utility>
#include "ClientManager.h"
#include "rocketmq/Logger.h"
#include "spdlog/spdlog.h"
#include "MessageExt.h"
#include "Metadata.h"
#include "RpcClient.h"
#include "Signature.h"
#include "google/protobuf/util/time_util.h"
ROCKETMQ_NAMESPACE_BEGIN
TelemetryBidiReactor::TelemetryBidiReactor(std::weak_ptr<Client> client,
rmq::MessagingService::Stub* stub,
std::string peer_address)
: client_(client), peer_address_(std::move(peer_address)), stream_state_(StreamState::Created) {
auto ptr = client.lock();
auto deadline = std::chrono::system_clock::now() + std::chrono::hours(1);
context_.set_deadline(deadline);
Metadata metadata;
Signature::sign(ptr->config(), metadata);
for (const auto& entry : metadata) {
context_.AddMetadata(entry.first, entry.second);
}
stub->async()->Telemetry(&context_, this);
StartCall();
}
TelemetryBidiReactor::~TelemetryBidiReactor() {
SPDLOG_INFO("Telemetry stream for {} destructed. StreamState={}", peer_address_,
static_cast<std::uint8_t>(stream_state_));
}
bool TelemetryBidiReactor::await() {
absl::MutexLock lk(&server_setting_received_mtx_);
if (server_setting_received_) {
return true;
}
server_setting_received_cv_.Wait(&server_setting_received_mtx_);
return server_setting_received_;
}
void TelemetryBidiReactor::OnWriteDone(bool ok) {
SPDLOG_DEBUG("OnWriteDone: {}", ok);
{
bool expect = true;
if (!command_inflight_.compare_exchange_strong(expect, false, std::memory_order_relaxed)) {
SPDLOG_WARN("Illegal command-inflight state");
}
}
if (!ok) {
SPDLOG_WARN("Failed to write telemetry command {} to {}", write_.DebugString(), peer_address_);
{
absl::MutexLock lk(&stream_state_mtx_);
stream_state_ = StreamState::WriteDone;
}
fireClose();
return;
}
{
absl::MutexLock lk(&stream_state_mtx_);
if (StreamState::Created == stream_state_) {
stream_state_ = StreamState::Active;
fireRead();
}
}
fireWrite();
}
void TelemetryBidiReactor::OnReadDone(bool ok) {
SPDLOG_DEBUG("OnReadDone: ok={}", ok);
if (!ok) {
if (client_.lock()) {
SPDLOG_WARN("Failed to read telemetry command from {}", peer_address_);
}
{
absl::MutexLock lk(&stream_state_mtx_);
stream_state_ = StreamState::ReadDone;
}
fireClose();
return;
}
SPDLOG_DEBUG("Read a telemetry command from {}: {}", peer_address_, read_.DebugString());
auto ptr = client_.lock();
if (!ptr) {
SPDLOG_INFO("Client for {} has destructed", peer_address_);
return;
}
switch (read_.command_case()) {
case rmq::TelemetryCommand::kSettings: {
auto settings = read_.settings();
SPDLOG_INFO("Received settings from {}: {}", peer_address_, settings.DebugString());
applySettings(settings);
{
absl::MutexLock lk(&server_setting_received_mtx_);
if (!server_setting_received_) {
server_setting_received_ = true;
server_setting_received_cv_.SignalAll();
}
}
break;
}
case rmq::TelemetryCommand::kRecoverOrphanedTransactionCommand: {
auto client = client_.lock();
if (!client) {
fireClose();
return;
}
SPDLOG_DEBUG("Receive orphan transaction command: {}", read_.DebugString());
auto message = client->manager()->wrapMessage(read_.release_verify_message_command()->message());
auto raw = const_cast<Message*>(message.get());
raw->mutableExtension().target_endpoint = peer_address_;
raw->mutableExtension().transaction_id = read_.recover_orphaned_transaction_command().transaction_id();
client->recoverOrphanedTransaction(message);
break;
}
case rmq::TelemetryCommand::kPrintThreadStackTraceCommand: {
TelemetryCommand response;
response.mutable_thread_stack_trace()->set_nonce(read_.print_thread_stack_trace_command().nonce());
response.mutable_thread_stack_trace()->set_thread_stack_trace("PrintStackTrace is not supported");
{
absl::MutexLock lk(&writes_mtx_);
writes_.push_back(response);
}
fireWrite();
break;
}
case rmq::TelemetryCommand::kVerifyMessageCommand: {
auto client = client_.lock();
if (!client) {
fireClose();
return;
}
std::weak_ptr<TelemetryBidiReactor> ptr(shared_from_this());
auto cb = [ptr](TelemetryCommand command) {
auto reactor = ptr.lock();
if (!reactor) {
return;
}
reactor->onVerifyMessageResult(std::move(command));
};
auto message = client->manager()->wrapMessage(read_.verify_message_command().message());
auto raw = const_cast<Message*>(message.get());
raw->mutableExtension().target_endpoint = peer_address_;
raw->mutableExtension().nonce = read_.verify_message_command().nonce();
client->verify(message, cb);
break;
}
default: {
SPDLOG_WARN("Unsupported command");
break;
}
}
fireRead();
}
void TelemetryBidiReactor::applySettings(const rmq::Settings& settings) {
auto ptr = client_.lock();
if (!ptr) {
SPDLOG_INFO("Client for {} has destructed", peer_address_);
return;
}
applyBackoffPolicy(settings, ptr);
// Sync metrics collector configuration
if (settings.has_metric()) {
const auto& metric = settings.metric();
ptr->config().metric.on = metric.on();
ptr->config().metric.endpoints.set_scheme(metric.endpoints().scheme());
ptr->config().metric.endpoints.mutable_addresses()->CopyFrom(metric.endpoints().addresses());
}
switch (settings.pub_sub_case()) {
case rmq::Settings::PubSubCase::kPublishing: {
applyPublishingConfig(settings, ptr);
break;
}
case rmq::Settings::PubSubCase::kSubscription: {
applySubscriptionConfig(settings, ptr);
break;
}
default: {
break;
}
}
}
void TelemetryBidiReactor::applyBackoffPolicy(const rmq::Settings& settings, std::shared_ptr<Client>& ptr) {
// Apply backoff policy on throttling
if (settings.has_backoff_policy()) {
const auto& backoff_policy = settings.backoff_policy();
ptr->config().backoff_policy.max_attempt = backoff_policy.max_attempts();
if (backoff_policy.has_customized_backoff()) {
for (const auto& item : backoff_policy.customized_backoff().next()) {
auto backoff = std::chrono::seconds(item.seconds()) + std::chrono::nanoseconds(item.nanos());
ptr->config().backoff_policy.next.push_back(absl::FromChrono(backoff));
}
}
if (backoff_policy.has_exponential_backoff()) {
const auto& exp_backoff = backoff_policy.exponential_backoff();
ptr->config().backoff_policy.initial = absl::FromChrono(std::chrono::seconds(exp_backoff.initial().seconds()) +
std::chrono::nanoseconds(exp_backoff.initial().nanos()));
ptr->config().backoff_policy.multiplier = exp_backoff.multiplier();
ptr->config().backoff_policy.max = absl::FromChrono(std::chrono::seconds(exp_backoff.max().seconds()) +
std::chrono::nanoseconds(exp_backoff.max().nanos()));
}
}
}
void TelemetryBidiReactor::applyPublishingConfig(const rmq::Settings& settings, std::shared_ptr<Client> client) {
client->config().publisher.max_body_size = settings.publishing().max_body_size();
}
void TelemetryBidiReactor::applySubscriptionConfig(const rmq::Settings& settings, std::shared_ptr<Client> client) {
client->config().subscriber.fifo = settings.subscription().fifo();
auto polling_timeout =
google::protobuf::util::TimeUtil::DurationToMilliseconds(settings.subscription().long_polling_timeout());
client->config().subscriber.polling_timeout = absl::Milliseconds(polling_timeout);
client->config().subscriber.receive_batch_size = settings.subscription().receive_batch_size();
}
void TelemetryBidiReactor::fireRead() {
SPDLOG_DEBUG("{}#fireRead", peer_address_);
StartRead(&read_);
}
void TelemetryBidiReactor::write(TelemetryCommand command) {
{
absl::MutexLock lk(&writes_mtx_);
writes_.push_back(command);
}
fireWrite();
}
void TelemetryBidiReactor::fireWrite() {
SPDLOG_DEBUG("{}#fireWrite", peer_address_);
{
absl::MutexLock lk(&writes_mtx_);
if (writes_.empty()) {
SPDLOG_DEBUG("No TelemtryCommand to write. Peer={}", peer_address_);
return;
}
bool expect = false;
if (command_inflight_.compare_exchange_strong(expect, true, std::memory_order_relaxed)) {
write_ = std::move(*writes_.begin());
writes_.erase(writes_.begin());
} else {
SPDLOG_DEBUG("Another command is already on the wire. Peer={}", peer_address_);
return;
}
}
SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_, write_.DebugString());
StartWrite(&write_);
}
void TelemetryBidiReactor::fireClose() {
SPDLOG_INFO("{}#fireClose", peer_address_);
if (StreamState::Active == stream_state_) {
StartWritesDone();
{
absl::MutexLock lk(&stream_state_mtx_);
if (StreamState::Active == stream_state_) {
stream_state_cv_.Wait(&stream_state_mtx_);
}
}
}
}
void TelemetryBidiReactor::OnWritesDoneDone(bool ok) {
SPDLOG_DEBUG("{}#OnWritesDoneDone", peer_address_);
}
void TelemetryBidiReactor::onVerifyMessageResult(TelemetryCommand command) {
{
absl::MutexLock lk(&writes_mtx_);
writes_.emplace_back(command);
}
fireWrite();
}
/// Notifies the application that all operations associated with this RPC
/// have completed and all Holds have been removed. OnDone provides the RPC
/// status outcome for both successful and failed RPCs and will be called in
/// all cases. If it is not called, it indicates an application-level problem
/// (like failure to remove a hold).
///
/// \param[in] status The status outcome of this RPC
void TelemetryBidiReactor::OnDone(const grpc::Status& status) {
SPDLOG_DEBUG("{}#OnDone, status.ok={}", peer_address_, status.ok());
if (!status.ok()) {
SPDLOG_WARN("{}#OnDone, status.error_code={}, status.error_message={}, status.error_details={}", peer_address_,
status.error_code(), status.error_message(), status.error_details());
}
{
SPDLOG_DEBUG("{} notifies awaiting close call", peer_address_);
absl::MutexLock lk(&stream_state_mtx_);
stream_state_ = StreamState::Closed;
stream_state_cv_.SignalAll();
}
auto client = client_.lock();
if (!client) {
return;
}
if (client->active()) {
client->createSession(peer_address_, true);
}
}
ROCKETMQ_NAMESPACE_END