private synchronized void initializeSubsystem()

in ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsService.java [111:225]


  /**
   * Performs the one-time start-up of the metrics store backing this service.
   *
   * <p>The method is {@code synchronized} and guarded by {@code isInitialized},
   * so once a call has run to completion later calls are no-ops. The flag is
   * only set on the last line, so if any step throws, the flag stays
   * {@code false} and a subsequent call will run every step again.
   *
   * <p>Steps, in order: create the HBase accessor and metadata manager,
   * initialize the metric schema and policies/TTL, optionally start the HA
   * controller, initialize the metric filter, load the metrics configuration,
   * optionally start the cache node, schedule the cluster and host
   * aggregators, optionally schedule the store watchdog, and finally cache a
   * few configuration values on this instance.
   *
   * @throws ExceptionInInitializerError if the metadata manager cannot be
   *         constructed or the metrics configuration cannot be loaded; the
   *         underlying exception is logged before the error is thrown
   * @throws MetricsSystemInitializationException if the HA controller or the
   *         cache node fails to start (the original exception is the cause)
   */
  private synchronized void initializeSubsystem() {
    if (!isInitialized) {
      hBaseAccessor = new PhoenixHBaseAccessor(null);

      // Initialize metadata
      try {
        metricMetadataManager = new TimelineMetricMetadataManager(hBaseAccessor);
      } catch (MalformedURLException | URISyntaxException e) {
        // ExceptionInInitializerError has no (message, cause) constructor, so
        // log the root cause here to keep it from being lost.
        LOG.error("Unable to initialize metadata manager", e);
        throw new ExceptionInInitializerError("Unable to initialize metadata manager");
      }
      metricMetadataManager.initializeMetadata();

      // Initialize metric schema
      hBaseAccessor.initMetricSchema();

      // Initialize policies before TTL update
      hBaseAccessor.initPoliciesAndTTL();

      // Start the HA controller unless distributed collector mode is disabled.
      // When it is disabled, haController keeps whatever value the field
      // already had and is passed to the aggregator factories as-is.
      if (!configuration.isDistributedCollectorModeDisabled()) {
        haController = new MetricCollectorHAController(configuration);
        try {
          haController.initializeHAController();
        } catch (Exception e) {
          // Log with the throwable overload so the stack trace is recorded.
          LOG.error("Unable to initialize HA controller", e);
          throw new MetricsSystemInitializationException("Unable to " +
            "initialize HA controller", e);
        }
      } else {
        LOG.info("Distributed collector mode disabled");
      }

      // Initialize whitelisting & blacklisting if needed
      TimelineMetricsFilter.initializeMetricFilter(configuration);

      // Assigned exactly once: the catch block always throws.
      Configuration metricsConf;
      try {
        metricsConf = configuration.getMetricsConf();
      } catch (Exception e) {
        // Same constraint as above: log the cause, then keep the error type.
        LOG.error("Cannot initialize configuration.", e);
        throw new ExceptionInInitializerError("Cannot initialize configuration.");
      }

      if (configuration.isCollectorInMemoryAggregationEnabled()) {
        try {
          cache = startCacheNode();
        } catch (Exception e) {
          throw new MetricsSystemInitializationException("Unable to " +
            "start cache node", e);
        }
      }

      // Informational only: this method does not branch on the setting
      // beyond emitting the log line.
      if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) {
        LOG.info("Using group by aggregators for aggregating host and cluster metrics.");
      }

      // Start the cluster aggregator second (the only aggregator given the cache)
      TimelineMetricAggregator secondClusterAggregator =
        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(
          hBaseAccessor, metricsConf, metricMetadataManager, haController, cache);
      scheduleAggregatorThread(secondClusterAggregator);

      // Start the minute cluster aggregator
      TimelineMetricAggregator minuteClusterAggregator =
        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(
          hBaseAccessor, metricsConf, metricMetadataManager, haController);
      scheduleAggregatorThread(minuteClusterAggregator);

      // Start the hourly cluster aggregator
      TimelineMetricAggregator hourlyClusterAggregator =
        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(
          hBaseAccessor, metricsConf, metricMetadataManager, haController);
      scheduleAggregatorThread(hourlyClusterAggregator);

      // Start the daily cluster aggregator
      TimelineMetricAggregator dailyClusterAggregator =
        TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(
          hBaseAccessor, metricsConf, metricMetadataManager, haController);
      scheduleAggregatorThread(dailyClusterAggregator);

      // Start the minute host aggregator, skipped when host in-memory
      // aggregation is enabled
      if (!configuration.isHostInMemoryAggregationEnabled()) {
        TimelineMetricAggregator minuteHostAggregator =
          TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(
            hBaseAccessor, metricsConf, metricMetadataManager, haController);
        scheduleAggregatorThread(minuteHostAggregator);
      }

      // Start the hourly host aggregator
      TimelineMetricAggregator hourlyHostAggregator =
        TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(
          hBaseAccessor, metricsConf, metricMetadataManager, haController);
      scheduleAggregatorThread(hourlyHostAggregator);

      // Start the daily host aggregator
      TimelineMetricAggregator dailyHostAggregator =
        TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(
          hBaseAccessor, metricsConf, metricMetadataManager, haController);
      scheduleAggregatorThread(dailyHostAggregator);

      if (!configuration.isTimelineMetricsServiceWatcherDisabled()) {
        int initDelay = configuration.getTimelineMetricsServiceWatcherInitDelay();
        int delay = configuration.getTimelineMetricsServiceWatcherDelay();
        // Start the watchdog; both delays are scheduled in seconds
        watchdogExecutorService.scheduleWithFixedDelay(
          new TimelineMetricStoreWatcher(this, configuration),
          initDelay, delay, TimeUnit.SECONDS);
        LOG.info("Started watchdog for timeline metrics store with initial " +
          "delay = " + initDelay + ", delay = " + delay);
      }
      containerMetricsDisabled = configuration.isContainerMetricsDisabled();
      defaultInstanceId = configuration.getDefaultInstanceId();
      // Set last so that a failure in any step above leaves the service
      // marked as uninitialized.
      isInitialized = true;
    }
  }