be/src/statestore/statestore.cc (1,899 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "statestore/statestore.h"
#include <algorithm>
#include <tuple>
#include <utility>
#include <boost/algorithm/string/predicate.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/lexical_cast.hpp>
#include <thrift/Thrift.h>
#include <thrift/protocol/TProtocolException.h>
#include <gutil/strings/substitute.h>
#include <gutil/strings/util.h>
#include "common/status.h"
#include "gen-cpp/StatestoreService_types.h"
#include "rpc/rpc-trace.h"
#include "rpc/thrift-util.h"
#include "statestore/failure-detector.h"
#include "statestore/statestore-subscriber-client-wrapper.h"
#include "util/collection-metrics.h"
#include "util/container-util.h"
#include "util/debug-util.h"
#include "util/logging-support.h"
#include "util/metrics.h"
#include "util/openssl-util.h"
#include "util/pretty-printer.h"
#include "util/test-info.h"
#include "util/time.h"
#include "util/uid-util.h"
#include "util/webserver.h"
#include "common/names.h"
using boost::algorithm::ends_with;
using boost::posix_time::seconds;
using boost::shared_lock;
using boost::shared_mutex;
using boost::upgrade_lock;
using boost::upgrade_to_unique_lock;
using std::forward_as_tuple;
using std::piecewise_construct;
using namespace apache::thrift;
using namespace impala;
using namespace rapidjson;
using namespace strings;
DEFINE_int32(statestore_max_missed_heartbeats, 10, "Maximum number of consecutive "
"heartbeat messages an impalad can miss before being declared failed by the "
"statestore.");
DEFINE_int32(statestore_num_update_threads, 10, "(Advanced) Number of threads used to "
" send topic updates in parallel to all registered subscribers.");
DEFINE_int32(statestore_update_frequency_ms, 2000, "(Advanced) Frequency (in ms) with"
" which the statestore sends topic updates to subscribers.");
// Priority updates are sent out much more frequently. They are assumed to be small
// amounts of data that take a small amount of time to process. Assuming each update
// takes < 1ms to process, sending out an update every 100ms will consume less than
// 1% of a CPU on each subscriber.
DEFINE_int32(statestore_num_priority_update_threads, 10, "(Advanced) Number of threads "
"used to send prioritized topic updates in parallel to all registered subscribers.");
DEFINE_int32(statestore_priority_update_frequency_ms, 100, "(Advanced) Frequency (in ms) "
"with which the statestore sends prioritized topic updates to subscribers.");
DEFINE_int32(statestore_num_heartbeat_threads, 10, "(Advanced) Number of threads used to "
" send heartbeats in parallel to all registered subscribers.");
DEFINE_int32(statestore_heartbeat_frequency_ms, 1000, "(Advanced) Frequency (in ms) with"
" which the statestore sends heartbeat heartbeats to subscribers.");
DEFINE_double_hidden(heartbeat_monitoring_frequency_ms, 60000, "(Advanced) Frequency (in "
"ms) with which the statestore monitors heartbeats from a subscriber.");
DEFINE_int32(statestore_heartbeat_tcp_timeout_seconds, 3, "(Advanced) The time after "
"which a heartbeat RPC to a subscriber will timeout. This setting protects against "
"badly hung machines that are not able to respond to the heartbeat RPC in short "
"order");
DEFINE_int32(statestore_max_subscribers, 10000, "Used to control the maximum size "
"of the pending topic-update queue. There is at most one entry per subscriber.");
// If this value is set too low, it's possible that UpdateState() might timeout during a
// working invocation, and only a restart of the statestore with a change in value would
// allow progress to be made. If set too high, a hung subscriber will waste an update
// thread for much longer than it needs to. We choose 5 minutes as a safe default because
// large catalogs can take a very long time to process, but rarely more than a minute. The
// loss of a single thread for five minutes should usually be invisible to the user; if
// there is a correlated set of machine hangs that exhausts most threads the cluster can
// already be said to be in a bad state. Note that the heartbeat mechanism will still
// evict those subscribers, so many queries will continue to operate.
DEFINE_int32(statestore_update_tcp_timeout_seconds, 300, "(Advanced) The time after "
"which an update RPC to a subscriber will timeout. This setting protects against "
"badly hung machines that are not able to respond to the update RPC in short "
"order.");
DEFINE_int32(statestore_update_catalogd_tcp_timeout_seconds, 3, "(Advanced) The "
"time after which a UpdateCatalogd RPC to a subscriber will timeout. This setting "
"protects against badly hung machines that are not able to respond to the "
"UpdateCatalogd RPC in short order");
// Flags for Statestore HA
// Host and port of peer's StatestoreHaService for Statestore HA.
DEFINE_string(state_store_peer_host, "localhost",
"hostname where peer's StatestoreHaService is running");
DEFINE_int32(state_store_peer_ha_port, 24021,
"port where peer's StatestoreHaService is running");
DEFINE_bool(enable_statestored_ha, false, "Set to true to enable Statestore HA");
DEFINE_bool(statestore_force_active, false, "Set to true to force this statestored "
"instance to take active role. It's used to perform manual fail over for statestore "
"service.");
// Use network address as priority value of statestored instance when designating active
// statestored. The lower network address corresponds to a higher priority.
// This is mainly used in unit-test for predictable results.
DEFINE_bool(use_network_address_as_statestore_priority, false, "Network address is "
"used as priority value of statestored instance if this is set as true. Otherwise, "
"statestored_id which is generated as random number will be used as priority value "
"of statestored instance.");
// Waiting period in ms for HA preemption. It should be set with proper value based on the
// time to take for bringing a statestored instance in-line in the deployment environment.
DEFINE_int64(statestore_ha_preemption_wait_period_ms, 10000, "(Advanced) The time after "
"which statestored designates itself as active role if the statestore does not "
"receive HA handshake request/response from peer statestored.");
DEFINE_double_hidden(statestore_ha_heartbeat_monitoring_frequency_ms, 1000, "(Advanced) "
"Frequency (in ms) with which the statestore monitors HA heartbeats from active "
"statestore.");
DEFINE_int32(statestore_update_statestore_tcp_timeout_seconds, 3, "(Advanced) The "
"time after which a UpdateStatestoredRole RPC to a subscriber will timeout. This "
"setting protects against badly hung machines that are not able to respond to the "
"UpdateStatestoredRole RPC in short order.");
DEFINE_int32(statestore_peer_timeout_seconds, 30, "The amount of time (in seconds) that "
"may elapse before the connection with the peer statestore is considered lost.");
DEFINE_int32(statestore_peer_cnxn_attempts, 10, "The number of times to retry an "
"RPC connection to the peer statestore. A setting of 0 means retry indefinitely");
DEFINE_int32(statestore_peer_cnxn_retry_interval_ms, 1000, "The interval, in ms, "
"to wait between attempts to make an RPC connection to the peer statestore. "
"It's set as statestore_ha_preemption_wait_period_ms/statestore_peer_cnxn_attempts "
"if statestore_peer_cnxn_attempts > 0, default value if "
"statestore_peer_cnxn_attempts == 0.");
DEFINE_int32(statestore_ha_client_rpc_timeout_ms, 300000, "(Advanced) The underlying "
"TSocket send/recv timeout in milliseconds for a client RPC of Statestore HA "
"service.");
DEFINE_int32(statestore_ha_client_rpc_conn_timeout_ms, 0, "(Advanced) The underlying "
"TSocket conn timeout in milliseconds for a client RPC of Statestore HA service.");
DEFINE_int64(update_statestore_rpc_resend_interval_ms, 100, "(Advanced) Interval "
"(in ms) with which the statestore resends the RPCs of updating statestored's role "
"to subscribers if the statestore has failed to send the RPCs to the subscribers.");
DECLARE_string(hostname);
DECLARE_bool(enable_catalogd_ha);
DECLARE_int64(active_catalogd_designation_monitoring_interval_ms);
DECLARE_int64(update_catalogd_rpc_resend_interval_ms);
DECLARE_string(debug_actions);
DECLARE_string(ssl_server_certificate);
DECLARE_string(ssl_private_key);
DECLARE_string(ssl_private_key_password_cmd);
DECLARE_string(ssl_cipher_list);
DECLARE_string(ssl_minimum_version);
#ifndef NDEBUG
DECLARE_int32(stress_statestore_startup_delay_ms);
#endif
// Used to identify the statestore in the failure detector
const string STATESTORE_ID = "STATESTORE";
// Metric keys
// TODO: Replace 'backend' with 'subscriber' when we can coordinate a change with CM
const string STATESTORE_LIVE_SUBSCRIBERS = "statestore.live-backends";
const string STATESTORE_LIVE_SUBSCRIBERS_LIST = "statestore.live-backends.list";
const string STATESTORE_SUBSCRIBERS_RECEIVED_HEARTBEAT =
"statestore.subscribers-received-heartbeat";
const string STATESTORE_TOTAL_KEY_SIZE_BYTES = "statestore.total-key-size-bytes";
const string STATESTORE_TOTAL_VALUE_SIZE_BYTES = "statestore.total-value-size-bytes";
const string STATESTORE_TOTAL_TOPIC_SIZE_BYTES = "statestore.total-topic-size-bytes";
const string STATESTORE_UPDATE_DURATION = "statestore.topic-update-durations";
const string STATESTORE_PRIORITY_UPDATE_DURATION =
"statestore.priority-topic-update-durations";
const string STATESTORE_HEARTBEAT_DURATION = "statestore.heartbeat-durations";
const string STATESTORE_SUCCESSFUL_UPDATE_CATALOGD_RPC_NUM =
"statestore.num-successful-update-catalogd-rpc";
const string STATESTORE_FAILED_UPDATE_CATALOGD_RPC_NUM =
"statestore.num-failed-update-catalogd-rpc";
const string STATESTORE_SUCCESSFUL_UPDATE_STATESTORED_ROLE_RPC_NUM =
"statestore.num-successful-update-statestored-role-rpc";
const string STATESTORE_FAILED_UPDATE_STATESTORED_ROLE_RPC_NUM =
"statestore.num-failed-update-statestored-role-rpc";
const string STATESTORE_CLEAR_TOPIC_ENTRIES_NUM =
"statestore.num-clear-topic-entries-requests";
const string STATESTORE_ACTIVE_CATALOGD_ADDRESS = "statestore.active-catalogd-address";
const string STATESTORE_ACTIVE_STATUS = "statestore.active-status";
const string STATESTORE_SERVICE_STARTED = "statestore.service-started";
const string STATESTORE_IN_HA_RECOVERY = "statestore.in-ha-recovery-mode";
const string STATESTORE_CONNECTED_PEER = "statestore.connected-with-peer-statestored";
// Initial version for each Topic registered by a Subscriber. Generally, the Topic will
// have a Version that is the MAX() of all entries in the Topic, but this initial
// value needs to be less than TopicEntry::TOPIC_ENTRY_INITIAL_VERSION to distinguish
// between the case where a Topic is empty and the case where the Topic only contains
// an entry with the initial version.
const Statestore::TopicEntry::Version Statestore::Subscriber::TOPIC_INITIAL_VERSION = 0;
// If statestore instance in active state receives more than 10 heartbeats from its peer,
// enter recovery mode to re-negotiate role with its peer.
// Heartbeat period is set by FALGS_statestore_ha_heartbeat_monitoring_frequency_ms, its
// default value is 1000 ms. That means statestore instance in active state will enter
// recovery mode in 10 seconds if it repeatedly receives heartbeats from its peer.
#define MAX_NUM_RECEIVED_HEARTBEAT_IN_ACTIVE 10
// Updates or heartbeats that miss their deadline by this much are logged.
const uint32_t DEADLINE_MISS_THRESHOLD_MS = 2000;
const char* Statestore::IMPALA_MEMBERSHIP_TOPIC = "impala-membership";
const char* Statestore::IMPALA_REQUEST_QUEUE_TOPIC = "impala-request-queue";
typedef ClientConnection<StatestoreSubscriberClientWrapper> StatestoreSubscriberConn;
typedef ClientConnection<StatestoreHaServiceClientWrapper> StatestoreHaServiceConn;
namespace impala {
std::string SubscriberTypeToString(TStatestoreSubscriberType::type t) {
switch (t) {
case TStatestoreSubscriberType::ADMISSIOND:
return "ADMISSIOND";
case TStatestoreSubscriberType::CATALOGD:
return "CATALOGD";
case TStatestoreSubscriberType::COORDINATOR:
return "COORDINATOR";
case TStatestoreSubscriberType::EXECUTOR:
return "EXECUTOR";
case TStatestoreSubscriberType::COORDINATOR_EXECUTOR:
return "COORDINATOR AND EXECUTOR";
case TStatestoreSubscriberType::UNKNOWN:
default:
return "UNKNOWN";
};
}
}
class StatestoreThriftIf : public StatestoreServiceIf {
public:
StatestoreThriftIf(Statestore* statestore)
: statestore_(statestore) {
DCHECK(statestore_ != NULL);
}
virtual void RegisterSubscriber(TRegisterSubscriberResponse& response,
const TRegisterSubscriberRequest& params) {
if (FLAGS_debug_actions == "START_STATESTORE_IN_PROTOCOL_V1"
&& strncmp(params.subscriber_id.c_str(), "python-test-client", 18) == 0
&& params.protocol_version > StatestoreServiceVersion::V1) {
// Simulate the behaviour of old statestore to throw exception for new version
// of subscribers.
throw apache::thrift::protocol::TProtocolException(
apache::thrift::protocol::TProtocolException::INVALID_DATA, "Invalid data");
}
if (params.protocol_version < statestore_->GetProtocolVersion()) {
// Refuse old version of subscribers
response.__set_protocol_version(statestore_->GetProtocolVersion());
response.__set_statestore_id(statestore_->GetStateStoreId());
Status status = Status(TErrorCode::STATESTORE_INCOMPATIBLE_PROTOCOL,
params.subscriber_id, params.protocol_version + 1,
statestore_->GetProtocolVersion() + 1);
status.ToThrift(&response.status);
return;
}
TStatestoreSubscriberType::type subscriber_type = TStatestoreSubscriberType::UNKNOWN;
if (params.__isset.subscriber_type) {
subscriber_type = params.subscriber_type;
}
bool subscribe_catalogd_change = false;
if (params.__isset.subscribe_catalogd_change) {
subscribe_catalogd_change = params.subscribe_catalogd_change;
}
TCatalogRegistration catalogd_registration;
if (params.__isset.catalogd_registration) {
catalogd_registration = params.catalogd_registration;
catalogd_registration.__set_registration_time(UnixMillis());
}
RegistrationId registration_id;
bool has_active_catalogd;
int64_t active_catalogd_version;
TCatalogRegistration active_catalogd_registration;
Status status = statestore_->RegisterSubscriber(params.subscriber_id,
params.subscriber_location, params.topic_registrations, subscriber_type,
subscribe_catalogd_change, catalogd_registration, ®istration_id,
&has_active_catalogd, &active_catalogd_version, &active_catalogd_registration);
status.ToThrift(&response.status);
response.__set_registration_id(registration_id);
response.__set_statestore_id(statestore_->GetStateStoreId());
response.__set_protocol_version(statestore_->GetProtocolVersion());
bool is_active_statestored = false;
int64_t active_statestored_version =
statestore_->GetActiveVersion(&is_active_statestored);
response.__set_statestore_is_active(is_active_statestored);
response.__set_active_statestored_version(active_statestored_version);
if (is_active_statestored && has_active_catalogd) {
response.__set_catalogd_registration(active_catalogd_registration);
response.__set_catalogd_version(active_catalogd_version);
statestore_->UpdateSubscriberCatalogInfo(params.subscriber_id);
}
}
virtual void GetProtocolVersion(TGetProtocolVersionResponse& response,
const TGetProtocolVersionRequest& params) {
LOG(INFO) << "Subscriber protocol version: " << params.protocol_version;
response.__set_protocol_version(statestore_->GetProtocolVersion());
response.__set_statestore_id(statestore_->GetStateStoreId());
Status status = Status::OK();
status.ToThrift(&response.status);
return;
}
virtual void SetStatestoreDebugAction(TSetStatestoreDebugActionResponse& response,
const TSetStatestoreDebugActionRequest& params) {
if (params.__isset.disable_network) {
bool disable_network = params.disable_network;
LOG(INFO) << (disable_network ? "Disable" : "Enable")
<< " statestored network";
statestore_->SetStatestoreDebugAction(disable_network);
}
Status status = Status::OK();
status.ToThrift(&response.status);
return;
}
private:
Statestore* statestore_;
};
class StatestoreHaThriftIf : public StatestoreHaServiceIf {
public:
StatestoreHaThriftIf(Statestore* statestore) : statestore_(statestore) {
DCHECK(statestore_ != NULL);
}
// Receive HA handshake request from peer statestore instance.
// Each statestore instance start this StatestoreHaService server and each has client
// cache. They use client to negotiate active-standby role with peer. If retry failed
// for 3 times, assume its peer is not started and the instance take the active role.
// Otherwise, designate the role based on the priorities.
virtual void StatestoreHaHandshake(TStatestoreHaHandshakeResponse& response,
const TStatestoreHaHandshakeRequest& params) {
if (params.src_protocol_version < statestore_->GetProtocolVersion()) {
// Refuse old version of statestore
Status status = Status(TErrorCode::STATESTORE_INCOMPATIBLE_PROTOCOL,
statestore_->GetPeerStatestoreHaAddress(), params.src_protocol_version + 1,
statestore_->GetProtocolVersion() + 1);
status.ToThrift(&response.status);
return;
} else if (params.src_statestore_id == statestore_->GetStateStoreId()) {
// Ignore request sent by itself.
return;
}
bool dst_statestore_active = false;
Status status = statestore_->ReceiveHaHandshakeRequest(params.src_statestore_id,
params.src_statestore_address, params.src_force_active, &dst_statestore_active);
status.ToThrift(&response.status);
response.__set_dst_protocol_version(statestore_->GetProtocolVersion());
response.__set_dst_statestore_id(statestore_->GetStateStoreId());
response.__set_dst_statestore_active(dst_statestore_active);
}
// Receive HA heartbeat from peer statestore instance.
virtual void StatestoreHaHeartbeat(TStatestoreHaHeartbeatResponse& response,
const TStatestoreHaHeartbeatRequest& params) {
if (params.protocol_version < statestore_->GetProtocolVersion()) {
// Refuse old version of statestore
Status status =
Status(TErrorCode::STATESTORE_INCOMPATIBLE_PROTOCOL,
statestore_->GetPeerStatestoreHaAddress(), params.protocol_version + 1,
statestore_->GetProtocolVersion() + 1);
status.ToThrift(&response.status);
return;
}
statestore_->HaHeartbeatRequest(params.dst_statestore_id, params.src_statestore_id);
Status status = Status::OK();
status.ToThrift(&response.status);
}
private:
Statestore* statestore_;
};
void Statestore::TopicEntry::SetValue(const Statestore::TopicEntry::Value& bytes,
TopicEntry::Version version) {
DCHECK_GT(bytes.size(), 0);
value_ = bytes;
version_ = version;
}
vector<Statestore::TopicEntry::Version> Statestore::Topic::Put(
const std::vector<TTopicItem>& entries) {
vector<Statestore::TopicEntry::Version> versions;
versions.reserve(entries.size());
// Acquire exclusive lock - we are modifying the topic.
lock_guard<shared_mutex> write_lock(lock_);
for (const TTopicItem& entry: entries) {
TopicEntryMap::iterator entry_it = entries_.find(entry.key);
int64_t key_size_delta = 0;
int64_t value_size_delta = 0;
if (entry_it == entries_.end()) {
entry_it = entries_.emplace(entry.key, TopicEntry()).first;
key_size_delta += entry.key.size();
} else {
// Delete the old entry from the version history. There is no need to search the
// version_history because there should only be at most a single entry in the
// history at any given time.
topic_update_log_.erase(entry_it->second.version());
value_size_delta -= entry_it->second.value().size();
}
value_size_delta += entry.value.size();
entry_it->second.SetValue(entry.value, ++last_version_);
entry_it->second.SetDeleted(entry.deleted);
topic_update_log_.emplace(entry_it->second.version(), entry.key);
total_key_size_bytes_ += key_size_delta;
total_value_size_bytes_ += value_size_delta;
DCHECK_GE(total_key_size_bytes_, static_cast<int64_t>(0));
DCHECK_GE(total_value_size_bytes_, static_cast<int64_t>(0));
key_size_metric_->Increment(key_size_delta);
value_size_metric_->Increment(value_size_delta);
topic_size_metric_->Increment(key_size_delta + value_size_delta);
versions.push_back(entry_it->second.version());
}
return versions;
}
void Statestore::Topic::DeleteIfVersionsMatch(TopicEntry::Version version,
const Statestore::TopicEntryKey& key) {
// Acquire exclusive lock - we are modifying the topic.
lock_guard<shared_mutex> write_lock(lock_);
TopicEntryMap::iterator entry_it = entries_.find(key);
if (entry_it != entries_.end() && entry_it->second.version() == version) {
// Add a new entry with the the version history for this deletion and remove the old
// entry
topic_update_log_.erase(version);
topic_update_log_.emplace(++last_version_, key);
entry_it->second.SetDeleted(true);
entry_it->second.SetVersion(last_version_);
}
}
void Statestore::Topic::ClearAllEntries() {
lock_guard<shared_mutex> write_lock(lock_);
entries_.clear();
topic_update_log_.clear();
int64_t key_size_metric_val = key_size_metric_->GetValue();
key_size_metric_->SetValue(std::max(static_cast<int64_t>(0),
key_size_metric_val - total_key_size_bytes_));
int64_t value_size_metric_val = value_size_metric_->GetValue();
value_size_metric_->SetValue(std::max(static_cast<int64_t>(0),
value_size_metric_val - total_value_size_bytes_));
int64_t topic_size_metric_val = topic_size_metric_->GetValue();
topic_size_metric_->SetValue(std::max(static_cast<int64_t>(0),
topic_size_metric_val - (total_value_size_bytes_ + total_key_size_bytes_)));
total_value_size_bytes_ = 0;
total_key_size_bytes_ = 0;
}
void Statestore::Topic::BuildDelta(const SubscriberId& subscriber_id,
TopicEntry::Version last_processed_version,
const string& filter_prefix, TTopicDelta* delta) {
// If the subscriber version is > 0, send this update as a delta. Otherwise, this is
// a new subscriber so send them a non-delta update that includes all entries in the
// topic.
delta->is_delta = last_processed_version > Subscriber::TOPIC_INITIAL_VERSION;
delta->__set_from_version(last_processed_version);
{
// Acquire shared lock - we are not modifying the topic.
shared_lock<shared_mutex> read_lock(lock_);
TopicUpdateLog::const_iterator next_update =
topic_update_log_.upper_bound(last_processed_version);
uint64_t topic_size = 0;
for (; next_update != topic_update_log_.end(); ++next_update) {
TopicEntryMap::const_iterator itr = entries_.find(next_update->second);
DCHECK(itr != entries_.end());
const TopicEntry& topic_entry = itr->second;
// Don't send deleted entries for non-delta updates.
if (!delta->is_delta && topic_entry.is_deleted()) {
continue;
}
// Skip any entries that don't match the requested prefix.
if (!HasPrefixString(itr->first, filter_prefix)) continue;
delta->topic_entries.push_back(TTopicItem());
TTopicItem& delta_entry = delta->topic_entries.back();
delta_entry.key = itr->first;
delta_entry.value = topic_entry.value();
delta_entry.deleted = topic_entry.is_deleted();
topic_size += delta_entry.key.size() + delta_entry.value.size();
}
if (!delta->is_delta &&
last_version_ > Subscriber::TOPIC_INITIAL_VERSION) {
VLOG_QUERY << "Preparing initial " << delta->topic_name
<< " topic update for " << subscriber_id << ". Size = "
<< PrettyPrinter::Print(topic_size, TUnit::BYTES);
}
if (topic_update_log_.size() > 0) {
// The largest version for this topic will be the last entry in the version history
// map.
delta->__set_to_version(topic_update_log_.rbegin()->first);
} else {
// There are no updates in the version history
delta->__set_to_version(Subscriber::TOPIC_INITIAL_VERSION);
}
}
}
void Statestore::Topic::ToJson(Document* document, Value* topic_json) {
// Acquire shared lock - we are not modifying the topic.
shared_lock<shared_mutex> read_lock(lock_);
Value topic_id(topic_id_.c_str(), document->GetAllocator());
topic_json->AddMember("topic_id", topic_id, document->GetAllocator());
topic_json->AddMember("num_entries",
static_cast<uint64_t>(entries_.size()),
document->GetAllocator());
topic_json->AddMember("version", last_version_, document->GetAllocator());
int64_t key_size = total_key_size_bytes_;
int64_t value_size = total_value_size_bytes_;
Value key_size_json(PrettyPrinter::Print(key_size, TUnit::BYTES).c_str(),
document->GetAllocator());
topic_json->AddMember("key_size", key_size_json, document->GetAllocator());
Value value_size_json(PrettyPrinter::Print(value_size, TUnit::BYTES).c_str(),
document->GetAllocator());
topic_json->AddMember("value_size", value_size_json, document->GetAllocator());
Value total_size_json(
PrettyPrinter::Print(key_size + value_size, TUnit::BYTES).c_str(),
document->GetAllocator());
topic_json->AddMember("total_size", total_size_json, document->GetAllocator());
topic_json->AddMember("key_size_bytes", key_size, document->GetAllocator());
topic_json->AddMember("value_size_bytes", value_size, document->GetAllocator());
topic_json->AddMember("total_size_bytes", key_size + value_size,
document->GetAllocator());
topic_json->AddMember("prioritized", IsPrioritizedTopic(topic_id_),
document->GetAllocator());
}
Statestore::Subscriber::Subscriber(const SubscriberId& subscriber_id,
const RegistrationId& registration_id, const TNetworkAddress& network_address,
const vector<TTopicRegistration>& subscribed_topics,
TStatestoreSubscriberType::type subscriber_type, bool subscribe_catalogd_change)
: subscriber_id_(subscriber_id),
registration_id_(registration_id),
network_address_(network_address),
subscriber_type_(subscriber_type),
subscribe_catalogd_change_(subscribe_catalogd_change) {
LOG(INFO) << "Subscriber '" << subscriber_id_
<< "' with type " << SubscriberTypeToString(subscriber_type_)
<< " registered (registration id: " << PrintId(registration_id_) << ")";
RefreshLastHeartbeatTimestamp(false);
for (const TTopicRegistration& topic : subscribed_topics) {
GetTopicsMapForId(topic.topic_name)
->emplace(piecewise_construct, forward_as_tuple(topic.topic_name),
forward_as_tuple(
topic.is_transient, topic.populate_min_subscriber_topic_version,
topic.filter_prefix));
}
}
bool Statestore::Subscriber::AddTransientEntries(const TopicId& topic_id,
const vector<TTopicItem>& entries,
const vector<TopicEntry::Version>& entry_versions) {
lock_guard<mutex> l(transient_entry_lock_);
DCHECK_EQ(entries.size(), entry_versions.size());
// Only record the update if the topic is transient
Topics* subscribed_topics = GetTopicsMapForId(topic_id);
Topics::iterator topic_it = subscribed_topics->find(topic_id);
DCHECK(topic_it != subscribed_topics->end());
if (topic_it->second.is_transient) {
if (unregistered_) return false;
for (int i = 0; i < entries.size(); ++i) {
topic_it->second.transient_entries_[entries[i].key] = entry_versions[i];
}
}
return true;
}
void Statestore::Subscriber::DeleteAllTransientEntries(TopicMap* global_topics) {
lock_guard<mutex> l(transient_entry_lock_);
for (const Topics* subscribed_topics :
{&priority_subscribed_topics_, &non_priority_subscribed_topics_}) {
for (const auto& topic : *subscribed_topics) {
auto global_topic_it = global_topics->find(topic.first);
DCHECK(global_topic_it != global_topics->end());
for (auto& transient_entry : topic.second.transient_entries_) {
global_topic_it->second.DeleteIfVersionsMatch(transient_entry.second,
transient_entry.first);
}
}
}
unregistered_ = true;
}
int64_t Statestore::Subscriber::NumTransientEntries() {
lock_guard<mutex> l(transient_entry_lock_);
int64_t num_entries = 0;
for (const Topics* subscribed_topics :
{&priority_subscribed_topics_, &non_priority_subscribed_topics_}) {
for (const auto& topic : *subscribed_topics) {
num_entries += topic.second.transient_entries_.size();
}
}
return num_entries;
}
Statestore::TopicEntry::Version Statestore::Subscriber::LastTopicVersionProcessed(
const TopicId& topic_id) const {
const Topics& subscribed_topics = GetTopicsMapForId(topic_id);
Topics::const_iterator itr = subscribed_topics.find(topic_id);
return itr == subscribed_topics.end() ? TOPIC_INITIAL_VERSION
: itr->second.last_version.Load();
}
void Statestore::Subscriber::SetLastTopicVersionProcessed(const TopicId& topic_id,
TopicEntry::Version version) {
// Safe to call concurrently for different topics because 'subscribed_topics' is not
// modified.
Topics* subscribed_topics = GetTopicsMapForId(topic_id);
Topics::iterator topic_it = subscribed_topics->find(topic_id);
// IMPALA-7714: log warning to aid debugging in release builds without DCHECK.
if (UNLIKELY(topic_it == subscribed_topics->end())) {
LOG(ERROR) << "Couldn't find subscribed topic " << topic_id;
}
DCHECK(topic_it != subscribed_topics->end()) << topic_id;
topic_it->second.last_version.Store(version);
}
void Statestore::Subscriber::RefreshLastHeartbeatTimestamp(bool received_heartbeat) {
DCHECK_GE(MonotonicMillis(), last_heartbeat_ts_.Load());
last_heartbeat_ts_.Store(MonotonicMillis());
if (received_heartbeat) received_heartbeat_.Store(true);
}
void Statestore::Subscriber::UpdateCatalogInfo(
int64_t catalogd_version, const TNetworkAddress& catalogd_address) {
catalogd_version_ = catalogd_version;
catalogd_address_ = catalogd_address;
last_update_catalogd_time_ = UnixMillis();
}
Statestore::Statestore(MetricGroup* metrics)
: protocol_version_(StatestoreServiceVersion::V2),
catalog_manager_(FLAGS_enable_catalogd_ha),
subscriber_topic_update_threadpool_("statestore-update",
"subscriber-update-worker",
FLAGS_statestore_num_update_threads,
FLAGS_statestore_max_subscribers,
bind<void>(mem_fn(&Statestore::DoSubscriberUpdate), this,
UpdateKind::TOPIC_UPDATE, _1, _2)),
subscriber_priority_topic_update_threadpool_("statestore-priority-update",
"subscriber-priority-update-worker",
FLAGS_statestore_num_priority_update_threads,
FLAGS_statestore_max_subscribers,
bind<void>(mem_fn(&Statestore::DoSubscriberUpdate), this,
UpdateKind::PRIORITY_TOPIC_UPDATE, _1, _2)),
subscriber_heartbeat_threadpool_("statestore-heartbeat",
"subscriber-heartbeat-worker",
FLAGS_statestore_num_heartbeat_threads,
FLAGS_statestore_max_subscribers,
bind<void>(mem_fn(&Statestore::DoSubscriberUpdate), this,
UpdateKind::HEARTBEAT, _1, _2)),
update_state_client_cache_(new StatestoreSubscriberClientCache(1, 0,
FLAGS_statestore_update_tcp_timeout_seconds * 1000,
FLAGS_statestore_update_tcp_timeout_seconds * 1000, "",
IsInternalTlsConfigured())),
heartbeat_client_cache_(new StatestoreSubscriberClientCache(1, 0,
FLAGS_statestore_heartbeat_tcp_timeout_seconds * 1000,
FLAGS_statestore_heartbeat_tcp_timeout_seconds * 1000, "",
IsInternalTlsConfigured())),
update_catalogd_client_cache_(new StatestoreSubscriberClientCache(1, 0,
FLAGS_statestore_update_catalogd_tcp_timeout_seconds * 1000,
FLAGS_statestore_update_catalogd_tcp_timeout_seconds * 1000, "",
IsInternalTlsConfigured())),
thrift_iface_(new StatestoreThriftIf(this)),
ha_thrift_iface_(new StatestoreHaThriftIf(this)),
failure_detector_(new MissedHeartbeatFailureDetector(
FLAGS_statestore_max_missed_heartbeats,
FLAGS_statestore_max_missed_heartbeats / 2)) {
UUIDToTUniqueId(boost::uuids::random_generator()(), &statestore_id_);
LOG(INFO) << "Statestore ID: " << PrintId(statestore_id_);
DCHECK(metrics != NULL);
metrics_ = metrics;
num_subscribers_metric_ = metrics->AddGauge(STATESTORE_LIVE_SUBSCRIBERS, 0);
subscriber_set_metric_ = SetMetric<string>::CreateAndRegister(metrics,
STATESTORE_LIVE_SUBSCRIBERS_LIST, set<string>());
num_subscribers_received_heartbeat_metric_ =
metrics->AddGauge(STATESTORE_SUBSCRIBERS_RECEIVED_HEARTBEAT, 0);
key_size_metric_ = metrics->AddGauge(STATESTORE_TOTAL_KEY_SIZE_BYTES, 0);
value_size_metric_ = metrics->AddGauge(STATESTORE_TOTAL_VALUE_SIZE_BYTES, 0);
topic_size_metric_ = metrics->AddGauge(STATESTORE_TOTAL_TOPIC_SIZE_BYTES, 0);
topic_update_duration_metric_ =
StatsMetric<double>::CreateAndRegister(metrics, STATESTORE_UPDATE_DURATION);
priority_topic_update_duration_metric_ = StatsMetric<double>::CreateAndRegister(
metrics, STATESTORE_PRIORITY_UPDATE_DURATION);
heartbeat_duration_metric_ =
StatsMetric<double>::CreateAndRegister(metrics, STATESTORE_HEARTBEAT_DURATION);
successful_update_catalogd_rpc_metric_ =
metrics->AddCounter(STATESTORE_SUCCESSFUL_UPDATE_CATALOGD_RPC_NUM, 0);
failed_update_catalogd_rpc_metric_ =
metrics->AddCounter(STATESTORE_FAILED_UPDATE_CATALOGD_RPC_NUM, 0);
successful_update_statestored_role_rpc_metric_ =
metrics->AddCounter(STATESTORE_SUCCESSFUL_UPDATE_STATESTORED_ROLE_RPC_NUM, 0);
failed_update_statestored_role_rpc_metric_ =
metrics->AddCounter(STATESTORE_FAILED_UPDATE_STATESTORED_ROLE_RPC_NUM, 0);
clear_topic_entries_metric_ =
metrics->AddCounter(STATESTORE_CLEAR_TOPIC_ENTRIES_NUM, 0);
active_catalogd_address_metric_ = metrics->AddProperty<string>(
STATESTORE_ACTIVE_CATALOGD_ADDRESS, "");
active_status_metric_ = metrics->AddProperty(STATESTORE_ACTIVE_STATUS, true);
service_started_metric_ = metrics->AddProperty(STATESTORE_SERVICE_STARTED, false);
update_state_client_cache_->InitMetrics(metrics, "subscriber-update-state");
heartbeat_client_cache_->InitMetrics(metrics, "subscriber-heartbeat");
update_catalogd_client_cache_->InitMetrics(
metrics, "subscriber-update-catalogd");
if (!FLAGS_enable_statestored_ha) {
is_active_ = true;
active_status_metric_->SetValue(is_active_);
active_version_ = UnixMicros();
num_received_heartbeat_in_active_ = 0;
} else {
is_active_ = false;
active_status_metric_->SetValue(is_active_);
active_version_ = 0;
update_statestored_client_cache_.reset(new StatestoreSubscriberClientCache(
1, 0, FLAGS_statestore_update_statestore_tcp_timeout_seconds * 1000,
FLAGS_statestore_update_statestore_tcp_timeout_seconds * 1000, "",
IsInternalTlsConfigured()));
update_statestored_client_cache_->InitMetrics(
metrics, "subscriber-update-statestored");
ha_client_cache_.reset(new StatestoreHaClientCache(1, 0,
FLAGS_statestore_ha_client_rpc_timeout_ms,
FLAGS_statestore_ha_client_rpc_timeout_ms, "",
IsInternalTlsConfigured(), FLAGS_statestore_ha_client_rpc_conn_timeout_ms));
ha_client_cache_->InitMetrics(metrics, "statestored-ha");
ha_standby_ss_failure_detector_.reset(new MissedHeartbeatFailureDetector(
FLAGS_statestore_max_missed_heartbeats,
FLAGS_statestore_max_missed_heartbeats / 2));
ha_active_ss_failure_detector_.reset(new TimeoutFailureDetector(
seconds(FLAGS_statestore_peer_timeout_seconds),
seconds(FLAGS_statestore_peer_timeout_seconds / 2)));
in_ha_recovery_mode_metric_ = metrics->AddProperty(STATESTORE_IN_HA_RECOVERY, false);
connected_peer_metric_ = metrics->AddProperty(STATESTORE_CONNECTED_PEER, false);
}
}
Statestore::~Statestore() {
CHECK(service_started_) << "Cannot shutdown Statestore once initialized and started.";
}
Status Statestore::Init(int32_t state_store_port) {
#ifndef NDEBUG
if (FLAGS_stress_statestore_startup_delay_ms > 0) {
LOG(INFO) << "Stress statestore startup delay: "
<< FLAGS_stress_statestore_startup_delay_ms << " ms";
SleepForMs(FLAGS_stress_statestore_startup_delay_ms);
}
#endif
std::shared_ptr<TProcessor> processor(new StatestoreServiceProcessor(thrift_iface()));
std::shared_ptr<TProcessorEventHandler> event_handler(
new RpcEventHandler("statestore", metrics_));
processor->setEventHandler(event_handler);
ThriftServerBuilder builder("StatestoreService", processor, state_store_port);
// Mark this as an internal service to use a more permissive Thrift max message size
builder.is_external_facing(false);
if (IsInternalTlsConfigured()) {
SSLProtocol ssl_version;
RETURN_IF_ERROR(
SSLProtoVersions::StringToProtocol(FLAGS_ssl_minimum_version, &ssl_version));
LOG(INFO) << "Enabling SSL for Statestore";
builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
.pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
.ssl_version(ssl_version)
.cipher_list(FLAGS_ssl_cipher_list);
}
RETURN_IF_ERROR(subscriber_topic_update_threadpool_.Init());
RETURN_IF_ERROR(subscriber_priority_topic_update_threadpool_.Init());
RETURN_IF_ERROR(subscriber_heartbeat_threadpool_.Init());
ThriftServer* server;
RETURN_IF_ERROR(builder.metrics(metrics_).Build(&server));
thrift_server_.reset(server);
RETURN_IF_ERROR(thrift_server_->Start());
RETURN_IF_ERROR(Thread::Create("statestore-heartbeat", "heartbeat-monitoring-thread",
&Statestore::MonitorSubscriberHeartbeat, this, &heartbeat_monitoring_thread_));
RETURN_IF_ERROR(Thread::Create("statestore-update-catalogd", "update-catalogd-thread",
&Statestore::MonitorUpdateCatalogd, this, &update_catalogd_thread_));
service_started_ = true;
service_started_metric_->SetValue(true);
return Status::OK();
}
void Statestore::RegisterWebpages(Webserver* webserver, bool metrics_only) {
Webserver::RawUrlCallback healthz_callback =
[this](const auto& req, auto* data, auto* response) {
return this->HealthzHandler(req, data, response);
};
webserver->RegisterUrlCallback("/healthz", healthz_callback);
if (metrics_only) return;
Webserver::UrlCallback topics_callback =
bind<void>(mem_fn(&Statestore::TopicsHandler), this, _1, _2);
webserver->RegisterUrlCallback("/topics", "statestore_topics.tmpl",
topics_callback, true);
Webserver::UrlCallback subscribers_callback =
bind<void>(&Statestore::SubscribersHandler, this, _1, _2);
webserver->RegisterUrlCallback("/subscribers", "statestore_subscribers.tmpl",
subscribers_callback, true);
if (FLAGS_enable_catalogd_ha) {
Webserver::UrlCallback show_catalog_ha_callback =
bind<void>(&Statestore::CatalogHAInfoHandler, this, _1, _2);
webserver->RegisterUrlCallback(
"/catalog_ha_info", "catalog_ha_info.tmpl", show_catalog_ha_callback, true);
}
RegisterLogLevelCallbacks(webserver, false);
}
void Statestore::TopicsHandler(const Webserver::WebRequest& req,
Document* document) {
lock_guard<mutex> l(subscribers_lock_);
shared_lock<shared_mutex> t(topics_map_lock_);
Value topics(kArrayType);
for (TopicMap::value_type& topic: topics_) {
Value topic_json(kObjectType);
topic.second.ToJson(document, &topic_json);
SubscriberId oldest_subscriber_id;
TopicEntry::Version oldest_subscriber_version =
GetMinSubscriberTopicVersion(topic.first, &oldest_subscriber_id);
topic_json.AddMember("oldest_version", oldest_subscriber_version,
document->GetAllocator());
Value oldest_id(oldest_subscriber_id.c_str(), document->GetAllocator());
topic_json.AddMember("oldest_id", oldest_id, document->GetAllocator());
topics.PushBack(topic_json, document->GetAllocator());
}
document->AddMember("topics", topics, document->GetAllocator());
}
void Statestore::SubscribersHandler(const Webserver::WebRequest& req,
Document* document) {
lock_guard<mutex> l(subscribers_lock_);
Value subscribers(kArrayType);
for (const SubscriberMap::value_type& subscriber: subscribers_) {
Value sub_json(kObjectType);
Value subscriber_id(subscriber.second->id().c_str(), document->GetAllocator());
sub_json.AddMember("id", subscriber_id, document->GetAllocator());
Value address(TNetworkAddressToString(subscriber.second->network_address()).c_str(),
document->GetAllocator());
sub_json.AddMember("address", address, document->GetAllocator());
int64_t num_priority_topics =
subscriber.second->priority_subscribed_topics().size();
int64_t num_non_priority_topics =
subscriber.second->non_priority_subscribed_topics().size();
sub_json.AddMember("num_topics",
static_cast<uint64_t>(num_priority_topics + num_non_priority_topics),
document->GetAllocator());
sub_json.AddMember("num_priority_topics",
static_cast<uint64_t>(num_priority_topics),
document->GetAllocator());
sub_json.AddMember("num_transient",
static_cast<uint64_t>(subscriber.second->NumTransientEntries()),
document->GetAllocator());
Value registration_id(PrintId(subscriber.second->registration_id()).c_str(),
document->GetAllocator());
sub_json.AddMember("registration_id", registration_id, document->GetAllocator());
Value secs_since_heartbeat(
StringPrintf("%.3f", subscriber.second->SecondsSinceHeartbeat()).c_str(),
document->GetAllocator());
sub_json.AddMember(
"secs_since_heartbeat", secs_since_heartbeat, document->GetAllocator());
subscribers.PushBack(sub_json, document->GetAllocator());
}
document->AddMember("subscribers", subscribers, document->GetAllocator());
}
Status Statestore::OfferUpdate(const ScheduledSubscriberUpdate& update,
ThreadPool<ScheduledSubscriberUpdate>* threadpool) {
// Somewhat confusingly, we're checking the number of entries in a particular
// threadpool's work queue to decide whether or not we have too many
// subscribers. The number of subscribers registered can be actually more
// than statestore_max_subscribers. This is because RegisterSubscriber() adds
// the new subscriber to subscribers_ first before scheduling its updates.
// Should we be stricter in enforcing this limit on subscribers_.size() itself?
if (threadpool->GetQueueSize() >= FLAGS_statestore_max_subscribers
|| !threadpool->Offer(update)) {
stringstream ss;
ss << "Maximum subscriber limit reached: " << FLAGS_statestore_max_subscribers;
ss << ", subscribers_ size: " << subscribers_.size();
SubscriberMap::iterator subscriber_it = subscribers_.find(update.subscriber_id);
DCHECK(subscriber_it != subscribers_.end());
subscribers_.erase(subscriber_it);
if (FLAGS_enable_statestored_ha) {
ActiveConnStateMap::iterator conn_states_it =
active_conn_states_.find(update.subscriber_id);
if (conn_states_it != active_conn_states_.end()) {
if (conn_states_it->second == TStatestoreConnState::FAILED) {
--failed_conn_state_count_;
}
active_conn_states_.erase(conn_states_it);
}
}
LOG(ERROR) << ss.str();
return Status(ss.str());
}
return Status::OK();
}
Status Statestore::RegisterSubscriber(const SubscriberId& subscriber_id,
const TNetworkAddress& location,
const vector<TTopicRegistration>& topic_registrations,
TStatestoreSubscriberType::type subscriber_type,
bool subscribe_catalogd_change,
const TCatalogRegistration& catalogd_registration,
RegistrationId* registration_id,
bool* has_active_catalogd,
int64_t* active_catalogd_version,
TCatalogRegistration* active_catalogd_registration) {
bool is_catalogd = subscriber_type == TStatestoreSubscriberType::CATALOGD;
if (subscriber_id.empty()) {
return Status("Subscriber ID cannot be empty string");
} else if (is_catalogd
&& FLAGS_enable_catalogd_ha != catalogd_registration.enable_catalogd_ha) {
return Status("CalaogD HA enabling flag from catalogd does not match.");
} else if (disable_network_.Load()) {
return Status("Reject registration since network is disabled.");
}
// Create any new topics first, so that when the subscriber is first sent a topic update
// by the worker threads its topics are guaranteed to exist.
{
// Start with a shared read lock when checking the map. In the common case the topic
// will already exist, so we don't need to immediately get the exclusive lock and
// block other threads.
upgrade_lock<shared_mutex> topic_read_lock(topics_map_lock_);
for (const TTopicRegistration& topic: topic_registrations) {
TopicMap::iterator topic_it = topics_.find(topic.topic_name);
if (topic_it == topics_.end()) {
// Upgrade to an exclusive lock when modifying the map.
upgrade_to_unique_lock<shared_mutex> topic_write_lock(topic_read_lock);
LOG(INFO) << "Creating new topic: ''" << topic.topic_name
<< "' on behalf of subscriber: '" << subscriber_id;
topics_.emplace(piecewise_construct, forward_as_tuple(topic.topic_name),
forward_as_tuple(topic.topic_name, key_size_metric_, value_size_metric_,
topic_size_metric_));
}
}
}
LOG(INFO) << "Registering: " << subscriber_id;
bool is_reregistering = false;
{
lock_guard<mutex> l(subscribers_lock_);
UUIDToTUniqueId(subscriber_uuid_generator_(), registration_id);
SubscriberMap::iterator subscriber_it = subscribers_.find(subscriber_id);
if (subscriber_it != subscribers_.end()) {
shared_ptr<Subscriber> subscriber = subscriber_it->second;
UnregisterSubscriber(subscriber.get());
// Check if the subscriber's network addresses are matching.
if (subscriber->network_address().hostname != location.hostname
|| subscriber->network_address().port != location.port) {
LOG(INFO) << "Subscriber " << subscriber_id
<< " re-register with different address, old address: "
<< TNetworkAddressToString(subscriber->network_address())
<< " , new address: " << TNetworkAddressToString(location);
}
is_reregistering = true;
}
if (is_catalogd
&& catalog_manager_.RegisterCatalogd(is_reregistering, subscriber_id,
*registration_id, catalogd_registration)) {
LOG(INFO) << "Active catalogd role is designated to "
<< catalog_manager_.GetActiveCatalogdSubscriberId();
update_catalod_cv_.NotifyAll();
}
shared_ptr<Subscriber> current_registration(new Subscriber(
subscriber_id, *registration_id, location, topic_registrations,
subscriber_type, subscribe_catalogd_change));
subscribers_.emplace(subscriber_id, current_registration);
if (FLAGS_enable_statestored_ha) {
active_conn_states_.emplace(subscriber_id, TStatestoreConnState::OK);
}
failure_detector_->UpdateHeartbeat(subscriber_id, true);
num_subscribers_metric_->SetValue(subscribers_.size());
subscriber_set_metric_->Add(subscriber_id);
// Add the subscriber to the update queue, with an immediate schedule.
ScheduledSubscriberUpdate update(0, subscriber_id, *registration_id);
RETURN_IF_ERROR(OfferUpdate(update, &subscriber_topic_update_threadpool_));
RETURN_IF_ERROR(OfferUpdate(update, &subscriber_priority_topic_update_threadpool_));
RETURN_IF_ERROR(OfferUpdate(update, &subscriber_heartbeat_threadpool_));
*active_catalogd_registration =
catalog_manager_.GetActiveCatalogRegistration(
has_active_catalogd, active_catalogd_version);
}
return Status::OK();
}
void Statestore::SetStatestoreDebugAction(bool disable_network) {
if (FLAGS_debug_actions == "DISABLE_STATESTORE_NETWORK") {
disable_network_.Store(disable_network);
}
}
bool Statestore::FindSubscriber(const SubscriberId& subscriber_id,
const RegistrationId& registration_id, shared_ptr<Subscriber>* subscriber) {
DCHECK(subscriber != nullptr);
lock_guard<mutex> l(subscribers_lock_);
SubscriberMap::iterator it = subscribers_.find(subscriber_id);
if (it == subscribers_.end() ||
it->second->registration_id() != registration_id) return false;
*subscriber = it->second;
return true;
}
Status Statestore::SendTopicUpdate(Subscriber* subscriber, UpdateKind update_kind,
bool* update_skipped) {
if (!IsActive()) {
// Don't send topic update if the statestored is not active.
return Status::OK();
} else if (FLAGS_enable_catalogd_ha && subscriber->IsCatalogd()
&& !catalog_manager_.IsActiveCatalogd(subscriber->id())) {
// Don't send topic update to inactive catalogd.
VLOG(3) << "Skip sending topic update to inactive catalogd";
return Status::OK();
}
// Time any successful RPCs (i.e. those for which UpdateState() completed, even though
// it may have returned an error.)
MonotonicStopWatch sw;
sw.Start();
// First thing: make a list of updates to send
TUpdateStateRequest update_state_request;
GatherTopicUpdates(*subscriber, update_kind, &update_state_request);
// 'subscriber' may not be subscribed to any updates of 'update_kind'.
if (update_state_request.topic_deltas.empty()) {
*update_skipped = false;
return Status::OK();
}
// Set the expected registration ID, so that the subscriber can reject this update if
// they have moved on to a new registration instance.
update_state_request.__set_registration_id(subscriber->registration_id());
update_state_request.__set_statestore_id(statestore_id_);
// Second: try and send it
Status status;
StatestoreSubscriberConn client(update_state_client_cache_.get(),
subscriber->network_address(), &status);
RETURN_IF_ERROR(status);
TUpdateStateResponse response;
RETURN_IF_ERROR(client.DoRpc(
&StatestoreSubscriberClientWrapper::UpdateState, update_state_request, &response));
StatsMetric<double>* update_duration_metric =
update_kind == UpdateKind::PRIORITY_TOPIC_UPDATE ?
priority_topic_update_duration_metric_ : topic_update_duration_metric_;
status = Status(response.status);
if (!status.ok()) {
update_duration_metric->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
return status;
}
*update_skipped = (response.__isset.skipped && response.skipped);
if (*update_skipped) {
// The subscriber skipped processing this update. We don't consider this a failure
// - subscribers can decide what they do with any update - so, return OK and set
// update_skipped so the caller can compensate.
update_duration_metric->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
return Status::OK();
}
// At this point the updates are assumed to have been successfully processed by the
// subscriber. Update the subscriber's max version of each topic.
for (const auto& topic_delta: update_state_request.topic_deltas) {
subscriber->SetLastTopicVersionProcessed(topic_delta.first,
topic_delta.second.to_version);
}
// Thirdly: perform any / all updates returned by the subscriber
{
shared_lock<shared_mutex> l(topics_map_lock_);
for (const TTopicDelta& update: response.topic_updates) {
TopicMap::iterator topic_it = topics_.find(update.topic_name);
if (topic_it == topics_.end()) {
VLOG(1) << "Received update for unexpected topic:" << update.topic_name;
continue;
}
VLOG_RPC << "Received update for topic " << update.topic_name
<< " from " << subscriber->id() << ", number of entries: "
<< update.topic_entries.size();
// The subscriber sent back their from_version which indicates that they want to
// reset their max version for this topic to this value. The next update sent will
// be from this version.
if (update.__isset.from_version) {
LOG(INFO) << "Received request for different delta base of topic: "
<< update.topic_name << " from: " << subscriber->id()
<< " subscriber from_version: " << update.from_version;
subscriber->SetLastTopicVersionProcessed(topic_it->first, update.from_version);
}
Topic& topic = topic_it->second;
// Check if the subscriber indicated that the topic entries should be
// cleared.
if (update.__isset.clear_topic_entries && update.clear_topic_entries) {
DCHECK(!update.__isset.from_version);
LOG(INFO) << "Received request for clearing the entries of topic: "
<< update.topic_name << " from: " << subscriber->id();
clear_topic_entries_metric_->Increment(1);
topic.ClearAllEntries();
}
// Update the topic and add transient entries separately to avoid holding both
// locks at the same time and preventing concurrent topic updates.
vector<TopicEntry::Version> entry_versions = topic.Put(update.topic_entries);
if (!subscriber->AddTransientEntries(
update.topic_name, update.topic_entries, entry_versions)) {
// Subscriber was unregistered - clean up the transient entries.
for (int i = 0; i < update.topic_entries.size(); ++i) {
topic.DeleteIfVersionsMatch(entry_versions[i], update.topic_entries[i].key);
}
}
}
}
update_duration_metric->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
return Status::OK();
}
void Statestore::GatherTopicUpdates(const Subscriber& subscriber, UpdateKind update_kind,
TUpdateStateRequest* update_state_request) {
DCHECK(update_kind == UpdateKind::TOPIC_UPDATE
|| update_kind == UpdateKind::PRIORITY_TOPIC_UPDATE)
<< static_cast<int>(update_kind);
// Indices into update_state_request->topic_deltas where we need to populate
// 'min_subscriber_topic_version'. GetMinSubscriberTopicVersion() is somewhat
// expensive so we want to avoid calling it unless necessary.
vector<TTopicDelta*> deltas_needing_min_version;
{
const bool is_priority = update_kind == UpdateKind::PRIORITY_TOPIC_UPDATE;
const Subscriber::Topics& subscribed_topics = is_priority ?
subscriber.priority_subscribed_topics() :
subscriber.non_priority_subscribed_topics();
shared_lock<shared_mutex> l(topics_map_lock_);
for (const auto& subscribed_topic : subscribed_topics) {
auto topic_it = topics_.find(subscribed_topic.first);
DCHECK(topic_it != topics_.end());
TopicEntry::Version last_processed_version =
subscriber.LastTopicVersionProcessed(topic_it->first);
TTopicDelta& topic_delta =
update_state_request->topic_deltas[subscribed_topic.first];
topic_delta.topic_name = subscribed_topic.first;
topic_it->second.BuildDelta(subscriber.id(), last_processed_version,
subscribed_topic.second.filter_prefix, &topic_delta);
if (subscribed_topic.second.populate_min_subscriber_topic_version) {
deltas_needing_min_version.push_back(&topic_delta);
}
}
}
// Fill in the min subscriber topic version. This must be done after releasing
// topics_map_lock_.
if (!deltas_needing_min_version.empty()) {
lock_guard<mutex> l(subscribers_lock_);
for (TTopicDelta* delta : deltas_needing_min_version) {
delta->__set_min_subscriber_topic_version(
GetMinSubscriberTopicVersion(delta->topic_name));
}
}
}
Statestore::TopicEntry::Version Statestore::GetMinSubscriberTopicVersion(
const TopicId& topic_id, SubscriberId* subscriber_id) {
TopicEntry::Version min_topic_version = numeric_limits<int64_t>::max();
bool found = false;
// Find the minimum version processed for this topic across all topic subscribers.
for (const SubscriberMap::value_type& subscriber: subscribers_) {
if (FLAGS_enable_catalogd_ha && subscriber.second->IsCatalogd()
&& !catalog_manager_.IsActiveCatalogd(subscriber.second->id())) {
// Skip inactive catalogd since it does not apply catalog updates from the active
// catalogd.
continue;
}
auto subscribed_topics = subscriber.second->GetTopicsMapForId(topic_id);
if (subscribed_topics->find(topic_id) != subscribed_topics->end()) {
found = true;
TopicEntry::Version last_processed_version =
subscriber.second->LastTopicVersionProcessed(topic_id);
if (last_processed_version < min_topic_version) {
min_topic_version = last_processed_version;
if (subscriber_id != NULL) *subscriber_id = subscriber.second->id();
}
}
}
return found ? min_topic_version : Subscriber::TOPIC_INITIAL_VERSION;
}
bool Statestore::IsPrioritizedTopic(const string& topic) {
return ends_with(topic, IMPALA_MEMBERSHIP_TOPIC)
|| ends_with(topic, IMPALA_REQUEST_QUEUE_TOPIC);
}
const char* Statestore::GetUpdateKindName(UpdateKind kind) {
switch (kind) {
case UpdateKind::TOPIC_UPDATE:
return "topic update";
case UpdateKind::PRIORITY_TOPIC_UPDATE:
return "priority topic update";
case UpdateKind::HEARTBEAT:
return "heartbeat";
}
DCHECK(false);
return "";
}
ThreadPool<Statestore::ScheduledSubscriberUpdate>* Statestore::GetThreadPool(
UpdateKind kind) {
switch (kind) {
case UpdateKind::TOPIC_UPDATE:
return &subscriber_topic_update_threadpool_;
case UpdateKind::PRIORITY_TOPIC_UPDATE:
return &subscriber_priority_topic_update_threadpool_;
case UpdateKind::HEARTBEAT:
return &subscriber_heartbeat_threadpool_;
}
DCHECK(false);
return nullptr;
}
Status Statestore::SendHeartbeat(Subscriber* subscriber) {
if (disable_network_.Load()) {
return Status("Don't send heartbeat since network is disabled.");
}
MonotonicStopWatch sw;
sw.Start();
Status status;
StatestoreSubscriberConn client(heartbeat_client_cache_.get(),
subscriber->network_address(), &status);
RETURN_IF_ERROR(status);
THeartbeatRequest request;
THeartbeatResponse response;
request.__set_registration_id(subscriber->registration_id());
request.__set_statestore_id(statestore_id_);
if (FLAGS_enable_statestored_ha && !IsActive()) {
// Send heartbeat to subscriber with request for connection state between active
// statestore and subscriber.
request.__set_request_statestore_conn_state(true);
}
RETURN_IF_ERROR(
client.DoRpc(&StatestoreSubscriberClientWrapper::Heartbeat, request, &response));
heartbeat_duration_metric_->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
if (FLAGS_enable_statestored_ha && !IsActive()
&& response.__isset.statestore_conn_state) {
TStatestoreConnState::type conn_state = response.statestore_conn_state;
lock_guard<mutex> l(subscribers_lock_);
if (active_conn_states_.find(subscriber->id()) != active_conn_states_.end()
&& active_conn_states_[subscriber->id()] != conn_state) {
if (conn_state == TStatestoreConnState::FAILED) {
DCHECK(active_conn_states_[subscriber->id()] == TStatestoreConnState::OK
|| active_conn_states_[subscriber->id()] == TStatestoreConnState::UNKNOWN);
++failed_conn_state_count_;
} else if (active_conn_states_[subscriber->id()] == TStatestoreConnState::FAILED) {
DCHECK(conn_state == TStatestoreConnState::OK
|| conn_state == TStatestoreConnState::UNKNOWN);
--failed_conn_state_count_;
} else {
DCHECK((active_conn_states_[subscriber->id()] == TStatestoreConnState::OK &&
conn_state == TStatestoreConnState::UNKNOWN)
|| (conn_state == TStatestoreConnState::OK &&
active_conn_states_[subscriber->id()] == TStatestoreConnState::UNKNOWN));
}
// Save the connection state between active statestore and subscriber.
active_conn_states_[subscriber->id()] = conn_state;
}
}
return Status::OK();
}
void Statestore::DoSubscriberUpdate(UpdateKind update_kind, int thread_id,
const ScheduledSubscriberUpdate& update) {
const bool is_heartbeat = update_kind == UpdateKind::HEARTBEAT;
int64_t update_deadline = update.deadline;
shared_ptr<Subscriber> subscriber;
// Check if the subscriber has re-registered, in which case we can ignore
// this scheduled update.
if (!FindSubscriber(update.subscriber_id, update.registration_id, &subscriber)) {
return;
}
const char* update_kind_str = GetUpdateKindName(update_kind);
if (update_deadline != 0) {
// Wait until deadline.
int64_t diff_ms = update_deadline - UnixMillis();
while (diff_ms > 0) {
SleepForMs(diff_ms);
diff_ms = update_deadline - UnixMillis();
}
// The subscriber can potentially reconnect by the time this thread wakes
// up. In such case, we can ignore this update.
if (UNLIKELY(!FindSubscriber(
subscriber->id(), subscriber->registration_id(), &subscriber))) {
return;
}
diff_ms = std::abs(diff_ms);
VLOG(3) << "Sending " << update_kind_str << " message to: " << update.subscriber_id
<< " (deadline accuracy: " << diff_ms << "ms)";
if (diff_ms > DEADLINE_MISS_THRESHOLD_MS && is_heartbeat) {
const string& msg = Substitute(
"Missed subscriber ($0) $1 deadline by $2ms, "
"consider increasing --statestore_heartbeat_frequency_ms (currently $3) on "
"this Statestore, and --statestore_subscriber_timeout_seconds "
"on subscribers (Impala daemons and the Catalog Server)",
update.subscriber_id, update_kind_str, diff_ms,
FLAGS_statestore_heartbeat_frequency_ms);
LOG(WARNING) << msg;
}
// Don't warn for topic updates - they can be slow and still correct. Recommending an
// increase in update period will just confuse (as will increasing the thread pool
// size) because it's hard for users to pick a good value, they may still see these
// messages and it won't be a correctness problem.
} else {
// The first update is scheduled immediately and has a deadline of 0. There's no need
// to wait.
VLOG(3) << "Initial " << update_kind_str << " message for: " << update.subscriber_id;
}
DebugActionNoFail(FLAGS_debug_actions, "DO_SUBSCRIBER_UPDATE");
// Send the right message type, and compute the next deadline
int64_t deadline_ms = 0;
Status status;
if (is_heartbeat) {
status = SendHeartbeat(subscriber.get());
if (status.ok()) {
subscriber->RefreshLastHeartbeatTimestamp(true);
} else if (status.code() == TErrorCode::RPC_RECV_TIMEOUT) {
// Add details to status to make it more useful, while preserving the stack
status.AddDetail(Substitute(
"Subscriber $0 timed-out during heartbeat RPC. Timeout is $1s.",
subscriber->id(), FLAGS_statestore_heartbeat_tcp_timeout_seconds));
}
deadline_ms = UnixMillis() + FLAGS_statestore_heartbeat_frequency_ms;
} else {
// Initialize to false so that we don't consider the update skipped when
// SendTopicUpdate() fails.
bool update_skipped = false;
status = SendTopicUpdate(subscriber.get(), update_kind, &update_skipped);
if (status.code() == TErrorCode::RPC_RECV_TIMEOUT) {
// Add details to status to make it more useful, while preserving the stack
status.AddDetail(Substitute(
"Subscriber $0 timed-out during topic-update RPC. Timeout is $1s.",
subscriber->id(), FLAGS_statestore_update_tcp_timeout_seconds));
}
// If the subscriber responded that it skipped a topic in the last update sent,
// we assume that it was busy doing something else, and back off slightly before
// sending another.
int64_t update_frequency = update_kind == UpdateKind::PRIORITY_TOPIC_UPDATE
? FLAGS_statestore_priority_update_frequency_ms
: FLAGS_statestore_update_frequency_ms;
int64_t update_interval = update_skipped ? (2 * update_frequency)
: update_frequency;
deadline_ms = UnixMillis() + update_interval;
}
{
lock_guard<mutex> l(subscribers_lock_);
// Check again if this registration has been removed while we were processing the
// message.
SubscriberMap::iterator it = subscribers_.find(update.subscriber_id);
if (it == subscribers_.end() ||
it->second->registration_id() != update.registration_id) return;
if (!status.ok()) {
LOG(INFO) << "Unable to send " << update_kind_str << " message to subscriber "
<< update.subscriber_id << ", received error: " << status.GetDetail();
}
FailureDetector::PeerState state = is_heartbeat ?
failure_detector_->UpdateHeartbeat(update.subscriber_id, status.ok()) :
failure_detector_->GetPeerState(update.subscriber_id);
if (state == FailureDetector::FAILED) {
if (is_heartbeat) {
// TODO: Consider if a metric to track the number of failures would be useful.
LOG(INFO) << "Subscriber '" << subscriber->id() << "' has failed, disconnected "
<< "or re-registered (last known registration ID: "
<< PrintId(update.registration_id) << ")";
UnregisterSubscriber(subscriber.get());
if (subscriber->IsCatalogd()) {
if (catalog_manager_.UnregisterCatalogd(subscriber->id())) {
update_catalod_cv_.NotifyAll();
}
}
} else {
LOG(INFO) << "Failure was already detected for subscriber '" << subscriber->id()
<< "'. Won't send another " << update_kind_str;
}
} else {
// Schedule the next message.
VLOG(3) << "Next " << (is_heartbeat ? "heartbeat" : "update") << " deadline for: "
<< subscriber->id() << " is in " << deadline_ms << "ms";
status = OfferUpdate(ScheduledSubscriberUpdate(deadline_ms, subscriber->id(),
subscriber->registration_id()), GetThreadPool(update_kind));
if (!status.ok()) {
LOG(INFO) << "Unable to send next " << update_kind_str
<< " message to subscriber '" << subscriber->id() << "': "
<< status.GetDetail();
}
}
}
}
[[noreturn]] void Statestore::MonitorSubscriberHeartbeat() {
while (1) {
int num_subscribers;
int num_subscribers_received_heartbeat = 0;
vector<SubscriberId> inactive_subscribers;
SleepForMs(FLAGS_heartbeat_monitoring_frequency_ms);
{
lock_guard<mutex> l(subscribers_lock_);
num_subscribers = subscribers_.size();
for (const auto& subscriber : subscribers_) {
if (subscriber.second->MilliSecondsSinceHeartbeat()
> FLAGS_heartbeat_monitoring_frequency_ms) {
inactive_subscribers.push_back(subscriber.second->id());
} else if (subscriber.second->receivedHeartbeat()) {
num_subscribers_received_heartbeat++;
}
}
}
num_subscribers_received_heartbeat_metric_->SetValue(
num_subscribers_received_heartbeat);
if (inactive_subscribers.empty()) {
LOG(INFO) << num_subscribers_received_heartbeat << "/" << num_subscribers
<< " subscribers successfully heartbeat in the last "
<< FLAGS_heartbeat_monitoring_frequency_ms << "ms.";
} else {
LOG(INFO) << num_subscribers_received_heartbeat << "/" << num_subscribers
<< " subscribers successfully heartbeat in the last "
<< FLAGS_heartbeat_monitoring_frequency_ms << "ms."
<< " Slow subscribers: " << boost::join(inactive_subscribers, ", ");
}
}
}
[[noreturn]] void Statestore::MonitorUpdateCatalogd() {
int64_t last_active_catalogd_version = 0;
// rpc_receivers is used to track subscribers to which statestore need to send RPCs
// when there is a change in the elected active catalogd. It is updated from
// subscribers_, and the subscribers will be removed from this list if the RPCs are
// successfully sent to them.
vector<std::shared_ptr<Subscriber>> rpc_receivers;
int64_t timeout_us =
FLAGS_active_catalogd_designation_monitoring_interval_ms * MICROS_PER_MILLI;
// Check if the first registered one should be designated with active role.
while (!catalog_manager_.CheckActiveCatalog()) {
unique_lock<mutex> l(*catalog_manager_.GetLock());
update_catalod_cv_.WaitFor(l, timeout_us);
}
SendUpdateCatalogdNotification(&last_active_catalogd_version, rpc_receivers);
// Wait for notification. If new leader is elected due to catalogd is registered or
// unregistered, send notification to all coordinators and catalogds.
timeout_us = FLAGS_update_catalogd_rpc_resend_interval_ms * MICROS_PER_MILLI;
while (1) {
{
unique_lock<mutex> l(*catalog_manager_.GetLock());
update_catalod_cv_.WaitFor(l, timeout_us);
}
SendUpdateCatalogdNotification(&last_active_catalogd_version, rpc_receivers);
}
}
void Statestore::SendUpdateCatalogdNotification(int64_t* last_active_catalogd_version,
vector<std::shared_ptr<Subscriber>>& rpc_receivers) {
// Don't send UpdateCatalogd RPC if the statestore is not active.
if (!IsActive()) return;
bool has_active_catalogd;
int64_t active_catalogd_version = 0;
TCatalogRegistration catalogd_registration =
catalog_manager_.GetActiveCatalogRegistration(
&has_active_catalogd, &active_catalogd_version);
if (!has_active_catalogd ||
(active_catalogd_version == *last_active_catalogd_version
&& rpc_receivers.empty())) {
// Don't resend RPCs if there is no change in Active Catalogd and no RPC failure in
// last round.
return;
}
bool resend_rpc = false;
if (active_catalogd_version > *last_active_catalogd_version) {
// Send notification for the latest elected active catalogd.
LOG(INFO) << "Send notification for active catalogd version: "
<< active_catalogd_version;
active_catalogd_address_metric_->SetValue(
TNetworkAddressToString(catalogd_registration.address));
rpc_receivers.clear();
{
lock_guard<mutex> l(subscribers_lock_);
for (const auto& subscriber : subscribers_) {
if (subscriber.second->IsSubscribedCatalogdChange()) {
rpc_receivers.push_back(subscriber.second);
}
}
}
*last_active_catalogd_version = active_catalogd_version;
} else {
DCHECK(!rpc_receivers.empty());
lock_guard<mutex> l(subscribers_lock_);
for (std::vector<std::shared_ptr<Subscriber>>::iterator it = rpc_receivers.begin();
it != rpc_receivers.end();) {
// Don't resend RPC to subscribers which have been removed from subscriber list.
std::shared_ptr<Subscriber> subscriber = *it;
if (subscribers_.find(subscriber->id()) == subscribers_.end()) {
it = rpc_receivers.erase(it);
} else {
++it;
}
}
if (rpc_receivers.empty()) return;
resend_rpc = true;
}
for (std::vector<std::shared_ptr<Subscriber>>::iterator it = rpc_receivers.begin();
it != rpc_receivers.end();) {
std::shared_ptr<Subscriber> subscriber = *it;
bool update_skipped = false;
Status status;
if (!resend_rpc) {
status = DebugAction(
FLAGS_debug_actions, "SEND_UPDATE_CATALOGD_RPC_FIRST_ATTEMPT");
}
if (status.ok()) {
StatestoreSubscriberConn client(update_catalogd_client_cache_.get(),
subscriber->network_address(), &status);
if (status.ok()) {
TUpdateCatalogdRequest request;
TUpdateCatalogdResponse response;
request.__set_registration_id(subscriber->registration_id());
request.__set_statestore_id(statestore_id_);
request.__set_catalogd_version(active_catalogd_version);
request.__set_catalogd_registration(catalogd_registration);
status = client.DoRpc(
&StatestoreSubscriberClientWrapper::UpdateCatalogd, request, &response);
if (!status.ok()) {
if (status.code() == TErrorCode::RPC_RECV_TIMEOUT) {
// Add details to status to make it more useful, while preserving the stack
status.AddDetail(Substitute(
"Subscriber $0 timed-out during update catalogd RPC. Timeout is $1s.",
subscriber->id(), FLAGS_statestore_update_catalogd_tcp_timeout_seconds));
}
} else {
update_skipped = (response.__isset.skipped && response.skipped);
}
}
}
if (status.ok()) {
if (update_skipped) {
// The subscriber skipped processing this update. It's not considered as a failure
// since subscribers can decide what they do with any update. The subscriber is
// left in the receiver list so that RPC will be resent to it in next round.
++it;
} else {
UpdateSubscriberCatalogInfo(it->get()->id());
successful_update_catalogd_rpc_metric_->Increment(1);
// Remove the subscriber from the receiver list so that Statestore will not resend
// RPC to it in next round.
it = rpc_receivers.erase(it);
}
} else {
LOG(ERROR) << "Couldn't send UpdateCatalogd RPC, " << status.GetDetail();
failed_update_catalogd_rpc_metric_->Increment(1);
// Leave the subscriber in the receiver list. Statestore will resend RPC to it in
// next round.
++it;
}
}
if (rpc_receivers.empty()) {
LOG(INFO) << "Successfully sent UpdateCatalogd RPCs to all subscribers";
}
}
[[noreturn]] void Statestore::MonitorUpdateStatestoredRole() {
// rpc_receivers is used to track subscribers to which statestore need to send RPCs
// when there is a change in the elected active statestored. It is updated from
// subscribers_, and the subscribers will be removed from this list if the RPCs are
// successfully sent to them.
vector<std::shared_ptr<Subscriber>> rpc_receivers;
// Wait for notification. If new statestored leader is elected, send notification
// to all subscribers.
int64_t timeout_us = FLAGS_update_statestore_rpc_resend_interval_ms * MICROS_PER_MILLI;
int64_t last_active_statestored_version = 0;
while (1) {
{
unique_lock<mutex> l(ha_lock_);
update_statestored_cv_.WaitFor(l, timeout_us);
}
SendUpdateStatestoredRoleNotification(
&last_active_statestored_version, rpc_receivers);
}
}
void Statestore::SendUpdateStatestoredRoleNotification(
int64_t* last_active_statestored_version,
vector<std::shared_ptr<Subscriber>>& rpc_receivers) {
bool is_active_statestored = false;
int64_t active_statestored_version = GetActiveVersion(&is_active_statestored);
if (!is_active_statestored ||
(active_statestored_version == *last_active_statestored_version
&& rpc_receivers.empty())) {
// Don't resend RPCs if there is no change in active statestored and no RPC failure
// in last round.
return;
}
bool has_active_catalogd;
int64_t active_catalogd_version = 0;
TCatalogRegistration catalogd_registration =
catalog_manager_.GetActiveCatalogRegistration(
&has_active_catalogd, &active_catalogd_version);
if (has_active_catalogd) {
active_catalogd_address_metric_->SetValue(
TNetworkAddressToString(catalogd_registration.address));
}
bool resend_rpc = false;
if (active_statestored_version > *last_active_statestored_version) {
// Send notification for the latest elected active statestored.
LOG(INFO) << "Send notification for active statestored version: "
<< active_statestored_version;
// statestored_active_status_metric_->SetValue(true);
rpc_receivers.clear();
{
lock_guard<mutex> l(subscribers_lock_);
for (const auto& subscriber : subscribers_) {
rpc_receivers.push_back(subscriber.second);
}
}
*last_active_statestored_version = active_statestored_version;
} else {
DCHECK(!rpc_receivers.empty());
lock_guard<mutex> l(subscribers_lock_);
for (std::vector<std::shared_ptr<Subscriber>>::iterator it = rpc_receivers.begin();
it != rpc_receivers.end();) {
// Don't resend RPC to subscribers which have been removed from subscriber list.
std::shared_ptr<Subscriber> subscriber = *it;
if (subscribers_.find(subscriber->id()) == subscribers_.end()) {
it = rpc_receivers.erase(it);
} else {
++it;
}
}
if (rpc_receivers.empty()) return;
resend_rpc = true;
}
for (std::vector<std::shared_ptr<Subscriber>>::iterator it = rpc_receivers.begin();
it != rpc_receivers.end();) {
std::shared_ptr<Subscriber> subscriber = *it;
Status status;
if (!resend_rpc) {
status = DebugAction(
FLAGS_debug_actions, "SEND_UPDATE_STATESTORED_RPC_FIRST_ATTEMPT");
}
if (status.ok()) {
StatestoreSubscriberConn client(update_statestored_client_cache_.get(),
subscriber->network_address(), &status);
if (status.ok()) {
TUpdateStatestoredRoleRequest request;
TUpdateStatestoredRoleResponse response;
request.__set_registration_id(subscriber->registration_id());
request.__set_statestore_id(statestore_id_);
request.__set_active_statestored_version(active_statestored_version);
request.__set_is_active(true);
if (has_active_catalogd && subscriber->IsSubscribedCatalogdChange()) {
request.__set_catalogd_version(active_catalogd_version);
request.__set_catalogd_registration(catalogd_registration);
}
status = client.DoRpc(&StatestoreSubscriberClientWrapper::UpdateStatestoredRole,
request, &response);
if (!status.ok() && status.code() == TErrorCode::RPC_RECV_TIMEOUT) {
// Add details to status to make it more useful, while preserving the stack
status.AddDetail(Substitute(
"Subscriber $0 timed-out during update catalogd RPC. Timeout is $1s.",
subscriber->id(), FLAGS_statestore_update_catalogd_tcp_timeout_seconds));
}
}
}
if (status.ok()) {
// Remove the subscriber from the receiver list so that Statestore will not resend
// RPC to it in next round.
successful_update_statestored_role_rpc_metric_->Increment(1);
it = rpc_receivers.erase(it);
} else {
LOG(ERROR) << "Couldn't send UpdateStatestoredRole RPC, " << status.GetDetail();
failed_update_statestored_role_rpc_metric_->Increment(1);
// Leave the subscriber in the receiver list. Statestore will resend RPC to it in
// next round.
++it;
}
}
if (rpc_receivers.empty()) {
LOG(INFO) << "Successfully sent UpdateStatestoredRole RPCs to all subscribers";
}
}
void Statestore::UnregisterSubscriber(Subscriber* subscriber) {
SubscriberMap::const_iterator it = subscribers_.find(subscriber->id());
if (it == subscribers_.end() ||
it->second->registration_id() != subscriber->registration_id()) {
// Already failed and / or replaced with a new registration
return;
}
// Close all active clients so that the next attempt to use them causes a Reopen()
update_state_client_cache_->CloseConnections(subscriber->network_address());
heartbeat_client_cache_->CloseConnections(subscriber->network_address());
update_catalogd_client_cache_->CloseConnections(subscriber->network_address());
if (FLAGS_enable_statestored_ha) {
update_statestored_client_cache_->CloseConnections(subscriber->network_address());
}
// Prevent the failure detector from growing without bound
failure_detector_->EvictPeer(subscriber->id());
// Delete all transient entries
{
shared_lock<shared_mutex> topic_lock(topics_map_lock_);
subscriber->DeleteAllTransientEntries(&topics_);
}
num_subscribers_metric_->Increment(-1L);
subscriber_set_metric_->Remove(subscriber->id());
subscribers_.erase(subscriber->id());
if (FLAGS_enable_statestored_ha) {
ActiveConnStateMap::iterator conn_states_it =
active_conn_states_.find(subscriber->id());
if (conn_states_it != active_conn_states_.end()) {
if (conn_states_it->second == TStatestoreConnState::FAILED) {
--failed_conn_state_count_;
}
active_conn_states_.erase(conn_states_it);
}
}
}
void Statestore::MainLoop() {
subscriber_topic_update_threadpool_.Join();
subscriber_priority_topic_update_threadpool_.Join();
subscriber_heartbeat_threadpool_.Join();
}
void Statestore::ShutdownForTesting() {
CHECK(TestInfo::is_be_test()) << "Only valid to call in backend tests.";
subscriber_topic_update_threadpool_.Shutdown();
subscriber_priority_topic_update_threadpool_.Shutdown();
subscriber_heartbeat_threadpool_.Shutdown();
subscriber_topic_update_threadpool_.Join();
subscriber_priority_topic_update_threadpool_.Join();
subscriber_heartbeat_threadpool_.Join();
}
int64_t Statestore::FailedExecutorDetectionTimeMs() {
return FLAGS_statestore_max_missed_heartbeats * FLAGS_statestore_heartbeat_frequency_ms;
}
void Statestore::HealthzHandler(
const Webserver::WebRequest& req, std::stringstream* data, HttpStatusCode* response) {
if (service_started_) {
(*data) << "OK";
*response = HttpStatusCode::Ok;
return;
}
*(data) << "Not Available";
*response = HttpStatusCode::ServiceUnavailable;
}
Status Statestore::InitStatestoreHa(
int32_t statestore_ha_port, const TNetworkAddress& peer_statestore_ha_addr) {
local_statestore_ha_addr_ = MakeNetworkAddress(FLAGS_hostname, statestore_ha_port);
peer_statestore_ha_addr_ = peer_statestore_ha_addr;
std::shared_ptr<TProcessor> processor(
new StatestoreHaServiceProcessor(ha_thrift_iface()));
std::shared_ptr<TProcessorEventHandler> event_handler(
new RpcEventHandler("StatestoreHa", metrics_));
processor->setEventHandler(event_handler);
ThriftServerBuilder builder("StatestoreHaService", processor, statestore_ha_port);
// Mark this as an internal service to use a more permissive Thrift max message size
builder.is_external_facing(false);
if (IsInternalTlsConfigured()) {
SSLProtocol ssl_version;
RETURN_IF_ERROR(
SSLProtoVersions::StringToProtocol(FLAGS_ssl_minimum_version, &ssl_version));
LOG(INFO) << "Enabling SSL for Statestore";
builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
.pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
.ssl_version(ssl_version)
.cipher_list(FLAGS_ssl_cipher_list);
}
ThriftServer* ha_server;
RETURN_IF_ERROR(builder.metrics(metrics_).Build(&ha_server));
ha_thrift_server_.reset(ha_server);
RETURN_IF_ERROR(ha_thrift_server_->Start());
// Wait till Thrift server is ready.
RETURN_IF_ERROR(WaitForLocalServer(
*ha_thrift_server_, /* num_retries */ 10, /* retry_interval_ms */ 1000));
RETURN_IF_ERROR(Thread::Create("statestore-ha-heartbeat",
"ha-heartbeat-monitoring-thread", &Statestore::MonitorStatestoredHaHeartbeat,
this, &ha_monitoring_thread_));
RETURN_IF_ERROR(Thread::Create("statestore-update-statestored",
"update-statestored-thread",&Statestore::MonitorUpdateStatestoredRole, this,
&update_statestored_thread_));
// Negotiate role for HA with peer statestore instance on startup.
TStatestoreHaHandshakeResponse response;
Status status = SendHaHandshake(&response);
if (!status.ok()) {
// statestored designates itself as active if the statestore can not connect to the
// peer statestored.
lock_guard<mutex> l(ha_lock_);
is_active_ = true;
active_status_metric_->SetValue(is_active_);
active_version_ = UnixMicros();
num_received_heartbeat_in_active_ = 0;
LOG(INFO) << "Set Statestore as active since it does not receive handshake "
<< "response in HA preemption waiting period";
found_peer_ = false;
connected_peer_metric_->SetValue(found_peer_);
} else {
status = Status(response.status);
DCHECK(status.ok());
lock_guard<mutex> l(ha_lock_);
peer_statestore_id_ = response.dst_statestore_id;
is_active_ = !response.dst_statestore_active;
active_status_metric_->SetValue(is_active_);
if (is_active_) active_version_ = UnixMicros();
found_peer_ = true;
connected_peer_metric_->SetValue(found_peer_);
// connected_to_peer_statestore_metric_->SetValue(true);
LOG(INFO) << "Receive Statestore HA handshake response, set the statestore as "
<< (is_active_ ? "active" : "standby");
}
return Status::OK();
}
bool Statestore::IsActive() {
lock_guard<mutex> l(ha_lock_);
return is_active_;
}
int64_t Statestore::GetActiveVersion(bool* is_active) {
lock_guard<mutex> l(ha_lock_);
*is_active = is_active_;
return active_version_;
}
bool Statestore::IsInRecoveryMode() {
lock_guard<mutex> l(ha_lock_);
return in_recovery_mode_;
}
Status Statestore::SendHaHandshake(TStatestoreHaHandshakeResponse* response) {
if (disable_network_.Load()) {
return Status("Don't send HA handshake since network is disabled.");
}
// Negotiate the role for HA with peer statestore instance in client mode.
LOG(INFO) << "Send Statestore HA handshake request";
TStatestoreHaHandshakeRequest request;
request.__set_src_statestore_id(statestore_id_);
request.__set_src_statestore_address(
TNetworkAddressToString(local_statestore_ha_addr_));
request.__set_src_force_active(FLAGS_statestore_force_active);
int attempt = 0; // Used for debug action only.
if (FLAGS_statestore_peer_cnxn_attempts > 0) {
FLAGS_statestore_peer_cnxn_retry_interval_ms =
FLAGS_statestore_ha_preemption_wait_period_ms /
FLAGS_statestore_peer_cnxn_attempts;
}
StatestoreHaServiceConn::RpcStatus rpc_status =
StatestoreHaServiceConn::DoRpcWithRetry(ha_client_cache_.get(),
peer_statestore_ha_addr_,
&StatestoreHaServiceClientWrapper::StatestoreHaHandshake,
request,
FLAGS_statestore_peer_cnxn_attempts,
FLAGS_statestore_peer_cnxn_retry_interval_ms,
[&attempt]() {
return attempt++ == 0 ?
DebugAction(FLAGS_debug_actions, "STATESTORE_HA_HANDSHAKE_FIRST_ATTEMPT")
: Status::OK();
},
response);
return rpc_status.status;
}
Status Statestore::ReceiveHaHandshakeRequest(const TUniqueId& peer_statestore_id,
const string& peer_statestore_address, bool peer_force_active,
bool* statestore_active) {
// Process HA handshake request from peer statstore
LOG(INFO) << "Receive Statestore HA handshake request";
lock_guard<mutex> l(ha_lock_);
peer_statestore_id_ = peer_statestore_id;
if (peer_force_active && !FLAGS_statestore_force_active) {
is_active_ = false;
active_status_metric_->SetValue(is_active_);
ha_active_ss_failure_detector_->UpdateHeartbeat(STATESTORE_ID, true);
LOG(INFO) << "Set the statestored as standby since the peer is started with force "
<< "active flag";
} else if (!is_active_) {
if (FLAGS_statestore_force_active && !peer_force_active) {
is_active_ = true;
active_status_metric_->SetValue(is_active_);
active_version_ = UnixMicros();
num_received_heartbeat_in_active_ = 0;
ha_standby_ss_failure_detector_->UpdateHeartbeat(STATESTORE_ID, true);
LOG(INFO) << "Set the statestored as active since it's started with force active "
<< "flag";
} else {
// Compare priority and assign the statestored with high priority as active
// statestored.
is_active_ = FLAGS_use_network_address_as_statestore_priority ?
TNetworkAddressToString(local_statestore_ha_addr_) < peer_statestore_address :
statestore_id_ < peer_statestore_id;
active_status_metric_->SetValue(is_active_);
if (is_active_) {
active_version_ = UnixMicros();
ha_standby_ss_failure_detector_->UpdateHeartbeat(STATESTORE_ID, true);
} else {
ha_active_ss_failure_detector_->UpdateHeartbeat(STATESTORE_ID, true);
}
LOG(INFO) << "Set the statestored as " << (is_active_ ? "active" : "standby");
}
} else {
LOG(INFO) << "Active state of statestored is not changed";
}
*statestore_active = is_active_;
if (!found_peer_) {
found_peer_ = true;
connected_peer_metric_->SetValue(found_peer_);
}
return Status::OK();
}
void Statestore::HaHeartbeatRequest(const TUniqueId& dst_statestore_id,
const TUniqueId& src_statestore_id) {
// Don't process HA heartbeat if network is disabled.
if (disable_network_.Load()) return;
lock_guard<mutex> l(ha_lock_);
if (!is_active_) {
// process HA heartbeat from active statestore
ha_active_ss_failure_detector_->UpdateHeartbeat(STATESTORE_ID, true);
} else {
num_received_heartbeat_in_active_++;
if (num_received_heartbeat_in_active_ <= MAX_NUM_RECEIVED_HEARTBEAT_IN_ACTIVE) {
return;
}
// Repeatedly receive heartbeat from its peer statestored. That means both statestored
// designate themselves as active. Enter recovery mode to restart negotiation.
LOG(WARNING)
<< "Both statestoreds designate themselves as active, restart negotiation.";
in_recovery_mode_ = true;
recovery_start_time_ = MonotonicMillis();
in_ha_recovery_mode_metric_->SetValue(in_recovery_mode_);
is_active_ = false;
active_status_metric_->SetValue(is_active_);
LOG(WARNING) << "Enter HA recovery mode.";
}
}
// TODO: break this function to 3 functions for each branch: recovery-mode, active state,
// and standby state.
[[noreturn]] void Statestore::MonitorStatestoredHaHeartbeat() {
bool sleep_between_processing = true;
while (1) {
if (sleep_between_processing) {
SleepForMs(FLAGS_statestore_ha_heartbeat_monitoring_frequency_ms);
} else {
sleep_between_processing = true;
}
if (IsInRecoveryMode()) {
// Keep sending HA handshake request to its peer periodically until receiving
// response. Don't hold the ha_lock_ when sending HA handshake.
TStatestoreHaHandshakeResponse response;
Status status = SendHaHandshake(&response);
if (!status.ok()) continue;
status = Status(response.status);
DCHECK(status.ok());
lock_guard<mutex> l(ha_lock_);
if (!in_recovery_mode_) {
sleep_between_processing = false;
continue;
}
// Exit "recovery" mode.
in_recovery_mode_ = false;
in_ha_recovery_mode_metric_->SetValue(in_recovery_mode_);
peer_statestore_id_ = response.dst_statestore_id;
is_active_ = !response.dst_statestore_active;
active_status_metric_->SetValue(is_active_);
found_peer_ = true;
connected_peer_metric_->SetValue(found_peer_);
int64_t elapsed_ms = MonotonicMillis() - recovery_start_time_;
LOG(INFO) << "Receive Statestore HA handshake response, exit HA recovery mode in "
<< PrettyPrinter::Print(elapsed_ms, TUnit::TIME_MS)
<< ". Set the statestored as " << (is_active_ ? "active" : "standby");
if (is_active_) {
active_version_ = UnixMicros();
// Send notification to all subscribers.
update_statestored_cv_.NotifyAll();
}
} else if (IsActive()) {
// Statestored in active state
// Send HA heartbeat to standby statestored.
bool send_heartbeat = false;
{
lock_guard<mutex> l(ha_lock_);
if (is_active_ && found_peer_) send_heartbeat = true;
}
if (send_heartbeat) {
Status status = SendHaHeartbeat();
if (status.ok()) continue;
}
lock_guard<mutex> l(ha_lock_);
if (!is_active_) {
sleep_between_processing = false;
continue;
}
// Check if standby statestored is reachable.
FailureDetector::PeerState state =
ha_standby_ss_failure_detector_->GetPeerState(STATESTORE_ID);
if (state != FailureDetector::FAILED) {
continue;
} else if (found_peer_) {
// Stop sending HA heartbeat.
found_peer_ = false;
connected_peer_metric_->SetValue(found_peer_);
LOG(INFO) << "Statestored lost connection with peer statestored";
}
lock_guard<mutex> l2(subscribers_lock_);
if (subscribers_.size() == 0) {
// To avoid race with new active statestored, original active statestored enter
// "recovery" mode if it does not receive heartbeat responses from standby
// statestored and all subscribers.
in_recovery_mode_ = true;
recovery_start_time_ = MonotonicMillis();
in_ha_recovery_mode_metric_->SetValue(in_recovery_mode_);
is_active_ = false;
active_status_metric_->SetValue(is_active_);
LOG(WARNING) << "Enter HA recovery mode.";
}
// TODO: IMPALA-12507 Need better approach to handle split-brain in network.
// In the scenario, active statestored still can reach some subscribers.
} else {
// Statestored in standby state
// Monitor connection state with its peer statestored.
lock_guard<mutex> l(ha_lock_);
if (is_active_) {
sleep_between_processing = false;
continue;
}
FailureDetector::PeerState state =
ha_active_ss_failure_detector_->GetPeerState(STATESTORE_ID);
// Check if the majority of subscribers lost connection with active statestored.
int failed_conn_state_count = 0;
int total_subscribers = 0;
{
lock_guard<mutex> l(subscribers_lock_);
failed_conn_state_count = failed_conn_state_count_;
total_subscribers = active_conn_states_.size();
}
bool majority_failed = failed_conn_state_count > 0 &&
failed_conn_state_count > total_subscribers / 2;
if (state != FailureDetector::FAILED) {
if (majority_failed) {
LOG(WARNING) << "Active statestored may have network issue. "
<< failed_conn_state_count << " out of " << total_subscribers
<< " subscribers lost connections with active statestored.";
}
continue;
}
found_peer_ = false;
connected_peer_metric_->SetValue(found_peer_);
if (majority_failed) {
// When standby statestored lost connection with active statestored, take over
// active role if the majority of subscribers lost connections with active
// statestored.
LOG(INFO) << "Statestore change to active state, " << failed_conn_state_count
<< " out of " << total_subscribers
<< " subscribers lost connections with active statestored.";
is_active_ = true;
active_status_metric_->SetValue(is_active_);
active_version_ = UnixMicros();
num_received_heartbeat_in_active_ = 0;
// Send notification to all subscribers.
update_statestored_cv_.NotifyAll();
} else if (total_subscribers == 0) {
// If there is no subscriber, it means this statestored lost connection with
// other nodes in the cluster, enter "recovery" mode.
in_recovery_mode_ = true;
recovery_start_time_ = MonotonicMillis();
in_ha_recovery_mode_metric_->SetValue(in_recovery_mode_);
LOG(WARNING) << "Enter HA recovery mode.";
} else {
VLOG(3) << "Standby statestored missed HA heartbeat from active statestored, "
<< failed_conn_state_count << " out of " << total_subscribers
<< " subscribers lost connections with active statestored.";
}
}
}
}
Status Statestore::SendHaHeartbeat() {
if (disable_network_.Load()) {
ha_standby_ss_failure_detector_->UpdateHeartbeat(STATESTORE_ID, false);
return Status("Don't send HA heartbeat since network is disabled.");
}
Status status;
StatestoreHaServiceConn client(ha_client_cache_.get(),
peer_statestore_ha_addr_, &status);
RETURN_IF_ERROR(status);
TStatestoreHaHeartbeatRequest request;
TStatestoreHaHeartbeatResponse response;
{
lock_guard<mutex> l(ha_lock_);
request.__set_dst_statestore_id(peer_statestore_id_);
}
request.__set_src_statestore_id(statestore_id_);
status = client.DoRpc(&StatestoreHaServiceClientWrapper::StatestoreHaHeartbeat,
request, &response);
ha_standby_ss_failure_detector_->UpdateHeartbeat(STATESTORE_ID, status.ok());
if (status.ok()) {
status = Status(response.status);
}
return status;
}
void Statestore::UpdateSubscriberCatalogInfo(const SubscriberId& subscriber_id) {
lock_guard<mutex> l(subscribers_lock_);
SubscriberMap::iterator it = subscribers_.find(subscriber_id);
if (it == subscribers_.end()) return;
std::shared_ptr<Subscriber> subscriber = it->second;
bool has_active_catalogd;
int64_t active_catalogd_version = 0;
TCatalogRegistration catalogd_registration =
catalog_manager_.GetActiveCatalogRegistration(
&has_active_catalogd, &active_catalogd_version);
if (has_active_catalogd) {
subscriber->UpdateCatalogInfo(active_catalogd_version, catalogd_registration.address);
}
}
void Statestore::CatalogHAInfoHandler(
const Webserver::WebRequest& req, Document* document) {
if (FLAGS_enable_statestored_ha && !is_active_) {
document->AddMember("is_active_statestored", false, document->GetAllocator());
return;
}
document->AddMember("is_active_statestored", true, document->GetAllocator());
// HA INFO
bool has_active_catalogd;
int64_t active_catalogd_version = 0;
TCatalogRegistration active_catalog_registration =
catalog_manager_.GetActiveCatalogRegistration(&has_active_catalogd,
&active_catalogd_version);
document->AddMember("has_active_catalogd", has_active_catalogd,
document->GetAllocator());
document->AddMember("active_catalogd_version", active_catalogd_version,
document->GetAllocator());
if (active_catalogd_version > 0) {
Value last_update_catalogd_time_(ToStringFromUnixMillis(
catalog_manager_.GetLastUpdateCatalogTime(),
TimePrecision::Millisecond).c_str(), document->GetAllocator());
document->AddMember("last_update_catalogd_time", last_update_catalogd_time_,
document->GetAllocator());
}
if (has_active_catalogd) {
// Active catalogd information.
document->AddMember("active_catalogd_enable_catalogd_ha",
active_catalog_registration.enable_catalogd_ha, document->GetAllocator());
Value active_catalogd_address(
TNetworkAddressToString(active_catalog_registration.address).c_str(),
document->GetAllocator());
document->AddMember("active_catalogd_address", active_catalogd_address,
document->GetAllocator());
document->AddMember("active_catalogd_force_catalogd_active",
active_catalog_registration.force_catalogd_active, document->GetAllocator());
Value active_catalogd_registration_time(ToStringFromUnixMillis(
active_catalog_registration.registration_time,
TimePrecision::Millisecond).c_str(), document->GetAllocator());
document->AddMember("active_catalogd_registration_time",
active_catalogd_registration_time, document->GetAllocator());
}
// Standby catalogd information.
TCatalogRegistration standby_catalog_registration =
catalog_manager_.GetStandbyCatalogRegistration();
if (standby_catalog_registration.__isset.registration_time) {
document->AddMember("standby_catalogd_enable_catalogd_ha",
standby_catalog_registration.enable_catalogd_ha, document->GetAllocator());
Value standby_catalogd_address(
TNetworkAddressToString(standby_catalog_registration.address).c_str(),
document->GetAllocator());
document->AddMember(
"standby_catalogd_address", standby_catalogd_address, document->GetAllocator());
document->AddMember("standby_catalogd_force_catalogd_active",
standby_catalog_registration.force_catalogd_active, document->GetAllocator());
Value standby_catalogd_registration_time(ToStringFromUnixMillis(
standby_catalog_registration.registration_time,
TimePrecision::Millisecond).c_str(), document->GetAllocator());
document->AddMember("standby_catalogd_registration_time",
standby_catalogd_registration_time, document->GetAllocator());
}
lock_guard<mutex> l(subscribers_lock_);
Value notified_subscribers(kArrayType);
for (const SubscriberMap::value_type& subscriber : subscribers_) {
// Only subscribers of type COORDINATOR, COORDINATOR_EXECUTOR, or CATALOGD
// need to be returned.
if (subscriber.second->IsSubscribedCatalogdChange()) {
Value sub_json(kObjectType);
Value subscriber_id(subscriber.second->id().c_str(), document->GetAllocator());
sub_json.AddMember("id", subscriber_id, document->GetAllocator());
Value address(TNetworkAddressToString(
subscriber.second->network_address()).c_str(), document->GetAllocator());
sub_json.AddMember("address", address, document->GetAllocator());
Value registration_id(PrintId(subscriber.second->registration_id()).c_str(),
document->GetAllocator());
sub_json.AddMember("registration_id", registration_id, document->GetAllocator());
Value subscriber_type(SubscriberTypeToString(
subscriber.second->subscriber_type()).c_str(), document->GetAllocator());
sub_json.AddMember("subscriber_type", subscriber_type, document->GetAllocator());
if (subscriber.second->catalogd_version() > 0) {
sub_json.AddMember("catalogd_version", subscriber.second->catalogd_version(),
document->GetAllocator());
Value catalogd_address(TNetworkAddressToString(
subscriber.second->catalogd_address()).c_str(), document->GetAllocator());
sub_json.AddMember("catalogd_address", catalogd_address,
document->GetAllocator());
Value last_subscriber_update_catalogd_time(ToStringFromUnixMillis(
subscriber.second->last_update_catalogd_time(),
TimePrecision::Millisecond).c_str(), document->GetAllocator());
sub_json.AddMember("last_subscriber_update_catalogd_time",
last_subscriber_update_catalogd_time, document->GetAllocator());
}
notified_subscribers.PushBack(sub_json, document->GetAllocator());
}
}
document->AddMember(
"notified_subscribers", notified_subscribers, document->GetAllocator());
return;
}