in ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsService.java [111:225]
/**
 * Performs the one-time start-up of the metrics store: creates the Phoenix/HBase
 * accessor and metadata manager, initializes schema and policies/TTL, optionally
 * starts the HA controller and the in-memory cache node, schedules the cluster and
 * host aggregators, and optionally schedules the store watchdog.
 *
 * <p>The method is {@code synchronized} and guarded by {@code isInitialized}; once
 * the flag has been set, further calls return without doing anything. The flag is
 * only set as the very last step, so a failure part-way through leaves it
 * {@code false}.
 *
 * @throws ExceptionInInitializerError if the metadata manager cannot be created or
 *         the metrics configuration cannot be loaded (the underlying exception is
 *         logged, because this error type cannot carry both a message and a cause)
 * @throws MetricsSystemInitializationException if the HA controller or the cache
 *         node fails to start; the underlying exception is attached as the cause
 */
private synchronized void initializeSubsystem() {
  if (isInitialized) {
    return;
  }

  hBaseAccessor = new PhoenixHBaseAccessor(null);

  // Initialize metadata
  try {
    metricMetadataManager = new TimelineMetricMetadataManager(hBaseAccessor);
  } catch (MalformedURLException | URISyntaxException e) {
    // ExceptionInInitializerError(String) cannot carry a cause, so record the
    // root cause in the log before surfacing the error to the caller.
    LOG.error("Unable to initialize metadata manager", e);
    throw new ExceptionInInitializerError("Unable to initialize metadata manager");
  }
  metricMetadataManager.initializeMetadata();

  // Initialize metric schema
  hBaseAccessor.initMetricSchema();

  // Initialize policies before TTL update
  hBaseAccessor.initPoliciesAndTTL();

  // Start the HA controller unless distributed collector mode is disabled.
  // NOTE(review): when disabled, haController keeps whatever value the field
  // already had (presumably null) and is passed as-is to the aggregator factories.
  if (!configuration.isDistributedCollectorModeDisabled()) {
    haController = new MetricCollectorHAController(configuration);
    try {
      haController.initializeHAController();
    } catch (Exception e) {
      // Log with message + throwable so the stack trace is actually emitted.
      LOG.error("Unable to initialize HA controller", e);
      throw new MetricsSystemInitializationException("Unable to " +
        "initialize HA controller", e);
    }
  } else {
    LOG.info("Distributed collector mode disabled");
  }

  // Initialize whitelisting & blacklisting if needed
  TimelineMetricsFilter.initializeMetricFilter(configuration);

  Configuration metricsConf;
  try {
    metricsConf = configuration.getMetricsConf();
  } catch (Exception e) {
    // Same constraint as above: log the cause, then throw the message-only error.
    LOG.error("Cannot initialize configuration.", e);
    throw new ExceptionInInitializerError("Cannot initialize configuration.");
  }

  // Start the in-memory cache node only when collector-side aggregation is enabled.
  if (configuration.isCollectorInMemoryAggregationEnabled()) {
    try {
      cache = startCacheNode();
    } catch (Exception e) {
      throw new MetricsSystemInitializationException("Unable to " +
        "start cache node", e);
    }
  }

  // Informational only: the flag is read here just to log which mode is active.
  if (Boolean.parseBoolean(metricsConf.get(USE_GROUPBY_AGGREGATOR_QUERIES, "true"))) {
    LOG.info("Using group by aggregators for aggregating host and cluster metrics.");
  }

  // Start the cluster aggregator second (the only aggregator that receives the cache)
  TimelineMetricAggregator secondClusterAggregator =
    TimelineMetricAggregatorFactory.createTimelineClusterAggregatorSecond(
      hBaseAccessor, metricsConf, metricMetadataManager, haController, cache);
  scheduleAggregatorThread(secondClusterAggregator);

  // Start the minute cluster aggregator
  TimelineMetricAggregator minuteClusterAggregator =
    TimelineMetricAggregatorFactory.createTimelineClusterAggregatorMinute(
      hBaseAccessor, metricsConf, metricMetadataManager, haController);
  scheduleAggregatorThread(minuteClusterAggregator);

  // Start the hourly cluster aggregator
  TimelineMetricAggregator hourlyClusterAggregator =
    TimelineMetricAggregatorFactory.createTimelineClusterAggregatorHourly(
      hBaseAccessor, metricsConf, metricMetadataManager, haController);
  scheduleAggregatorThread(hourlyClusterAggregator);

  // Start the daily cluster aggregator
  TimelineMetricAggregator dailyClusterAggregator =
    TimelineMetricAggregatorFactory.createTimelineClusterAggregatorDaily(
      hBaseAccessor, metricsConf, metricMetadataManager, haController);
  scheduleAggregatorThread(dailyClusterAggregator);

  // Start the minute host aggregator, skipped when host in-memory aggregation is enabled
  if (!configuration.isHostInMemoryAggregationEnabled()) {
    TimelineMetricAggregator minuteHostAggregator =
      TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(
        hBaseAccessor, metricsConf, metricMetadataManager, haController);
    scheduleAggregatorThread(minuteHostAggregator);
  }

  // Start the hourly host aggregator
  TimelineMetricAggregator hourlyHostAggregator =
    TimelineMetricAggregatorFactory.createTimelineMetricAggregatorHourly(
      hBaseAccessor, metricsConf, metricMetadataManager, haController);
  scheduleAggregatorThread(hourlyHostAggregator);

  // Start the daily host aggregator
  TimelineMetricAggregator dailyHostAggregator =
    TimelineMetricAggregatorFactory.createTimelineMetricAggregatorDaily(
      hBaseAccessor, metricsConf, metricMetadataManager, haController);
  scheduleAggregatorThread(dailyHostAggregator);

  if (!configuration.isTimelineMetricsServiceWatcherDisabled()) {
    // Both values are passed to the scheduler as seconds.
    int initDelay = configuration.getTimelineMetricsServiceWatcherInitDelay();
    int delay = configuration.getTimelineMetricsServiceWatcherDelay();
    // Start the watchdog
    watchdogExecutorService.scheduleWithFixedDelay(
      new TimelineMetricStoreWatcher(this, configuration),
      initDelay, delay, TimeUnit.SECONDS);
    LOG.info("Started watchdog for timeline metrics store with initial " +
      "delay = " + initDelay + ", delay = " + delay);
  }

  containerMetricsDisabled = configuration.isContainerMetricsDisabled();
  defaultInstanceId = configuration.getDefaultInstanceId();

  // Mark as initialized only after every step above has succeeded.
  isInitialized = true;
}