driver/monitor.cc (192 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License, version 2.0
// (GPLv2), as published by the Free Software Foundation, with the
// following additional permissions:
//
// This program is distributed with certain software that is licensed
// under separate terms, as designated in a particular file or component
// or in the license documentation. Without limiting your rights under
// the GPLv2, the authors of this program hereby grant you an additional
// permission to link the program and your derivative works with the
// separately licensed software that they have included with the program.
//
// Without limiting the foregoing grant of rights under the GPLv2 and
// additional permission as to separately licensed software, this
// program is also subject to the Universal FOSS Exception, version 1.0,
// a copy of which can be found along with its FAQ at
// http://oss.oracle.com/licenses/universal-foss-exception.
//
// This program is distributed in the hope that it will be useful, but
// WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU General Public License, version 2.0, for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see
// http://www.gnu.org/licenses/gpl-2.0.html.
#include "driver.h"
#include "monitor.h"
#include "connection_proxy.h"
#include "monitor_service.h"
#include "mylog.h"
MONITOR::MONITOR(
std::shared_ptr<HOST_INFO> host_info,
std::shared_ptr<CONNECTION_HANDLER> connection_handler,
std::chrono::seconds failure_detection_timeout,
std::chrono::milliseconds monitor_disposal_time,
DataSource* ds,
bool enable_logging)
: MONITOR(
std::move(host_info),
std::move(connection_handler),
failure_detection_timeout,
monitor_disposal_time,
ds,
nullptr,
enable_logging) {};
MONITOR::MONITOR(
std::shared_ptr<HOST_INFO> host_info,
std::shared_ptr<CONNECTION_HANDLER> connection_handler,
std::chrono::seconds failure_detection_timeout,
std::chrono::milliseconds monitor_disposal_time,
DataSource* ds,
CONNECTION_PROXY* proxy,
bool enable_logging) {
this->host = std::move(host_info);
this->connection_handler = std::move(connection_handler);
this->failure_detection_timeout = failure_detection_timeout;
this->disposal_time = monitor_disposal_time;
this->ds = new DataSource();
this->ds->copy(ds);
this->connection_proxy = proxy;
this->connection_check_interval = (std::chrono::milliseconds::max)();
if (enable_logging)
this->logger = init_log_file();
}
MONITOR::~MONITOR() {
if (this->ds) {
delete ds;
this->ds = nullptr;
}
if (this->connection_proxy) {
this->connection_proxy->delete_ds();
delete this->connection_proxy;
this->connection_proxy = nullptr;
}
}
void MONITOR::start_monitoring(std::shared_ptr<MONITOR_CONNECTION_CONTEXT> context) {
std::chrono::milliseconds detection_interval = context->get_failure_detection_interval();
if (detection_interval < this->connection_check_interval) {
this->connection_check_interval = detection_interval;
}
auto current_time = get_current_time();
context->set_start_monitor_time(current_time);
this->last_context_timestamp = current_time;
{
std::unique_lock<std::mutex> lock(mutex_);
this->contexts.push_back(context);
}
}
void MONITOR::stop_monitoring(std::shared_ptr<MONITOR_CONNECTION_CONTEXT> context) {
if (context == nullptr) {
MYLOG_TRACE(
this->logger, 0,
"[MONITOR] Invalid context passed into stop_monitoring()");
return;
}
context->invalidate();
{
std::unique_lock<std::mutex> lock(mutex_);
this->contexts.remove(context);
}
this->connection_check_interval = this->find_shortest_interval();
}
bool MONITOR::is_stopped() {
return this->stopped.load();
}
void MONITOR::stop() {
this->stopped.store(true);
}
void MONITOR::clear_contexts() {
{
std::unique_lock<std::mutex> lock(mutex_);
this->contexts.clear();
}
this->connection_check_interval = (std::chrono::milliseconds::max)();
}
// Periodically ping the server and update the contexts' connection status.
void MONITOR::run(std::shared_ptr<MONITOR_SERVICE> service) {
this->stopped = false;
while (!this->stopped) {
bool have_contexts;
{
std::unique_lock<std::mutex> lock(mutex_);
have_contexts = !this->contexts.empty();
}
if (have_contexts) {
auto status_check_start_time = this->get_current_time();
this->last_context_timestamp = status_check_start_time;
CONNECTION_STATUS status = this->check_connection_status();
{
std::unique_lock<std::mutex> lock(mutex_);
for (auto it = this->contexts.begin(); it != this->contexts.end(); ++it) {
std::shared_ptr<MONITOR_CONNECTION_CONTEXT> context = *it;
context->update_connection_status(
status_check_start_time,
status_check_start_time + status.elapsed_time,
status.is_valid);
}
}
std::chrono::milliseconds check_interval = this->get_connection_check_interval();
auto sleep_time = check_interval - status.elapsed_time;
if (sleep_time > std::chrono::milliseconds(0)) {
std::this_thread::sleep_for(sleep_time);
}
}
else {
auto time_inactive = std::chrono::duration_cast<std::chrono::milliseconds>(this->get_current_time() - this->last_context_timestamp);
if (time_inactive >= this->disposal_time) {
break;
}
std::this_thread::sleep_for(thread_sleep_when_inactive);
}
}
service->notify_unused(shared_from_this());
this->stopped = true;
}
std::chrono::milliseconds MONITOR::get_connection_check_interval() {
std::unique_lock<std::mutex> lock(mutex_);
if (this->contexts.empty()) {
return std::chrono::milliseconds(0);
}
return this->connection_check_interval;
}
CONNECTION_STATUS MONITOR::check_connection_status() {
if (this->connection_proxy == nullptr || !this->connection_proxy->is_connected()) {
const auto start = this->get_current_time();
bool connected = this->connect();
return CONNECTION_STATUS{
connected,
std::chrono::duration_cast<std::chrono::milliseconds>(this->get_current_time() - start)
};
}
auto start = this->get_current_time();
bool is_connection_active = this->connection_proxy->ping() == 0;
auto duration = this->get_current_time() - start;
return CONNECTION_STATUS{
is_connection_active,
std::chrono::duration_cast<std::chrono::milliseconds>(duration)
};
}
bool MONITOR::connect() {
if (this->connection_proxy) {
this->connection_proxy->close();
delete this->connection_proxy;
}
// Timeout shouldn't be 0 by now, but double check just in case
unsigned int timeout_sec = this->failure_detection_timeout.count() == 0 ? failure_detection_timeout_default : this->failure_detection_timeout.count();
// timeout should be set in DBC::connect()
if (this->ds->opt_ENABLE_CLUSTER_FAILOVER) {
this->ds->opt_CONNECT_TIMEOUT = timeout_sec;
this->ds->opt_NETWORK_TIMEOUT = timeout_sec;
} else {
// cannot change login_timeout here because no access to dbc
this->ds->opt_READTIMEOUT = timeout_sec;
this->ds->opt_WRITETIMEOUT = timeout_sec;
}
this->ds->opt_ENABLE_FAILURE_DETECTION= false;
this->connection_proxy = this->connection_handler->connect(this->host, this->ds, true);
if (!this->connection_proxy) {
return false;
}
return this->connection_proxy->is_connected();
}
std::chrono::milliseconds MONITOR::find_shortest_interval() {
auto min = (std::chrono::milliseconds::max)();
{
std::unique_lock<std::mutex> lock(mutex_);
if (this->contexts.empty()) {
return min;
}
for (auto it = this->contexts.begin(); it != this->contexts.end(); ++it) {
auto failure_detection_interval = (*it)->get_failure_detection_interval();
if (failure_detection_interval < min) {
min = failure_detection_interval;
}
}
}
return min;
}
std::chrono::steady_clock::time_point MONITOR::get_current_time() {
return std::chrono::steady_clock::now();
}