driver/custom_endpoint_proxy.cc (111 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License, version 2.0
// (GPLv2), as published by the Free Software Foundation, with the
// following additional permissions:
//
// This program is distributed with certain software that is licensed
// under separate terms, as designated in a particular file or component
// or in the license documentation. Without limiting your rights under
// the GPLv2, the authors of this program hereby grant you an additional
// permission to link the program and your derivative works with the
// separately licensed software that they have included with the program.
//
// Without limiting the foregoing grant of rights under the GPLv2 and
// additional permission as to separately licensed software, this
// program is also subject to the Universal FOSS Exception, version 1.0,
// a copy of which can be found along with its FAQ at
// http://oss.oracle.com/licenses/universal-foss-exception.
//
// This program is distributed in the hope that it will be useful, but
// WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU General Public License, version 2.0, for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see
// http://www.gnu.org/licenses/gpl-2.0.html.
#include "custom_endpoint_proxy.h"
#include "mylog.h"
#include "rds_utils.h"
// Definition of the class-wide monitor cache shared by every CUSTOM_ENDPOINT_PROXY instance.
// Entries are std::shared_ptr<CUSTOM_ENDPOINT_MONITOR> values keyed by std::string; within this file the key
// passed to compute_if_absent() is the custom endpoint host.
// The cache is constructed with a CUSTOM_ENDPOINTS_SHOULD_DISPOSE_FUNC, a CUSTOM_ENDPOINTS_ITEM_DISPOSAL_FUNC and
// CACHE_CLEANUP_RATE_NANO (presumably the clean-up interval in nanoseconds - confirm against the cache header).
SLIDING_EXPIRATION_CACHE_WITH_CLEAN_UP_THREAD<std::string, std::shared_ptr<CUSTOM_ENDPOINT_MONITOR>>
CUSTOM_ENDPOINT_PROXY::monitors(std::make_shared<CUSTOM_ENDPOINTS_SHOULD_DISPOSE_FUNC>(),
std::make_shared<CUSTOM_ENDPOINTS_ITEM_DISPOSAL_FUNC>(), CACHE_CLEANUP_RATE_NANO);
// Class-wide flag: set to true by the constructor once monitors.init_clean_up_thread() has been called.
bool CUSTOM_ENDPOINT_PROXY::is_monitor_cache_initialized = false;
// Convenience constructor: delegates to the three-argument constructor with a null next_proxy.
CUSTOM_ENDPOINT_PROXY::CUSTOM_ENDPOINT_PROXY(DBC* dbc, DataSource* ds)
    : CUSTOM_ENDPOINT_PROXY(dbc, ds, nullptr) {
}
// Builds a custom endpoint proxy in front of next_proxy.
//
// @param dbc         connection handle; supplies the topology service and is forwarded to CONNECTION_PROXY.
// @param ds          data source; supplies the logging flag and the custom endpoint options copied below.
// @param next_proxy  proxy that connect()/query()/real_query() forward to.
//
// The first instance constructed also starts the clean-up thread of the shared monitor cache.
CUSTOM_ENDPOINT_PROXY::CUSTOM_ENDPOINT_PROXY(DBC* dbc, DataSource* ds, CONNECTION_PROXY* next_proxy)
    : CONNECTION_PROXY(dbc, ds) {
  this->next_proxy = next_proxy;
  this->topology_service = dbc->get_topology_service();

  // A log file is only set up when the data source enables query logging.
  if (ds->opt_LOG_QUERY) {
    this->logger = init_log_file();
  }

  // Copy the custom endpoint settings out of the data source.
  this->idle_monitor_expiration_ms = ds->opt_CUSTOM_ENDPOINT_MONITOR_EXPIRATION_MS;
  this->wait_on_cached_info_duration_ms = ds->opt_WAIT_FOR_CUSTOM_ENDPOINT_INFO_TIMEOUT_MS;
  this->should_wait_for_info = ds->opt_WAIT_FOR_CUSTOM_ENDPOINT_INFO;

  // NOTE(review): this flag is a plain bool that is checked and set without synchronization - confirm that
  // proxies are never constructed concurrently.
  if (is_monitor_cache_initialized) {
    return;
  }
  monitors.init_clean_up_thread();
  is_monitor_cache_initialized = true;
}
// Connects through next_proxy, first preparing custom endpoint monitoring when `host` is a custom cluster URL.
//
// For a custom endpoint host this records the host, parses the endpoint identifier and the region (the
// data source's custom endpoint region option wins over the region parsed from the host), makes sure a
// monitor exists in the shared cache and, if configured, waits for the monitor to obtain endpoint info.
//
// @return false (with a custom error message set) when the endpoint identifier or the region cannot be
//         determined; otherwise the result of next_proxy->connect().
bool CUSTOM_ENDPOINT_PROXY::connect(const char* host, const char* user, const char* password, const char* database,
                                    unsigned int port, const char* socket, unsigned long flags) {
  if (RDS_UTILS::is_rds_custom_cluster_dns(host)) {
    this->custom_endpoint_host = host;
    MYLOG_TRACE(this->logger, 0, "Detected a connection request to a custom endpoint URL: '%s'", host);

    this->custom_endpoint_id = RDS_UTILS::get_rds_cluster_id(host);
    if (this->custom_endpoint_id.empty()) {
      this->set_custom_error_message("Unable to parse custom endpoint identifier from URL.");
      return false;
    }

    // An explicitly configured region takes precedence over the one embedded in the host name.
    if (ds->opt_CUSTOM_ENDPOINT_REGION) {
      this->region = static_cast<const char*>(ds->opt_CUSTOM_ENDPOINT_REGION);
    } else {
      this->region = RDS_UTILS::get_rds_region(host);
    }
    if (this->region.empty()) {
      this->set_custom_error_message(
          "Unable to determine connection region. If you are using a non-standard RDS URL, please set the "
          "'custom_endpoint_region' property");
      return false;
    }

    const std::shared_ptr<CUSTOM_ENDPOINT_MONITOR> endpoint_monitor = create_monitor_if_absent(ds);
    if (this->should_wait_for_info) {
      // Give the monitor a short time to discover the custom endpoint info before connecting.
      this->wait_for_custom_endpoint_info(endpoint_monitor);
    }
  }

  // Custom endpoint or not, the actual connection is made by the next proxy in the chain.
  return this->next_proxy->connect(host, user, password, database, port, socket, flags);
}
// Forwards the query to next_proxy. When a custom endpoint host was recorded by connect(), the monitor for
// that host is fetched or created first and, if configured, the call waits for its endpoint info.
//
// @param q  query text handed unchanged to next_proxy->query().
// @return the value returned by next_proxy->query().
int CUSTOM_ENDPOINT_PROXY::query(const char* q) {
  // No custom endpoint in use: nothing to monitor, pass straight through.
  if (this->custom_endpoint_host.empty()) {
    return next_proxy->query(q);
  }

  const std::shared_ptr<CUSTOM_ENDPOINT_MONITOR> endpoint_monitor = create_monitor_if_absent(ds);
  if (this->should_wait_for_info) {
    // Give the monitor a short time to discover the custom endpoint info.
    this->wait_for_custom_endpoint_info(endpoint_monitor);
  }
  return next_proxy->query(q);
}
// Forwards the length-delimited query to next_proxy. When a custom endpoint host was recorded by connect(),
// the monitor for that host is fetched or created first and, if configured, the call waits for its endpoint info.
//
// @param q       query buffer handed unchanged to next_proxy->real_query().
// @param length  length argument handed unchanged to next_proxy->real_query().
// @return the value returned by next_proxy->real_query().
int CUSTOM_ENDPOINT_PROXY::real_query(const char* q, unsigned long length) {
  // No custom endpoint in use: nothing to monitor, pass straight through.
  if (this->custom_endpoint_host.empty()) {
    return next_proxy->real_query(q, length);
  }

  const std::shared_ptr<CUSTOM_ENDPOINT_MONITOR> endpoint_monitor = create_monitor_if_absent(ds);
  if (this->should_wait_for_info) {
    // Give the monitor a short time to discover the custom endpoint info.
    this->wait_for_custom_endpoint_info(endpoint_monitor);
  }
  return next_proxy->real_query(q, length);
}
// Blocks until `monitor` reports that custom endpoint info is available, or until
// wait_on_cached_info_duration_ms milliseconds have passed, polling the monitor every 100 ms.
//
// @param monitor  monitor queried through has_custom_endpoint_info().
//
// Returns immediately when the info is already available. On timeout a custom error message is recorded via
// set_custom_error_message(); the function itself reports nothing to the caller.
void CUSTOM_ENDPOINT_PROXY::wait_for_custom_endpoint_info(std::shared_ptr<CUSTOM_ENDPOINT_MONITOR> monitor) {
  if (monitor->has_custom_endpoint_info()) {
    return;
  }

  // Both messages below print the timeout with "%ld"; converting once to long guarantees that the variadic
  // argument matches the conversion specifier regardless of the member's declared integer type.
  const long timeout_ms = static_cast<long>(this->wait_on_cached_info_duration_ms);

  // Wait for the monitor to place the custom endpoint info in the cache. This ensures other plugins get accurate
  // custom endpoint info.
  MYLOG_TRACE(this->logger, 0,
              "Custom endpoint info for '%s' was not found. Waiting %ldms for the endpoint monitor to fetch info...",
              this->custom_endpoint_host.c_str(), timeout_ms);

  const auto deadline =
      std::chrono::steady_clock::now() + std::chrono::milliseconds(this->wait_on_cached_info_duration_ms);

  bool has_custom_endpoint_info = false;
  while (!has_custom_endpoint_info && std::chrono::steady_clock::now() < deadline) {
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    has_custom_endpoint_info = monitor->has_custom_endpoint_info();
  }

  if (!has_custom_endpoint_info) {
    char buf[1024];
    myodbc_snprintf(
        buf, sizeof(buf),
        "The custom endpoint plugin timed out after %ld ms while waiting for custom endpoint info for host %s.",
        timeout_ms, this->custom_endpoint_host.c_str());
    set_custom_error_message(buf);
  }
}
// Creates a new monitor for the custom endpoint currently recorded on this proxy.
//
// @param refresh_rate_nanos  refresh rate forwarded to the CUSTOM_ENDPOINT_MONITOR constructor.
// @return a shared_ptr owning the newly constructed monitor.
std::shared_ptr<CUSTOM_ENDPOINT_MONITOR> CUSTOM_ENDPOINT_PROXY::create_custom_endpoint_monitor(
    const long long refresh_rate_nanos) {
  // The monitor receives this proxy's topology service, host, endpoint id and region, plus the thread pool
  // held by the connection's environment.
  std::shared_ptr<CUSTOM_ENDPOINT_MONITOR> new_monitor = std::make_shared<CUSTOM_ENDPOINT_MONITOR>(
      this->topology_service, this->custom_endpoint_host, this->custom_endpoint_id, this->region,
      refresh_rate_nanos, this->dbc->env->custom_endpoint_thread_pool);
  return new_monitor;
}
// Returns the monitor stored in the shared cache under this proxy's custom endpoint host, passing the cache a
// factory that builds a new monitor via create_custom_endpoint_monitor().
//
// @param ds  data source supplying the info refresh rate (milliseconds) for a newly created monitor.
// @return the value returned by monitors.compute_if_absent().
std::shared_ptr<CUSTOM_ENDPOINT_MONITOR> CUSTOM_ENDPOINT_PROXY::create_monitor_if_absent(DataSource* ds) {
  using std::chrono::duration_cast;
  using std::chrono::milliseconds;
  using std::chrono::nanoseconds;

  // The cache and the monitor both work in nanoseconds while the options are expressed in milliseconds.
  const long long refresh_rate_nanos =
      duration_cast<nanoseconds>(milliseconds(ds->opt_CUSTOM_ENDPOINT_INFO_REFRESH_RATE_MS)).count();
  const auto expiration_nanos = duration_cast<nanoseconds>(milliseconds(this->idle_monitor_expiration_ms)).count();

  return monitors.compute_if_absent(
      this->custom_endpoint_host,
      // Captures are spelled out: relying on [=] to capture `this` implicitly is deprecated since C++20.
      // The key argument is accepted to match the expected callable shape but is not needed here.
      [this, refresh_rate_nanos](std::string) { return this->create_custom_endpoint_monitor(refresh_rate_nanos); },
      expiration_nanos);
}
// Asks the shared monitor cache to release its resources; does nothing when the cache is empty.
void CUSTOM_ENDPOINT_PROXY::release_resources() {
  if (monitors.empty()) {
    return;
  }
  monitors.release_resources();
}