cpp/source/stats/MetricBidiReactor.cpp (91 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "MetricBidiReactor.h"
#include <chrono>
#include "rocketmq/Logger.h"
#include "spdlog/spdlog.h"
#include "OpencensusExporter.h"
#include "Signature.h"
ROCKETMQ_NAMESPACE_BEGIN
MetricBidiReactor::MetricBidiReactor(std::weak_ptr<Client> client, std::weak_ptr<OpencensusExporter> exporter)
: client_(client), exporter_(exporter) {
auto ptr = client_.lock();
Metadata metadata;
Signature::sign(ptr->config(), metadata);
for (const auto& entry : metadata) {
context_.AddMetadata(entry.first, entry.second);
}
context_.set_deadline(std::chrono::system_clock::now() + std::chrono::hours(24));
auto exporter_ptr = exporter_.lock();
if (!exporter_ptr) {
SPDLOG_WARN("Exporter has already been destructed");
return;
}
exporter_ptr->stub()->async()->Export(&context_, this);
StartCall();
}
void MetricBidiReactor::OnReadDone(bool ok) {
if (!ok) {
SPDLOG_WARN("Failed to read response");
return;
}
SPDLOG_DEBUG("OnReadDone OK");
StartRead(&response_);
}
void MetricBidiReactor::OnWriteDone(bool ok) {
if (!ok) {
SPDLOG_WARN("Failed to report metrics");
return;
}
SPDLOG_DEBUG("OnWriteDone OK");
fireRead();
bool expected = true;
if (inflight_.compare_exchange_strong(expected, false, std::memory_order_relaxed)) {
fireWrite();
}
}
void MetricBidiReactor::OnDone(const grpc::Status& s) {
auto client = client_.lock();
if (!client) {
return;
}
if (s.ok()) {
SPDLOG_DEBUG("Bi-directional stream ended. status.code={}, status.message={}", s.error_code(), s.error_message());
} else {
SPDLOG_WARN("Bi-directional stream ended. status.code={}, status.message={}", s.error_code(), s.error_message());
auto exporter = exporter_.lock();
if (exporter) {
exporter->resetStream();
}
}
}
void MetricBidiReactor::write(ExportMetricsServiceRequest request) {
SPDLOG_DEBUG("Append ExportMetricsServiceRequest to buffer");
{
absl::MutexLock lk(&requests_mtx_);
requests_.emplace_back(std::move(request));
}
fireWrite();
}
void MetricBidiReactor::fireWrite() {
{
absl::MutexLock lk(&requests_mtx_);
if (requests_.empty()) {
SPDLOG_DEBUG("No more metric data to write");
return;
}
}
bool expected = false;
if (inflight_.compare_exchange_strong(expected, true, std::memory_order_relaxed)) {
absl::MutexLock lk(&requests_mtx_);
request_ = std::move(*requests_.begin());
requests_.erase(requests_.begin());
SPDLOG_DEBUG("MetricBidiReactor#StartWrite");
StartWrite(&request_);
}
}
void MetricBidiReactor::fireRead() {
bool expected = false;
if (read_.compare_exchange_strong(expected, true, std::memory_order_relaxed)) {
StartRead(&response_);
}
}
ROCKETMQ_NAMESPACE_END