in src/limitless/limitless_monitor_service.cc [49:121]
bool LimitlessMonitorService::NewService(
std::string& service_id,
const SQLTCHAR *connection_string_c_str,
int host_port,
std::shared_ptr<LimitlessRouterMonitor> limitless_router_monitor
) {
std::lock_guard<std::mutex> services_guard(*(this->services_mutex));
// parse the connection string to extract useful limitless information
std::map<SQLSTR, SQLSTR> connection_string_map;
SQLSTR conn_str = StringHelper::ToSQLSTR(connection_string_c_str);
ConnectionStringHelper::ParseConnectionString(conn_str, connection_string_map);
if (service_id.empty()) {
auto it = connection_string_map.find(SERVER_HOST_KEY);
if (it != connection_string_map.end()) {
std::string host = StringHelper::ToString(connection_string_map[SERVER_HOST_KEY]);
service_id = RdsUtils::GetRdsClusterId(host);
if (service_id.empty()) {
service_id = std::to_string(std::chrono::steady_clock::now().time_since_epoch().count());
LOG(INFO) << "No service ID provided and could not parse service ID from host: " << host << ". Generated random service ID: " << service_id;
}
}
}
if (this->services.contains(service_id)) {
LOG(ERROR) << "Attempted to recreate existing monitor with service ID " << service_id;
return false;
}
bool block_and_query_immediately = true;
auto it = connection_string_map.find(LIMITLESS_MODE_KEY);
if (it != connection_string_map.end()) {
SQLSTR value = connection_string_map[LIMITLESS_MODE_KEY];
if (value == LIMITLESS_MODE_VALUE_LAZY) {
block_and_query_immediately = false;
}
}
unsigned int limitless_monitor_interval_ms = DEFAULT_LIMITLESS_MONITOR_INTERVAL_MS; // incase the field is unset
it = connection_string_map.find(LIMITLESS_MONITOR_INTERVAL_MS_KEY);
if (it != connection_string_map.end()) {
SQLSTR value = connection_string_map[LIMITLESS_MONITOR_INTERVAL_MS_KEY];
limitless_monitor_interval_ms = std::stoi(value);
}
// ensure that the owning scope of the shared pointer is inside the map
this->services[service_id] = std::make_shared<LimitlessMonitor>();
std::shared_ptr<LimitlessMonitor> service = this->services[service_id];
service->reference_counter = 1;
service->limitless_routers = std::make_shared<std::vector<HostInfo>>();
service->limitless_routers_mutex = std::make_shared<std::mutex>();
service->limitless_router_monitor = std::move(limitless_router_monitor);
// limitless_router_monitor is now nullptr
// start monitoring; this will block until the first set of limitless routers
// is retrieved or an error occurs if block_and_query_immediately is true
service->limitless_router_monitor->Open(
block_and_query_immediately,
connection_string_c_str,
host_port,
limitless_monitor_interval_ms,
service->limitless_routers,
service->limitless_routers_mutex
);
LOG(INFO) << "Started monitoring with service ID " << service_id;
return true;
}