in ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/TimelineMetricAggregatorFactory.java [257:315]
/**
 * Creates the cluster aggregator registered under {@code METRIC_AGGREGATE_SECOND}.
 *
 * <p>Settings are read from {@code metricsConf}:
 * <ul>
 *   <li>checkpoint directory (falls back to {@code DEFAULT_CHECKPOINT_LOCATION}), joined with
 *       {@code CLUSTER_AGGREGATOR_CHECKPOINT_FILE} to form the checkpoint file path;</li>
 *   <li>sleep interval in seconds (default 120), converted to milliseconds;</li>
 *   <li>time slice interval in seconds (default 30), converted to milliseconds;</li>
 *   <li>checkpoint cut-off multiplier (default 2).</li>
 * </ul>
 *
 * <p>When {@code TimelineMetricConfiguration} reports that collector in-memory aggregation is
 * enabled, a {@code TimelineMetricClusterAggregatorSecondWithCacheSource} is returned; it is
 * given {@code null} as its input table and additionally receives {@code distributedCache}.
 * Otherwise a {@code TimelineMetricClusterAggregatorSecond} reading from
 * {@code METRICS_RECORD_TABLE_NAME} is returned. Both write to
 * {@code METRICS_CLUSTER_AGGREGATE_TABLE_NAME}.
 *
 * @param hBaseAccessor    accessor handed to the created aggregator
 * @param metricsConf      configuration the aggregator settings are read from; also handed to
 *                         the created aggregator
 * @param metadataManager  metadata manager handed to the created aggregator
 * @param haController     HA controller handed to the created aggregator
 * @param distributedCache cache handed to the aggregator only in the in-memory aggregation case
 * @return the aggregator implementation selected by the in-memory aggregation setting
 */
public static TimelineMetricAggregator createTimelineClusterAggregatorSecond(
    PhoenixHBaseAccessor hBaseAccessor, Configuration metricsConf,
    TimelineMetricMetadataManager metadataManager,
    MetricCollectorHAController haController,
    TimelineMetricDistributedCache distributedCache) {

  // Resolve the checkpoint file inside the configured (or default) checkpoint directory.
  String checkpointFilePath = FilenameUtils.concat(
      metricsConf.get(TIMELINE_METRICS_AGGREGATOR_CHECKPOINT_DIR, DEFAULT_CHECKPOINT_LOCATION),
      CLUSTER_AGGREGATOR_CHECKPOINT_FILE);

  // Both intervals are configured in seconds but passed on in milliseconds.
  long sleepSeconds = metricsConf.getLong(CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL, 120L);
  long sleepMillis = SECONDS.toMillis(sleepSeconds);

  int timeSliceSeconds = metricsConf.getInt(CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 30);
  long timeSliceMillis = SECONDS.toMillis(timeSliceSeconds);

  int cutOffMultiplier =
      metricsConf.getInt(CLUSTER_AGGREGATOR_SECOND_CHECKPOINT_CUTOFF_MULTIPLIER, 2);

  boolean inMemoryAggregation =
      TimelineMetricConfiguration.getInstance().isCollectorInMemoryAggregationEnabled();

  // The second-level aggregator is also responsible for time slicing, hence the extra
  // time slice argument passed to both variants below.
  if (!inMemoryAggregation) {
    // Default variant: aggregates from the metric record table.
    return new TimelineMetricClusterAggregatorSecond(
        METRIC_AGGREGATE_SECOND,
        metadataManager,
        hBaseAccessor,
        metricsConf,
        checkpointFilePath,
        sleepMillis,
        cutOffMultiplier,
        CLUSTER_AGGREGATOR_SECOND_DISABLED,
        METRICS_RECORD_TABLE_NAME,
        METRICS_CLUSTER_AGGREGATE_TABLE_NAME,
        // Fixed 120000 ms value; its meaning is defined by the constructor, which is not
        // visible here - NOTE(review): confirm against the aggregator class.
        120000L,
        timeSliceMillis,
        haController);
  }

  // Cache-backed variant: no input table is supplied (null); the distributed cache is passed
  // as an extra trailing argument.
  return new TimelineMetricClusterAggregatorSecondWithCacheSource(
      METRIC_AGGREGATE_SECOND,
      metadataManager,
      hBaseAccessor,
      metricsConf,
      checkpointFilePath,
      sleepMillis,
      cutOffMultiplier,
      CLUSTER_AGGREGATOR_SECOND_DISABLED,
      null,
      METRICS_CLUSTER_AGGREGATE_TABLE_NAME,
      // Same fixed 120000 ms value as the table-backed variant.
      120000L,
      timeSliceMillis,
      haController,
      distributedCache);
}