in common/lib/plugins/custom_endpoint/custom_endpoint_monitor_impl.ts [77:164]
/**
 * Monitoring loop for a single custom endpoint.
 *
 * While `this.stop` is false, each iteration:
 *  1. sends a `DescribeDBClusterEndpointsCommand` for `this.endpointIdentifier`, filtered to
 *     endpoints of type "custom";
 *  2. if exactly one endpoint is returned, compares it with the entry cached under
 *     `this.customEndpointHostInfo.host`;
 *  3. when the info differs (or nothing is cached), pushes the new allowed/blocked host set to the
 *     plugin service, refreshes the cache entry and increments `this.infoChangedCounter`;
 *  4. sleeps for whatever is left of `this.refreshRateMs` after the work above.
 *
 * @returns A promise that resolves once the loop observes `this.stop` set to true.
 * @throws AwsWrapperError when the service reports no matching endpoints. Any other error raised
 *   inside an iteration is logged and rethrown as-is; in both cases the loop terminates.
 */
async run(): Promise<void> {
  logger.verbose(Messages.get("CustomEndpointMonitorImpl.startingMonitor", this.customEndpointHostInfo.host));

  // Sleeps for `ms` milliseconds. The timeout handle is kept in `this.timers` only while it is
  // pending and is removed once it fires, so the array does not grow with every poll iteration.
  const sleep = (ms: number): Promise<void> =>
    new Promise<void>((resolve) => {
      const timer: ReturnType<typeof setTimeout> = setTimeout(() => {
        const index = this.timers.indexOf(timer);
        if (index !== -1) {
          this.timers.splice(index, 1);
        }
        resolve();
      }, ms);
      this.timers.push(timer);
    });

  // Time left in the current refresh interval, never negative.
  const remainingDelay = (startedAt: number): number => Math.max(0, this.refreshRateMs - (Date.now() - startedAt));

  while (!this.stop) {
    try {
      const start = Date.now();

      const command = new DescribeDBClusterEndpointsCommand({
        DBClusterEndpointIdentifier: this.endpointIdentifier,
        Filters: [
          {
            Name: "db-cluster-endpoint-type",
            Values: ["custom"]
          }
        ]
      });
      const result = await this.rdsClient.send(command);

      // A response without the endpoint list is handled like an empty list, so it surfaces as the
      // dedicated "noEndpoints" error rather than a TypeError on `.length`.
      const endpoints = result.DBClusterEndpoints ?? [];
      if (endpoints.length === 0) {
        throw new AwsWrapperError(Messages.get("CustomEndpointMonitorImpl.noEndpoints"));
      }

      if (endpoints.length !== 1) {
        const endpointUrls = endpoints.map((endpoint) => `\n\t${endpoint.Endpoint}`).join("");
        logger.warn(
          Messages.get(
            "CustomEndpointMonitorImpl.unexpectedNumberOfEndpoints",
            this.endpointIdentifier,
            this.region,
            String(endpoints.length),
            endpointUrls
          )
        );
        // This branch waits a full refresh interval (elapsed time is not subtracted) before retrying.
        await sleep(this.refreshRateMs);
        continue;
      }

      const endpointInfo = CustomEndpointInfo.fromDbClusterEndpoint(endpoints[0]);
      const cachedEndpointInfo = CustomEndpointMonitorImpl.customEndpointInfoCache.get(this.customEndpointHostInfo.host);
      const unchanged = cachedEndpointInfo && cachedEndpointInfo.equals(endpointInfo);

      if (!unchanged) {
        logger.verbose(
          Messages.get("CustomEndpointMonitorImpl.detectedChangeInCustomEndpointInfo", this.customEndpointHostInfo.host, endpointInfo.toString())
        );

        // The custom endpoint info has changed, so we need to update the set of allowed/blocked hosts.
        const allowedAndBlockedHosts: AllowedAndBlockedHosts =
          endpointInfo.getMemberListType() === MemberListType.STATIC_LIST
            ? new AllowedAndBlockedHosts(endpointInfo.getStaticMembers(), null)
            : new AllowedAndBlockedHosts(null, endpointInfo.getExcludedMembers());

        this.pluginService.setAllowedAndBlockedHosts(allowedAndBlockedHosts);
        CustomEndpointMonitorImpl.customEndpointInfoCache.put(
          this.customEndpointHostInfo.host,
          endpointInfo,
          CustomEndpointMonitorImpl.CUSTOM_ENDPOINT_INFO_EXPIRATION_NANO
        );
        this.infoChangedCounter.inc();
      }

      await sleep(remainingDelay(start));
    } catch (e: unknown) {
      // Non-Error throwables have no `.message`; fall back to their string form for the log line.
      const reason = e instanceof Error ? e.message : String(e);
      logger.error(Messages.get("CustomEndpointMonitorImpl.error", this.customEndpointHostInfo.host, reason));
      throw e;
    }
  }
}