driver/topology_service.cc (262 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License, version 2.0
// (GPLv2), as published by the Free Software Foundation, with the
// following additional permissions:
//
// This program is distributed with certain software that is licensed
// under separate terms, as designated in a particular file or component
// or in the license documentation. Without limiting your rights under
// the GPLv2, the authors of this program hereby grant you an additional
// permission to link the program and your derivative works with the
// separately licensed software that they have included with the program.
//
// Without limiting the foregoing grant of rights under the GPLv2 and
// additional permission as to separately licensed software, this
// program is also subject to the Universal FOSS Exception, version 1.0,
// a copy of which can be found along with its FAQ at
// http://oss.oracle.com/licenses/universal-foss-exception.
//
// This program is distributed in the hope that it will be useful, but
// WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU General Public License, version 2.0, for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see
// http://www.gnu.org/licenses/gpl-2.0.html.
#include "cluster_aware_metrics_container.h"
#include "topology_service.h"
#include <sstream>
// Builds a topology service bound to the connection handle id `dbc_id`.
//
// The service starts with no cluster instance template, the default refresh
// rate (DEFAULT_REFRESH_RATE_IN_MILLISECONDS) and its own freshly allocated
// metrics container. When `enable_logging` is true the logger is obtained
// from init_log_file(); otherwise the logger member is left untouched here.
TOPOLOGY_SERVICE::TOPOLOGY_SERVICE(unsigned long dbc_id, bool enable_logging)
    : dbc_id{dbc_id},
      cluster_instance_host{nullptr},
      refresh_rate_in_ms{DEFAULT_REFRESH_RATE_IN_MILLISECONDS},
      metrics_container{std::make_shared<CLUSTER_AWARE_METRICS_CONTAINER>()} {

    // TODO get better initial cluster id
    // Placeholder id: the numeric value of the current time followed by its
    // ctime() text form. It is replaced once set_cluster_id() is called.
    const time_t now = time(nullptr);
    const std::string numeric_part = std::to_string(now);
    cluster_id = numeric_part + ctime(&now);

    if (enable_logging) {
        logger = init_log_file();
    }
}
TOPOLOGY_SERVICE::TOPOLOGY_SERVICE(const TOPOLOGY_SERVICE& ts) {
refresh_rate_in_ms = ts.refresh_rate_in_ms;
cluster_id = ts.cluster_id;
cluster_instance_host = ts.cluster_instance_host;
logger = ts.logger;
dbc_id = ts.dbc_id;
metrics_container = ts.metrics_container;
}
// Releases this service's reference to the cluster instance template.
TOPOLOGY_SERVICE::~TOPOLOGY_SERVICE() {
    // reset() on an empty shared_ptr is a no-op, so no prior null test is needed.
    cluster_instance_host.reset();
}
void TOPOLOGY_SERVICE::set_cluster_id(std::string cid) {
MYLOG_TRACE(logger, dbc_id, "[TOPOLOGY_SERVICE] cluster ID=%s", cid.c_str());
this->cluster_id = cid;
metrics_container->set_cluster_id(this->cluster_id);
}
// Replaces the cluster instance template: the HOST_INFO whose host string and
// port are used when endpoints for individual instances are built
// (see get_host_endpoint() and create_host()).
//
// `host_template` is dereferenced for logging, so it must not be null.
void TOPOLOGY_SERVICE::set_cluster_instance_template(std::shared_ptr<HOST_INFO> host_template) {
    // NOTE: this may not have to be a pointer; the information passed in could
    // simply be copied. Alternatively, host creation could live entirely inside
    // TOPOLOGY_SERVICE (protected or private) with the host information passed
    // as separate parameters.

    // Drop the previous template before the new one is logged and stored.
    cluster_instance_host.reset();

    MYLOG_TRACE(logger, dbc_id,
                "[TOPOLOGY_SERVICE] cluster instance host=%s, port=%d",
                host_template->get_host().c_str(), host_template->get_port());

    cluster_instance_host = host_template;
}
void TOPOLOGY_SERVICE::set_refresh_rate(int refresh_rate) {
refresh_rate_in_ms = refresh_rate;
}
// Returns the last used reader recorded on the cached topology of the current
// cluster, or nullptr when nothing is cached for this cluster id or the cached
// entry is old enough that refresh_needed() reports it as stale.
std::shared_ptr<HOST_INFO> TOPOLOGY_SERVICE::get_last_used_reader() {
    auto cached = get_from_cache();
    if (cached && !refresh_needed(cached->time_last_updated())) {
        return cached->get_last_used_reader();
    }
    return nullptr;
}
// Records `reader` as the last used reader on the cached topology of the
// current cluster. Does nothing when `reader` is null or when no topology is
// cached. The cache is accessed while holding topology_cache_mutex.
void TOPOLOGY_SERVICE::set_last_used_reader(std::shared_ptr<HOST_INFO> reader) {
    if (!reader) {
        return;
    }

    std::lock_guard<std::mutex> guard(topology_cache_mutex);
    if (auto cached = get_from_cache()) {
        cached->set_last_used_reader(reader);
    }
}
// Returns a copy of the down-host set kept by the cached topology of the
// current cluster, or an empty set when no topology is cached. The cache is
// read while holding topology_cache_mutex.
std::set<std::string> TOPOLOGY_SERVICE::get_down_hosts() {
    std::set<std::string> result;

    std::lock_guard<std::mutex> guard(topology_cache_mutex);
    if (auto cached = get_from_cache()) {
        result = cached->get_down_hosts();
    }
    return result;
}
// Forwards `host` to mark_host_down() on the cached topology of the current
// cluster, under topology_cache_mutex. A null `host` or an empty cache makes
// this a no-op.
void TOPOLOGY_SERVICE::mark_host_down(std::shared_ptr<HOST_INFO> host) {
    if (host) {
        std::lock_guard<std::mutex> guard(topology_cache_mutex);
        if (auto cached = get_from_cache()) {
            cached->mark_host_down(host);
        }
    }
}
// Forwards `host` to mark_host_up() on the cached topology of the current
// cluster, under topology_cache_mutex. A null `host` or an empty cache makes
// this a no-op.
void TOPOLOGY_SERVICE::mark_host_up(std::shared_ptr<HOST_INFO> host) {
    if (host) {
        std::lock_guard<std::mutex> guard(topology_cache_mutex);
        if (auto cached = get_from_cache()) {
            cached->mark_host_up(host);
        }
    }
}
void TOPOLOGY_SERVICE::set_gather_metric(bool can_gather) {
this->metrics_container->set_gather_metric(can_gather);
}
// Removes every entry from the topology cache (all cluster ids), holding
// topology_cache_mutex for the duration.
void TOPOLOGY_SERVICE::clear_all() {
    std::lock_guard<std::mutex> guard(topology_cache_mutex);
    topology_cache.clear();
}
// Removes only the current cluster's entry from the topology cache, holding
// topology_cache_mutex for the duration. Erasing a missing key is harmless.
void TOPOLOGY_SERVICE::clear() {
    std::lock_guard<std::mutex> guard(topology_cache_mutex);
    topology_cache.erase(cluster_id);
}
// Returns whatever is cached for the current cluster id, or nullptr when
// nothing is cached. Staleness is not checked and no query is issued.
std::shared_ptr<CLUSTER_TOPOLOGY_INFO> TOPOLOGY_SERVICE::get_cached_topology() {
    auto cached = get_from_cache();
    return cached;
}
// Returns the topology of the current cluster.
//
// The cached topology is returned as-is when it exists, `force_update` is
// false and refresh_needed() does not report it as stale. Otherwise the
// topology is queried over `connection`; a non-null query result is cached and
// returned. If the query yields nothing, the (possibly null, possibly stale)
// cached topology is returned instead.
std::shared_ptr<CLUSTER_TOPOLOGY_INFO> TOPOLOGY_SERVICE::get_topology(CONNECTION_PROXY* connection, bool force_update) {
    // TODO reconsider using this cache. It appears that only information for
    // the current cluster id is ever stored, so a single CLUSTER_TOPOLOGY_INFO
    // member could replace the map.
    auto cached_topology = get_from_cache();

    const bool cache_is_usable = cached_topology
        && !force_update
        && !refresh_needed(cached_topology->time_last_updated());
    if (cache_is_usable) {
        return cached_topology;
    }

    auto latest_topology = query_for_topology(connection);
    if (!latest_topology) {
        // Query failed: fall back to whatever was cached.
        return cached_topology;
    }

    put_to_cache(latest_topology);
    return latest_topology;
}
std::shared_ptr<CLUSTER_TOPOLOGY_INFO> TOPOLOGY_SERVICE::get_filtered_topology(std::shared_ptr<CLUSTER_TOPOLOGY_INFO> topology) {
if (!this->allowed_and_blocked_hosts) {
return topology;
}
std::set<std::string> allowed_list = this->allowed_and_blocked_hosts->get_allowed_host_ids();
std::set<std::string> blocked_list = this->allowed_and_blocked_hosts->get_blocked_host_ids();
const std::shared_ptr<CLUSTER_TOPOLOGY_INFO> filtered_topology = std::make_shared<CLUSTER_TOPOLOGY_INFO>();
for (const auto& host : topology->get_instances()) {
const auto host_id = host->get_host_id();
if (allowed_list.find(host_id) != allowed_list.end() && blocked_list.find(host_id) == blocked_list.end()) {
filtered_topology->add_host(host);
}
}
return filtered_topology;
}
// Renders `topology` as a log-friendly string: a fixed prefix followed either
// by "<empty topology>" (when total_hosts() is zero) or by one host per line,
// each introduced by a newline and a tab.
//
// `topology` is dereferenced and must not be null.
std::string TOPOLOGY_SERVICE::log_topology(const std::shared_ptr<CLUSTER_TOPOLOGY_INFO> topology) {
    std::stringstream text;
    text << "[TOPOLOGY_SERVICE] Topology: ";

    if (topology->total_hosts() != 0) {
        for (const auto& host : topology->get_instances()) {
            text << "\n\t" << *host;
        }
    } else {
        text << "<empty topology>";
    }

    return text.str();
}
// Looks up the cached topology for the current cluster id.
//
// Returns the cached entry, or nullptr on a miss. Every call reports to the
// metrics container whether the lookup was a hit.
//
// This function does not lock topology_cache_mutex itself; several callers in
// this file take the lock before calling it.
// TODO consider thread safety and usage of pointers
std::shared_ptr<CLUSTER_TOPOLOGY_INFO> TOPOLOGY_SERVICE::get_from_cache() {
    // find() on an empty map already yields end(), so no separate emptiness
    // test is needed.
    const auto entry = topology_cache.find(cluster_id);
    const bool cache_hit = entry != topology_cache.end();

    metrics_container->register_use_cached_topology(cache_hit);

    if (!cache_hit) {
        return nullptr;
    }
    return entry->second;
}
// Stores `topology_info` as the cached topology for the current cluster id,
// replacing any entry that is already there.
//
// The whole operation runs under topology_cache_mutex. Previously only the
// first insertion for a cluster id was locked, while replacing an existing
// entry touched the map without the lock and could race with the other
// accessors that do lock (clear(), clear_all(), mark_host_down(), ...).
//
// topology_cache_mutex is a non-recursive std::mutex, so callers must not
// already hold it when calling this function.
// TODO consider usage of pointers
void TOPOLOGY_SERVICE::put_to_cache(std::shared_ptr<CLUSTER_TOPOLOGY_INFO> topology_info) {
    std::lock_guard<std::mutex> guard(topology_cache_mutex);
    // operator[] covers both cases: it creates the entry when it is missing and
    // the assignment releases the previous topology when one was cached.
    topology_cache[cluster_id] = topology_info;
}
// Runs `query` on `connection_proxy` and returns its stored result set.
//
// Returns nullptr when `connection_proxy` is null or when query() returns a
// non-zero status; otherwise returns whatever store_result() yields. A
// non-null result must be released by the caller (query_for_topology() does
// so via free_result()).
MYSQL_RES* TOPOLOGY_SERVICE::try_execute_query(CONNECTION_PROXY* connection_proxy, const char* query) {
    if (connection_proxy == nullptr) {
        return nullptr;
    }
    if (connection_proxy->query(query) != 0) {
        return nullptr;
    }
    return connection_proxy->store_result();
}
// Tells whether a topology last updated at `last_updated` is older than the
// configured refresh rate.
//
// The comparison is done in whole seconds: the refresh rate is converted from
// milliseconds with integer division, so any sub-second remainder is dropped.
// TODO harmonize time function across objects so the times are comparable
bool TOPOLOGY_SERVICE::refresh_needed(std::time_t last_updated) {
    const auto age_in_seconds = time(0) - last_updated;
    const auto max_age_in_seconds = refresh_rate_in_ms / 1000;
    return age_in_seconds > max_age_in_seconds;
}
// Builds a HOST_INFO from one row of the topology query result.
//
// The host endpoint is derived from the row's server id via
// get_host_endpoint(), and the port is taken from the cluster instance
// template. The host is marked as a writer when its session column equals
// WRITER_SESSION_ID. Columns that are NULL in the row are stored as empty
// strings.
//
// Returns nullptr when the row has no server id or when no cluster instance
// template has been set, because no endpoint can be built in either case.
std::shared_ptr<HOST_INFO> TOPOLOGY_SERVICE::create_host(MYSQL_ROW& row) {
    // TEMP and TODO: figure out how to fetch values from the row by name rather
    // than by ordinal. For now this enum matches the column order of the query.
    enum COLUMNS {
        SERVER_ID,
        SESSION,
        LAST_UPDATE_TIMESTAMP,
        REPLICA_LAG_MILLISECONDS
    };

    if (row[SERVER_ID] == nullptr) {
        return nullptr;  // no endpoint can be generated. TODO: log this condition?
    }
    if (!cluster_instance_host) {
        // Both the endpoint pattern and the port come from the template, so
        // without it the host cannot be described (and dereferencing it would
        // be undefined behaviour).
        return nullptr;
    }

    std::string host_endpoint = get_host_endpoint(row[SERVER_ID]);
    std::shared_ptr<HOST_INFO> host_info = std::make_shared<HOST_INFO>(
        host_endpoint, cluster_instance_host->get_port());

    // TODO do case-insensitive comparison
    // Side note: how stable is this value on the server side? If it changes
    // there, a writer will no longer be detected.
    // The session column may be NULL, and strcmp() must not be given a null
    // pointer, so it is checked first.
    const char* session = row[SESSION];
    if (session != nullptr && strcmp(session, WRITER_SESSION_ID) == 0) {
        host_info->mark_as_writer(true);
    }

    host_info->instance_name = row[SERVER_ID];  // verified non-null above
    host_info->session_id = session ? session : "";
    host_info->last_updated = row[LAST_UPDATE_TIMESTAMP] ? row[LAST_UPDATE_TIMESTAMP] : "";
    host_info->replica_lag = row[REPLICA_LAG_MILLISECONDS] ? row[REPLICA_LAG_MILLISECONDS] : "";

    return host_info;
}
// Queries the server for the cluster topology using RETRIEVE_TOPOLOGY_SQL.
//
// Returns nullptr when the query could not be executed or produced no result
// set. Otherwise returns a CLUSTER_TOPOLOGY_INFO built from the rows, which
// may contain no hosts:
//   * reader rows are added in the order they arrive, skipping any row whose
//     instance name has already been seen;
//   * of all writer rows only the one with the greatest last_updated value is
//     kept, and it is added after the readers (unless its instance name was
//     already seen).
// A trace message is logged when no writer row was found. The elapsed time of
// the call is reported to the metrics container whether or not the query
// succeeded.
std::shared_ptr<CLUSTER_TOPOLOGY_INFO> TOPOLOGY_SERVICE::query_for_topology(CONNECTION_PROXY* connection_proxy) {
    std::shared_ptr<CLUSTER_TOPOLOGY_INFO> topology_info;
    const auto query_started = std::chrono::steady_clock::now();

    MYSQL_RES* result = try_execute_query(connection_proxy, RETRIEVE_TOPOLOGY_SQL);
    if (result != nullptr) {
        topology_info = std::make_shared<CLUSTER_TOPOLOGY_INFO>();

        // Instance names encountered so far; filled by does_instance_exist().
        std::map<std::string, std::shared_ptr<HOST_INFO>> seen_instances;
        std::shared_ptr<HOST_INFO> newest_writer;

        MYSQL_ROW row;
        while ((row = connection_proxy->fetch_row(result))) {
            std::shared_ptr<HOST_INFO> candidate = create_host(row);
            if (!candidate) {
                continue;
            }

            if (candidate->is_host_writer()) {
                // Only the most recent writer is treated as the true writer.
                // Other writer rows are ignored (possibly stale records;
                // multi-writer is not supported). The timestamps sort
                // lexicographically, so comparing the strings picks the most
                // recent one.
                if (!newest_writer || candidate->last_updated > newest_writer->last_updated) {
                    newest_writer = candidate;
                }
                continue;
            }

            // Readers are added the first time their instance is seen.
            if (!TOPOLOGY_SERVICE::does_instance_exist(seen_instances, candidate)) {
                topology_info->add_host(candidate);
            }
        }

        if (newest_writer && !TOPOLOGY_SERVICE::does_instance_exist(seen_instances, newest_writer)) {
            topology_info->add_host(newest_writer);
        }

        connection_proxy->free_result(result);

        if (!newest_writer) {  // No writer found
            MYLOG_TRACE(logger, dbc_id,
                        "[TOPOLOGY_SERVICE] The topology query returned an "
                        "invalid topology - no writer instance detected");
        }
    }

    const auto query_finished = std::chrono::steady_clock::now();
    long long elapsed_ms =
        std::chrono::duration_cast<std::chrono::milliseconds>(query_finished - query_started).count();
    metrics_container->register_topology_query_execution_time(elapsed_ms);

    return topology_info;
}
bool TOPOLOGY_SERVICE::does_instance_exist(
std::map<std::string, std::shared_ptr<HOST_INFO>>& instances,
std::shared_ptr<HOST_INFO> host_info) {
auto duplicate = instances.find(host_info->instance_name);
if (duplicate != instances.end()) {
return true;
} else {
instances.insert(std::pair<std::string, std::shared_ptr<HOST_INFO>>(
host_info->instance_name, host_info));
return false;
}
}
// Builds the endpoint of a single instance from the cluster instance
// template's host string by substituting `node_name` for the first '?' in it.
// When the host string contains no '?', it is returned unchanged.
//
// Dereferences cluster_instance_host, which therefore must be set.
std::string TOPOLOGY_SERVICE::get_host_endpoint(const char* node_name) {
    std::string endpoint = cluster_instance_host->get_host();

    const size_t placeholder_at = endpoint.find('?');
    if (placeholder_at == std::string::npos) {
        return endpoint;
    }

    endpoint.replace(placeholder_at, 1, node_name);
    return endpoint;
}