driver/efm_proxy.cc (256 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License, version 2.0
// (GPLv2), as published by the Free Software Foundation, with the
// following additional permissions:
//
// This program is distributed with certain software that is licensed
// under separate terms, as designated in a particular file or component
// or in the license documentation. Without limiting your rights under
// the GPLv2, the authors of this program hereby grant you an additional
// permission to link the program and your derivative works with the
// separately licensed software that they have included with the program.
//
// Without limiting the foregoing grant of rights under the GPLv2 and
// additional permission as to separately licensed software, this
// program is also subject to the Universal FOSS Exception, version 1.0,
// a copy of which can be found along with its FAQ at
// http://oss.oracle.com/licenses/universal-foss-exception.
//
// This program is distributed in the hope that it will be useful, but
// WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU General Public License, version 2.0, for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see
// http://www.gnu.org/licenses/gpl-2.0.html.
#include "efm_proxy.h"
namespace {
const char* RETRIEVE_HOST_PORT_SQL = "SELECT CONCAT(@@hostname, ':', @@port)";
}
EFM_PROXY::EFM_PROXY(DBC* dbc, DataSource* ds) : EFM_PROXY(
dbc, ds, nullptr, std::make_shared<MONITOR_SERVICE>(ds && ds->opt_LOG_QUERY)) {}
EFM_PROXY::EFM_PROXY(DBC* dbc, DataSource* ds, CONNECTION_PROXY* next_proxy) : EFM_PROXY(
dbc, ds, next_proxy, std::make_shared<MONITOR_SERVICE>(ds && ds->opt_LOG_QUERY)) {}
EFM_PROXY::EFM_PROXY(DBC* dbc, DataSource* ds, CONNECTION_PROXY* next_proxy,
std::shared_ptr<MONITOR_SERVICE> monitor_service)
: CONNECTION_PROXY(dbc, ds),
monitor_service{std::move(monitor_service)} {
this->next_proxy = next_proxy;
}
std::shared_ptr<MONITOR_CONNECTION_CONTEXT> EFM_PROXY::start_monitoring() {
if (!ds || !ds->opt_ENABLE_FAILURE_DETECTION) {
return nullptr;
}
auto failure_detection_timeout = ds->opt_FAILURE_DETECTION_TIMEOUT;
// Use network timeout defined if failure detection timeout is not set
if (failure_detection_timeout == 0) {
failure_detection_timeout = ds->opt_NETWORK_TIMEOUT == 0 ? failure_detection_timeout_default : ds->opt_NETWORK_TIMEOUT;
}
return monitor_service->start_monitoring(
dbc,
ds,
node_keys,
std::make_shared<HOST_INFO>(get_host(), get_port()),
std::chrono::milliseconds{ds->opt_FAILURE_DETECTION_TIME},
std::chrono::seconds{failure_detection_timeout},
std::chrono::milliseconds{ds->opt_FAILURE_DETECTION_INTERVAL},
ds->opt_FAILURE_DETECTION_COUNT,
std::chrono::milliseconds{ds->opt_MONITOR_DISPOSAL_TIME});
}
void EFM_PROXY::stop_monitoring(std::shared_ptr<MONITOR_CONNECTION_CONTEXT> context) {
if (!ds || !ds->opt_ENABLE_FAILURE_DETECTION || context == nullptr) {
return;
}
monitor_service->stop_monitoring(context);
if (context->is_node_unhealthy() && is_connected()) {
close_socket();
}
}
void EFM_PROXY::generate_node_keys() {
node_keys.clear();
node_keys.insert(std::string(get_host()) + ":" + std::to_string(get_port()));
if (is_connected()) {
// Temporarily turn off failure detection if on
const auto failure_detection_old_state = ds->opt_ENABLE_FAILURE_DETECTION;
ds->opt_ENABLE_FAILURE_DETECTION = false;
const auto error = query(RETRIEVE_HOST_PORT_SQL);
if (error == 0) {
MYSQL_RES* result = store_result();
MYSQL_ROW row;
while ((row = fetch_row(result))) {
node_keys.insert(std::string(row[0]));
}
free_result(result);
}
ds->opt_ENABLE_FAILURE_DETECTION = failure_detection_old_state;
}
}
void EFM_PROXY::set_next_proxy(CONNECTION_PROXY* next_proxy) {
CONNECTION_PROXY::set_next_proxy(next_proxy);
generate_node_keys();
}
void EFM_PROXY::set_connection(CONNECTION_PROXY* connection_proxy) {
CONNECTION_PROXY::set_connection(connection_proxy);
if (monitor_service != nullptr && !node_keys.empty()) {
monitor_service->stop_monitoring_for_all_connections(node_keys);
}
generate_node_keys();
}
int EFM_PROXY::set_character_set(const char* csname) {
const auto context = start_monitoring();
const int ret = next_proxy->set_character_set(csname);
stop_monitoring(context);
return ret;
}
bool EFM_PROXY::change_user(const char* user, const char* passwd, const char* db) {
const auto context = start_monitoring();
const bool ret = next_proxy->change_user(user, passwd, db);
stop_monitoring(context);
return ret;
}
bool EFM_PROXY::real_connect(
const char* host, const char* user, const char* passwd,
const char* db, unsigned int port, const char* unix_socket,
unsigned long clientflag) {
const bool ret = next_proxy->real_connect(host, user, passwd, db, port, unix_socket, clientflag);
generate_node_keys();
return ret;
}
int EFM_PROXY::select_db(const char* db) {
const auto context = start_monitoring();
const int ret = next_proxy->select_db(db);
stop_monitoring(context);
return ret;
}
int EFM_PROXY::query(const char* q) {
const auto context = start_monitoring();
const int ret = next_proxy->query(q);
stop_monitoring(context);
return ret;
}
int EFM_PROXY::real_query(const char* q, unsigned long length) {
const auto context = start_monitoring();
const int ret = next_proxy->real_query(q, length);
stop_monitoring(context);
return ret;
}
MYSQL_RES* EFM_PROXY::store_result() {
const auto context = start_monitoring();
MYSQL_RES* ret = next_proxy->store_result();
stop_monitoring(context);
return ret;
}
MYSQL_RES* EFM_PROXY::use_result() {
const auto context = start_monitoring();
MYSQL_RES* ret = next_proxy->use_result();
stop_monitoring(context);
return ret;
}
int EFM_PROXY::next_result() {
const auto context = start_monitoring();
const int ret = next_proxy->next_result();
stop_monitoring(context);
return ret;
}
bool EFM_PROXY::more_results() {
const auto context = start_monitoring();
const bool ret = next_proxy->more_results();
stop_monitoring(context);
return ret;
}
int EFM_PROXY::stmt_next_result(MYSQL_STMT* stmt) {
const auto context = start_monitoring();
const int ret = next_proxy->stmt_next_result(stmt);
stop_monitoring(context);
return ret;
}
bool EFM_PROXY::real_connect_dns_srv(
const char* dns_srv_name, const char* user,
const char* passwd, const char* db, unsigned long client_flag) {
const bool ret = next_proxy->real_connect_dns_srv(dns_srv_name, user, passwd, db, client_flag);
generate_node_keys();
return ret;
}
void EFM_PROXY::free_result(MYSQL_RES* result) {
const auto context = start_monitoring();
next_proxy->free_result(result);
stop_monitoring(context);
}
MYSQL_ROW EFM_PROXY::fetch_row(MYSQL_RES* result) {
const auto context = start_monitoring();
const MYSQL_ROW ret = next_proxy->fetch_row(result);
stop_monitoring(context);
return ret;
}
unsigned long EFM_PROXY::real_escape_string(char* to, const char* from, unsigned long length) {
const auto context = start_monitoring();
const unsigned long ret = next_proxy->real_escape_string(to, from, length);
stop_monitoring(context);
return ret;
}
bool EFM_PROXY::bind_param(unsigned n_params, MYSQL_BIND* binds, const char** names) {
const auto context = start_monitoring();
const bool ret = next_proxy->bind_param(n_params, binds, names);
stop_monitoring(context);
return ret;
}
MYSQL_STMT* EFM_PROXY::stmt_init() {
const auto context = start_monitoring();
MYSQL_STMT* ret = next_proxy->stmt_init();
stop_monitoring(context);
return ret;
}
int EFM_PROXY::stmt_prepare(MYSQL_STMT* stmt, const char* query, unsigned long length) {
const auto context = start_monitoring();
const int ret = next_proxy->stmt_prepare(stmt, query, length);
stop_monitoring(context);
return ret;
}
int EFM_PROXY::stmt_execute(MYSQL_STMT* stmt) {
const auto context = start_monitoring();
const int ret = next_proxy->stmt_execute(stmt);
stop_monitoring(context);
return ret;
}
int EFM_PROXY::stmt_fetch(MYSQL_STMT* stmt) {
const auto context = start_monitoring();
const int ret = next_proxy->stmt_fetch(stmt);
stop_monitoring(context);
return ret;
}
int EFM_PROXY::stmt_fetch_column(MYSQL_STMT* stmt, MYSQL_BIND* bind_arg, unsigned int column, unsigned long offset) {
const auto context = start_monitoring();
const int ret = next_proxy->stmt_fetch_column(stmt, bind_arg, column, offset);
stop_monitoring(context);
return ret;
}
int EFM_PROXY::stmt_store_result(MYSQL_STMT* stmt) {
const auto context = start_monitoring();
const int ret = next_proxy->stmt_store_result(stmt);
stop_monitoring(context);
return ret;
}
bool EFM_PROXY::stmt_bind_named_param(MYSQL_STMT *stmt, MYSQL_BIND *binds,
unsigned n_params, const char **names) {
const auto context = start_monitoring();
const bool ret =
next_proxy->stmt_bind_named_param(stmt, binds, n_params, names);
stop_monitoring(context);
return ret;
}
bool EFM_PROXY::stmt_bind_param(MYSQL_STMT* stmt, MYSQL_BIND* bnd) {
const auto context = start_monitoring();
const bool ret = next_proxy->stmt_bind_param(stmt, bnd);
stop_monitoring(context);
return ret;
}
bool EFM_PROXY::stmt_bind_result(MYSQL_STMT* stmt, MYSQL_BIND* bnd) {
const auto context = start_monitoring();
const bool ret = next_proxy->stmt_bind_result(stmt, bnd);
stop_monitoring(context);
return ret;
}
bool EFM_PROXY::stmt_close(MYSQL_STMT* stmt) {
const auto context = start_monitoring();
const bool ret = next_proxy->stmt_close(stmt);
stop_monitoring(context);
return ret;
}
bool EFM_PROXY::stmt_reset(MYSQL_STMT* stmt) {
const auto context = start_monitoring();
const bool ret = next_proxy->stmt_reset(stmt);
stop_monitoring(context);
return ret;
}
bool EFM_PROXY::stmt_free_result(MYSQL_STMT* stmt) {
const auto context = start_monitoring();
const bool ret = next_proxy->stmt_free_result(stmt);
stop_monitoring(context);
return ret;
}
bool EFM_PROXY::stmt_send_long_data(MYSQL_STMT* stmt, unsigned int param_number, const char* data,
unsigned long length) {
const auto context = start_monitoring();
const bool ret = next_proxy->stmt_send_long_data(stmt, param_number, data, length);
stop_monitoring(context);
return ret;
}
MYSQL_RES* EFM_PROXY::stmt_result_metadata(MYSQL_STMT* stmt) {
const auto context = start_monitoring();
MYSQL_RES* ret = next_proxy->stmt_result_metadata(stmt);
stop_monitoring(context);
return ret;
}