in src/failover/failover_service.cc [415:485]
bool StartFailoverService(char* service_id_c_str, DatabaseDialect dialect, const SQLTCHAR* conn_cstr) {
std::string cluster_id(service_id_c_str);
std::shared_ptr<Dialect> dialect_obj;
switch (dialect) {
case AURORA_POSTGRES:
dialect_obj = std::make_shared<DialectAuroraPostgres>();
break;
default:
LOG(ERROR) << "[Failover Service] for: " << cluster_id << ", does not accept this dialect.";
return false;
}
std::map<SQLSTR, SQLSTR> conn_info;
ConnectionStringHelper::ParseConnectionString(StringHelper::ToSQLSTR(conn_cstr), conn_info);
// Simple wide to narrow conversion for endpoint template when unicode
// RDS endpoints are alphanumeric only
std::string endpoint_template = StringHelper::ToString(conn_info[ENDPOINT_TEMPLATE_KEY]);
std::string host = StringHelper::ToString(conn_info[SERVER_HOST_KEY]);
std::shared_ptr<std::map<SQLSTR, SQLSTR>> conn_info_ptr = std::make_shared<std::map<SQLSTR, SQLSTR>>(conn_info);
conn_info[ENABLE_FAILOVER_KEY] = BOOL_FALSE;
SQLSTR updated_conn_str = ConnectionStringHelper::BuildConnectionString(conn_info);
if (endpoint_template.empty()) {
endpoint_template = RdsUtils::GetRdsInstanceHostPattern(host);
}
if (cluster_id.empty()) {
cluster_id = RdsUtils::GetRdsClusterId(host);
if (cluster_id.empty()) {
cluster_id = std::to_string(std::chrono::steady_clock::now().time_since_epoch().count());
LOG(INFO) << "Unable to parse ClusterId from host: " << host << ". Generated random ClusterId: " << cluster_id;
} else {
LOG(INFO) << "[Failover Service] Generated ClusterId: " << cluster_id << " from host: " << host;
}
conn_info_ptr->insert_or_assign(CLUSTER_ID_KEY, StringHelper::ToSQLSTR(cluster_id));
// If the original input was empty, copy the generated ID back to caller
strncpy(service_id_c_str, cluster_id.c_str(), MAX_CLUSTER_ID_LEN);
}
std::shared_ptr<FailoverServiceTracker> tracker;
try {
uint32_t ignore_topology_request_ms = parse_num(conn_info[IGNORE_TOPOLOGY_REQUEST_KEY], FailoverService::DEFAULT_IGNORE_TOPOLOGY_REQUEST_MS);
uint32_t high_refresh_rate_ms = parse_num(conn_info[HIGH_REFRESH_RATE_KEY], FailoverService::DEFAULT_HIGH_REFRESH_RATE_MS);
uint32_t refresh_rate_ms = parse_num(conn_info[REFRESH_RATE_KEY], FailoverService::DEFAULT_REFRESH_RATE_MS);
if (!FailoverServiceTrackerHandler::Contains(cluster_id)) {
tracker = std::make_shared<FailoverServiceTracker>();
tracker->reference_count = 1;
tracker->service = std::make_shared<FailoverService>(
host, cluster_id, dialect_obj, conn_info_ptr, global_topology_map,
std::make_shared<ClusterTopologyMonitor>(
cluster_id, global_topology_map, AS_SQLTCHAR(updated_conn_str.c_str()), std::make_shared<OdbcHelperWrapper>(),
std::make_shared<ClusterTopologyQueryHelper>(dialect_obj->GetDefaultPort(), endpoint_template,
dialect_obj->GetTopologyQuery(), dialect_obj->GetWriterIdQuery(),
dialect_obj->GetNodeIdQuery()),
ignore_topology_request_ms, high_refresh_rate_ms, refresh_rate_ms),
std::make_shared<OdbcHelperWrapper>());
// Check again to see if the other thread has set service tracker for cluster id
// If still empty, put new tracker. Let tracker descope and free itself otherwise
FailoverServiceTrackerHandler::PutIfAbsent(cluster_id, tracker);
} else {
FailoverServiceTrackerHandler::Increment(cluster_id);
}
} catch (const std::exception& ex) {
LOG(ERROR) << "Failed to create Failover Service for: " << cluster_id << ". " << ex.what();
return false;
}
return true;
}