driver/cluster_topology_info.cc (114 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License, version 2.0
// (GPLv2), as published by the Free Software Foundation, with the
// following additional permissions:
//
// This program is distributed with certain software that is licensed
// under separate terms, as designated in a particular file or component
// or in the license documentation. Without limiting your rights under
// the GPLv2, the authors of this program hereby grant you an additional
// permission to link the program and your derivative works with the
// separately licensed software that they have included with the program.
//
// Without limiting the foregoing grant of rights under the GPLv2 and
// additional permission as to separately licensed software, this
// program is also subject to the Universal FOSS Exception, version 1.0,
// a copy of which can be found along with its FAQ at
// http://oss.oracle.com/licenses/universal-foss-exception.
//
// This program is distributed in the hope that it will be useful, but
// WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU General Public License, version 2.0, for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see
// http://www.gnu.org/licenses/gpl-2.0.html.
#include "cluster_topology_info.h"

#include <algorithm>
#include <cstdlib>
#include <ctime>
#include <stdexcept>
#include <string>
/**
  Return a pseudo-random number produced by std::rand().

  The generator is seeded from the current time the first time this function
  is called, and never again afterwards.

  Returns a value in the range [0, RAND_MAX].
*/
int get_random_number() {
    // Seed exactly once. Reseeding with time(nullptr) on every call restarts
    // the sequence, so all calls made within the same second would return the
    // same value. The initializer of a function-local static runs only on the
    // first call.
    static const bool seeded = [] {
        std::srand(static_cast<unsigned int>(std::time(nullptr)));
        return true;
    }();
    (void)seeded;  // only the initializer's side effect is needed

    return std::rand();
}
/**
  Default constructor.
  Stamps the new object with the current time via update_time().
*/
CLUSTER_TOPOLOGY_INFO::CLUSTER_TOPOLOGY_INFO() {
    this->update_time();
}
/**
  Copy constructor.

  Copies the reader cursor and the last-updated timestamp from src_info, and
  builds new HOST_INFO objects (via HOST_INFO's copy constructor) for every
  writer and reader, so the two topologies do not share those host entries.

  last_used_reader is copied as a pointer, so it keeps referring to the same
  HOST_INFO object as the source rather than to one of the new copies.

  NOTE(review): down_hosts is not copied from src_info here - confirm that
  starting the copy with the member's default value is intentional.
*/
CLUSTER_TOPOLOGY_INFO::CLUSTER_TOPOLOGY_INFO(const CLUSTER_TOPOLOGY_INFO& src_info)
    : current_reader{src_info.current_reader},
      last_updated{src_info.last_updated},
      last_used_reader{src_info.last_used_reader} {
    // Iterate by const reference: taking each shared_ptr by value would bump
    // and drop its reference count once per element for no benefit.
    writers.reserve(src_info.writers.size());
    for (const auto& source_host : src_info.writers) {
        writers.push_back(std::make_shared<HOST_INFO>(*source_host));
    }

    readers.reserve(src_info.readers.size());
    for (const auto& source_host : src_info.readers) {
        readers.push_back(std::make_shared<HOST_INFO>(*source_host));
    }
}
/**
  Destructor.
  Drops this object's references to its writer and reader host entries,
  writers first.
*/
CLUSTER_TOPOLOGY_INFO::~CLUSTER_TOPOLOGY_INFO() {
    // clear() destroys every stored shared_ptr, which is what releases the
    // reference. No per-element reset() is needed (and resetting a by-value
    // loop copy would not affect the stored elements anyway).
    writers.clear();
    readers.clear();
}
/**
  Add a host to the topology.
  The host is appended to the writers list when host_info->is_host_writer()
  is true, otherwise to the readers list. The last-updated timestamp is
  refreshed afterwards.
*/
void CLUSTER_TOPOLOGY_INFO::add_host(std::shared_ptr<HOST_INFO> host_info) {
    if (host_info->is_host_writer()) {
        writers.push_back(host_info);
    } else {
        readers.push_back(host_info);
    }

    update_time();
}
void CLUSTER_TOPOLOGY_INFO::remove_host(std::shared_ptr<HOST_INFO> host_info) {
auto position = std::find(writers.begin(), writers.end(), host_info);
if (position != writers.end()) {
writers.erase(position);
}
position = std::find(readers.begin(), readers.end(), host_info);
if (position != readers.end()) {
readers.erase(position);
}
update_time();
}
/**
  Returns the number of hosts in the topology: writers plus readers.
*/
size_t CLUSTER_TOPOLOGY_INFO::total_hosts() {
    const size_t writer_count = writers.size();
    const size_t reader_count = readers.size();
    return writer_count + reader_count;
}
size_t CLUSTER_TOPOLOGY_INFO::num_readers() {
return readers.size();
}
std::time_t CLUSTER_TOPOLOGY_INFO::time_last_updated() {
return last_updated;
}
// TODO harmonize time function across objects so the times are comparable
void CLUSTER_TOPOLOGY_INFO::update_time() {
last_updated = time(nullptr);
}
/**
  Returns the first entry of the writers list.
  Throws std::runtime_error when the writers list is empty.
*/
std::shared_ptr<HOST_INFO> CLUSTER_TOPOLOGY_INFO::get_writer() {
    if (!writers.empty()) {
        return writers.front();
    }

    throw std::runtime_error("No writer available");
}
std::shared_ptr<HOST_INFO> CLUSTER_TOPOLOGY_INFO::get_next_reader() {
size_t num_readers = readers.size();
if (readers.empty()) {
throw std::runtime_error("No reader available");
}
if (current_reader == -1) { // initialize for the first time
current_reader = get_random_number() % num_readers;
}
else if (current_reader >= num_readers) {
// adjust current reader in case topology was refreshed.
current_reader = (current_reader) % num_readers;
}
else {
current_reader = (current_reader + 1) % num_readers;
}
return readers[current_reader];
}
/**
  Returns the reader stored at position i of the readers list.
  Throws std::runtime_error, naming the offending index, when i is negative
  or not less than the number of readers.
*/
std::shared_ptr<HOST_INFO> CLUSTER_TOPOLOGY_INFO::get_reader(int i) {
    if (i < 0 || static_cast<size_t>(i) >= readers.size()) {
        // std::to_string is required here: adding an int directly to a string
        // literal is pointer arithmetic, not concatenation.
        throw std::runtime_error("No reader available at index " + std::to_string(i));
    }

    return readers[static_cast<size_t>(i)];
}
/**
  Returns a copy of the readers list. The returned vector is independent of
  the internal one, but its elements point to the same HOST_INFO objects.
*/
std::vector<std::shared_ptr<HOST_INFO>> CLUSTER_TOPOLOGY_INFO::get_readers() {
    std::vector<std::shared_ptr<HOST_INFO>> readers_copy(readers);
    return readers_copy;
}
/**
  Returns a copy of the writers list. The returned vector is independent of
  the internal one, but its elements point to the same HOST_INFO objects.
*/
std::vector<std::shared_ptr<HOST_INFO>> CLUSTER_TOPOLOGY_INFO::get_writers() {
    std::vector<std::shared_ptr<HOST_INFO>> writers_copy(writers);
    return writers_copy;
}
std::vector<std::shared_ptr<HOST_INFO>> CLUSTER_TOPOLOGY_INFO::get_instances() {
std::vector instances(writers);
instances.insert(instances.end(), readers.begin(), readers.end());
return instances;
}
std::shared_ptr<HOST_INFO> CLUSTER_TOPOLOGY_INFO::get_last_used_reader() {
return last_used_reader;
}
void CLUSTER_TOPOLOGY_INFO::set_last_used_reader(std::shared_ptr<HOST_INFO> reader) {
last_used_reader = reader;
}
void CLUSTER_TOPOLOGY_INFO::mark_host_down(std::shared_ptr<HOST_INFO> host) {
host->set_host_state(DOWN);
down_hosts.insert(host->get_host_port_pair());
}
void CLUSTER_TOPOLOGY_INFO::mark_host_up(std::shared_ptr<HOST_INFO> host) {
host->set_host_state(UP);
down_hosts.erase(host->get_host_port_pair());
}
/**
  Returns a copy of the set of keys recorded by mark_host_down() and not yet
  removed by mark_host_up().
*/
std::set<std::string> CLUSTER_TOPOLOGY_INFO::get_down_hosts() {
    std::set<std::string> down_hosts_copy(down_hosts);
    return down_hosts_copy;
}