driver/failover_reader_handler.cc (207 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License, version 2.0
// (GPLv2), as published by the Free Software Foundation, with the
// following additional permissions:
//
// This program is distributed with certain software that is licensed
// under separate terms, as designated in a particular file or component
// or in the license documentation. Without limiting your rights under
// the GPLv2, the authors of this program hereby grant you an additional
// permission to link the program and your derivative works with the
// separately licensed software that they have included with the program.
//
// Without limiting the foregoing grant of rights under the GPLv2 and
// additional permission as to separately licensed software, this
// program is also subject to the Universal FOSS Exception, version 1.0,
// a copy of which can be found along with its FAQ at
// http://oss.oracle.com/licenses/universal-foss-exception.
//
// This program is distributed in the hope that it will be useful, but
// WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU General Public License, version 2.0, for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see
// http://www.gnu.org/licenses/gpl-2.0.html.
#include "driver.h"
#include <algorithm>
#include <chrono>
#include <future>
#include <random>
#include <thread>
#include <vector>
// Builds a reader failover handler.
//
// topology_service / connection_handler: collaborators stored for later use
//     (they are handed to every CONNECT_TO_READER_HANDLER task).
// thread_pool: pool the failover tasks are pushed to; received by reference.
// failover_timeout_ms: upper bound, in milliseconds, for a whole failover() call.
// failover_reader_connect_timeout: upper bound, in milliseconds, for one
//     connection attempt round in get_connection_from_hosts().
// enable_strict_reader_failover: when true, failover() does not append writers
//     to the list of candidate hosts.
// dbc_id: identifier passed to every trace message.
// enable_logging: when true, a log file is initialised via init_log_file().
FAILOVER_READER_HANDLER::FAILOVER_READER_HANDLER(
    std::shared_ptr<TOPOLOGY_SERVICE> topology_service,
    std::shared_ptr<CONNECTION_HANDLER> connection_handler,
    ctpl::thread_pool& thread_pool,
    int failover_timeout_ms, int failover_reader_connect_timeout,
    bool enable_strict_reader_failover,
    unsigned long dbc_id, bool enable_logging)
    : topology_service{topology_service},
      connection_handler{connection_handler},
      thread_pool{thread_pool},
      max_failover_timeout_ms{failover_timeout_ms},
      reader_connect_timeout_ms{failover_reader_connect_timeout},
      enable_strict_reader_failover{enable_strict_reader_failover},
      dbc_id{dbc_id} {
    // Logging is opt-in: the logger is only assigned when requested.
    if (enable_logging) {
        logger = init_log_file();
    }
}
// Nothing to release by hand here; members clean up through their own destructors.
FAILOVER_READER_HANDLER::~FAILOVER_READER_HANDLER() = default;
// Function called to start the Reader Failover process.
// This process will generate a list of available hosts: First readers that are up, then readers marked as down, then writers.
// If it goes through the list and does not succeed to connect, it tries again, endlessly.
std::shared_ptr<READER_FAILOVER_RESULT> FAILOVER_READER_HANDLER::failover(
std::shared_ptr<CLUSTER_TOPOLOGY_INFO> current_topology) {
auto empty_result = std::make_shared<READER_FAILOVER_RESULT>(false, nullptr, nullptr);
if (!current_topology || current_topology->total_hosts() == 0) {
return empty_result;
}
const auto start = std::chrono::steady_clock::now();
auto global_sync = std::make_shared<FAILOVER_SYNC>(1);
if (thread_pool.n_idle() == 0) {
MYLOG_TRACE(logger, dbc_id, "[FAILOVER_READER_HANDLER] Resizing thread pool to %d", thread_pool.size() + 1);
thread_pool.resize(thread_pool.size() + 1);
}
auto reader_result_future = thread_pool.push([=](int id) {
while (!global_sync->is_completed()) {
auto hosts_list = build_hosts_list(current_topology, !enable_strict_reader_failover);
auto reader_result = get_connection_from_hosts(hosts_list, global_sync);
if (reader_result->connected) {
global_sync->mark_as_complete(true);
return reader_result;
}
// TODO Think of changes to the strategy if it went
// through all the hosts and did not connect.
std::this_thread::sleep_for(std::chrono::seconds(READER_CONNECT_INTERVAL_SEC));
}
return empty_result;
});
// Wait for task complete signal with specified timeout
global_sync->wait_and_complete(max_failover_timeout_ms);
// Constantly polling for results until timeout
while (true) {
if (reader_result_future.wait_for(std::chrono::seconds(0)) == std::future_status::ready) {
MYLOG_TRACE(logger, dbc_id, "[FAILOVER_READER_HANDLER] Reader failover finished.");
return reader_result_future.get();
}
// No result it ready, update remaining timeout
const auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start);
const auto remaining_wait_ms = max_failover_timeout_ms - duration.count();
if (remaining_wait_ms <= 0) {
// Reader failover timed out
MYLOG_TRACE(logger, dbc_id, "[FAILOVER_READER_HANDLER] Reader failover timed out. Failed to connect to the reader instance.");
return empty_result;
}
}
}
// Connects to a reader host. Often used to query/update the topology.
//
// The candidate list (readers marked up first, then readers marked down, no
// writers) is built once up front. It is then walked over and over until a
// connection is established or f_sync reports completion.
//
// Returns the connected result, or a (false, nullptr, nullptr) result when the
// request was cancelled through f_sync.
std::shared_ptr<READER_FAILOVER_RESULT> FAILOVER_READER_HANDLER::get_reader_connection(
    std::shared_ptr<CLUSTER_TOPOLOGY_INFO> topology_info,
    std::shared_ptr<FAILOVER_SYNC> f_sync) {
    // Readers only: writers are deliberately left out of this list.
    auto reader_candidates = build_hosts_list(topology_info, false);

    for (;;) {
        if (f_sync->is_completed()) {
            break;
        }
        // TODO Think of changes to the strategy if it went through all the readers and did not connect.
        auto attempt = get_connection_from_hosts(reader_candidates, f_sync);
        if (attempt->connected) {
            return attempt;
        }
    }

    // Cancelled before any reader accepted a connection.
    return std::make_shared<READER_FAILOVER_RESULT>(false, nullptr, nullptr);
}
// Reads the topology and builds the list of hosts to try, in priority order:
//   1. readers not marked down (shuffled),
//   2. readers marked down (shuffled),
//   3. writers (shuffled) -- only when include_writers is true.
//
// topology_info: topology to read hosts from; a null pointer yields an empty list.
// include_writers: whether writers are appended after all readers.
//
// Each group is shuffled with an engine seeded from std::random_device, so the
// order inside a group differs between calls.
std::vector<std::shared_ptr<HOST_INFO>> FAILOVER_READER_HANDLER::build_hosts_list(
    std::shared_ptr<CLUSTER_TOPOLOGY_INFO> topology_info,
    bool include_writers) {
    std::vector<std::shared_ptr<HOST_INFO>> hosts_list;
    if (!topology_info) {
        // Nothing to choose from; avoid dereferencing a null topology.
        return hosts_list;
    }

    // We split reader hosts that are marked up from those marked down.
    std::vector<std::shared_ptr<HOST_INFO>> readers_up;
    std::vector<std::shared_ptr<HOST_INFO>> readers_down;
    auto readers = topology_info->get_readers();
    for (const auto& reader : readers) {
        if (reader->is_host_down()) {
            readers_down.push_back(reader);
        } else {
            readers_up.push_back(reader);
        }
    }

    // A default-constructed engine always starts from the same state, which
    // would make every "shuffle" produce the same order; seed it instead.
    std::default_random_engine rng(std::random_device{}());
    std::shuffle(std::begin(readers_up), std::end(readers_up), rng);
    std::shuffle(std::begin(readers_down), std::end(readers_down), rng);

    // Readers that are marked up go first, readers marked down go after.
    hosts_list.reserve(readers_up.size() + readers_down.size());
    hosts_list.insert(hosts_list.end(), readers_up.begin(), readers_up.end());
    hosts_list.insert(hosts_list.end(), readers_down.begin(), readers_down.end());

    if (include_writers) {
        auto writers = topology_info->get_writers();
        std::shuffle(std::begin(writers), std::end(writers), rng);
        hosts_list.insert(hosts_list.end(), writers.begin(), writers.end());
    }
    return hosts_list;
}
std::shared_ptr<READER_FAILOVER_RESULT> FAILOVER_READER_HANDLER::get_connection_from_hosts(
std::vector<std::shared_ptr<HOST_INFO>> hosts_list, std::shared_ptr<FAILOVER_SYNC> global_sync) {
size_t total_hosts = hosts_list.size();
size_t i = 0;
// This loop should end once it reaches the end of the list without a successful connection.
// The function calling it already has a neverending loop looking for a connection.
// Ending this loop will allow the calling function to update the list or change strategy if this failed.
while (!global_sync->is_completed() && i < total_hosts) {
const auto start = std::chrono::steady_clock::now();
// This boolean verifies if the next host in the list is also the last, meaning there's no host for the second thread.
const bool odd_hosts_number = (i + 1 == total_hosts);
auto local_sync = std::make_shared<FAILOVER_SYNC>(1);
if (!odd_hosts_number) {
local_sync->increment_task();
}
CONNECT_TO_READER_HANDLER first_connection_handler(connection_handler, topology_service, dbc_id, logger != nullptr);
auto first_connection_result = std::make_shared<READER_FAILOVER_RESULT>(false, nullptr, nullptr);
CONNECT_TO_READER_HANDLER second_connection_handler(connection_handler, topology_service, dbc_id, logger != nullptr);
auto second_connection_result = std::make_shared<READER_FAILOVER_RESULT>(false, nullptr, nullptr);
std::shared_ptr<HOST_INFO> first_reader_host = hosts_list.at(i);
if (thread_pool.n_idle() <= 1) {
int size = thread_pool.size() + 2 - thread_pool.n_idle();
MYLOG_TRACE(logger, dbc_id,
"[FAILOVER_READER_HANDLER] Resizing thread pool to %d", size);
thread_pool.resize(size);
}
auto first_result = thread_pool.push(std::move(first_connection_handler), first_reader_host, local_sync, first_connection_result);
std::future<void> second_future;
if (!odd_hosts_number) {
auto second_reader_host = hosts_list.at(i + 1);
second_future = thread_pool.push(std::move(second_connection_handler), second_reader_host, local_sync, second_connection_result);
}
// Wait for task complete signal with specified timeout
local_sync->wait_and_complete(reader_connect_timeout_ms);
// Constantly polling for results until timeout
while (true) {
// Check if first reader task result is ready
if (first_result.valid() && first_result.wait_for(std::chrono::seconds(0)) == std::future_status::ready) {
first_result.get();
if (first_connection_result->connected) {
MYLOG_TRACE(logger, dbc_id,
"[FAILOVER_READER_HANDLER] Connected to reader: %s",
first_connection_result->new_host->get_host_port_pair().c_str());
return first_connection_result;
}
}
// Check if second reader task result is ready if there is one
if (!odd_hosts_number && second_future.valid() &&
second_future.wait_for(std::chrono::seconds(0)) == std::future_status::ready) {
second_future.get();
if (second_connection_result->connected) {
MYLOG_TRACE(logger, dbc_id,
"[FAILOVER_READER_HANDLER] Connected to reader: %s",
second_connection_result->new_host->get_host_port_pair().c_str());
return second_connection_result;
}
}
// Results are ready but non has valid connection
if (!first_result.valid() && (odd_hosts_number || !second_future.valid())) {
break;
}
// No result it ready, update remaining timeout
const auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start);
const auto remaining_wait_ms = reader_connect_timeout_ms - duration.count();
if (remaining_wait_ms <= 0) {
// None has connected. We move on and try new hosts.
std::this_thread::sleep_for(std::chrono::seconds(READER_CONNECT_INTERVAL_SEC));
break;
}
}
i += 2;
}
// The operation was either cancelled either reached the end of the list without connecting.
return std::make_shared<READER_FAILOVER_RESULT>(false, nullptr, nullptr);
}
// *** CONNECT_TO_READER_HANDLER
// Handler to connect to a reader host.
// Instances are pushed to the thread pool by
// FAILOVER_READER_HANDLER::get_connection_from_hosts, one per candidate host.
//
// The constructor does no work of its own: all four arguments are forwarded
// unchanged to FAILOVER (presumably the base class -- TODO confirm in the header).
CONNECT_TO_READER_HANDLER::CONNECT_TO_READER_HANDLER(
std::shared_ptr<CONNECTION_HANDLER> connection_handler,
std::shared_ptr<TOPOLOGY_SERVICE> topology_service,
unsigned long dbc_id, bool enable_logging)
: FAILOVER{connection_handler, topology_service, dbc_id, enable_logging} {}
// No handler-specific cleanup is performed on destruction.
CONNECT_TO_READER_HANDLER::~CONNECT_TO_READER_HANDLER() = default;
// Task body: attempts to connect to one reader host.
//
// id:     value supplied by the thread pool; only used in trace messages.
// reader: host to connect to; a null pointer skips the attempt.
// f_sync: sync object shared with the sibling task and the waiting caller.
//         Nothing is attempted when it is already completed.
// result: filled in (connected, new_host, new_connection) only when this task
//         connects before f_sync is completed.
//
// On a successful, still-relevant connection the connection is moved into
// `result` and f_sync is marked complete with `true`. On a failed attempt the
// host is marked down and, unless f_sync is already completed, it is marked
// complete with `false`. In every path except the successful one the new
// connection is released before returning.
void CONNECT_TO_READER_HANDLER::operator()(
    int id,
    std::shared_ptr<HOST_INFO> reader,
    std::shared_ptr<FAILOVER_SYNC> f_sync,
    std::shared_ptr<READER_FAILOVER_RESULT> result) {
    if (reader && !f_sync->is_completed()) {
        MYLOG_TRACE(logger, dbc_id,
                    "Thread ID %d - [CONNECT_TO_READER_HANDLER] Trying to connect to reader: %s",
                    id, reader->get_host_port_pair().c_str());

        if (!connect(reader)) {
            // Attempt failed: record the host as down and report the failure
            // unless somebody else already completed the sync.
            topology_service->mark_host_down(reader);
            MYLOG_TRACE(
                logger, dbc_id,
                "Thread ID %d - [CONNECT_TO_READER_HANDLER] Failed to connect to reader: %s",
                id, reader->get_host_port_pair().c_str());
            if (!f_sync->is_completed()) {
                f_sync->mark_as_complete(false);
            }
        } else {
            topology_service->mark_host_up(reader);
            if (!f_sync->is_completed()) {
                // This task is the one that counts: hand the connection over.
                result->connected = true;
                result->new_host = reader;
                result->new_connection = std::move(this->new_connection);
                f_sync->mark_as_complete(true);
                MYLOG_TRACE(
                    logger, dbc_id,
                    "Thread ID %d - [CONNECT_TO_READER_HANDLER] Connected to reader: %s",
                    id, reader->get_host_port_pair().c_str());
                return;
            }
            // If another thread finishes first, or both timeout, this thread is canceled.
            release_new_connection();
        }
    }

    // Reached on every path that did not hand a connection to `result`.
    release_new_connection();
}