async run()

in common/lib/plugins/custom_endpoint/custom_endpoint_monitor_impl.ts [77:164]


  async run(): Promise<void> {
    logger.verbose(Messages.get("CustomEndpointMonitorImpl.startingMonitor", this.customEndpointHostInfo.host));

    while (!this.stop) {
      try {
        const start = Date.now();

        const input = {
          DBClusterEndpointIdentifier: this.endpointIdentifier,
          Filters: [
            {
              Name: "db-cluster-endpoint-type",
              Values: ["custom"]
            }
          ]
        };
        const command = new DescribeDBClusterEndpointsCommand(input);
        const result = await this.rdsClient.send(command);

        const endpoints = result.DBClusterEndpoints;

        if (endpoints.length === 0) {
          throw new AwsWrapperError(Messages.get("CustomEndpointMonitorImpl.noEndpoints"));
        }

        if (endpoints.length !== 1) {
          let endpointUrls = "";
          endpoints.forEach((endpoint) => {
            endpointUrls += `\n\t${endpoint.Endpoint}`;
          });
          logger.warn(
            Messages.get(
              "CustomEndpointMonitorImpl.unexpectedNumberOfEndpoints",
              this.endpointIdentifier,
              this.region,
              String(endpoints.length),
              endpointUrls
            )
          );
          await new Promise((resolve) => {
            this.timers.push(setTimeout(resolve, this.refreshRateMs));
          });
          continue;
        }

        const endpointInfo = CustomEndpointInfo.fromDbClusterEndpoint(endpoints[0]);
        const cachedEndpointInfo = CustomEndpointMonitorImpl.customEndpointInfoCache.get(this.customEndpointHostInfo.host);

        if (cachedEndpointInfo && cachedEndpointInfo.equals(endpointInfo)) {
          const elapsedTime = Date.now() - start;
          const sleepDuration = Math.max(0, this.refreshRateMs - elapsedTime);
          await new Promise((resolve) => {
            this.timers.push(setTimeout(resolve, sleepDuration));
          });
          continue;
        }

        logger.verbose(
          Messages.get("CustomEndpointMonitorImpl.detectedChangeInCustomEndpointInfo", this.customEndpointHostInfo.host, endpointInfo.toString())
        );

        // The custom endpoint info has changed, so we need to update the set of allowed/blocked hosts.
        let allowedAndBlockedHosts: AllowedAndBlockedHosts;
        if (endpointInfo.getMemberListType() === MemberListType.STATIC_LIST) {
          allowedAndBlockedHosts = new AllowedAndBlockedHosts(endpointInfo.getStaticMembers(), null);
        } else {
          allowedAndBlockedHosts = new AllowedAndBlockedHosts(null, endpointInfo.getExcludedMembers());
        }

        this.pluginService.setAllowedAndBlockedHosts(allowedAndBlockedHosts);
        CustomEndpointMonitorImpl.customEndpointInfoCache.put(
          this.customEndpointHostInfo.host,
          endpointInfo,
          CustomEndpointMonitorImpl.CUSTOM_ENDPOINT_INFO_EXPIRATION_NANO
        );
        this.infoChangedCounter.inc();

        const elapsedTime = Date.now() - start;
        const sleepDuration = Math.max(0, this.refreshRateMs - elapsedTime);
        await new Promise((resolve) => {
          this.timers.push(setTimeout(resolve, sleepDuration));
        });
      } catch (e: any) {
        logger.error(Messages.get("CustomEndpointMonitorImpl.error", this.customEndpointHostInfo.host, e.message));
        throw e;
      }
    }
  }