in ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsIgniteCache.java [87:173]
/**
 * Configures and starts the Ignite node backing the in-memory cluster aggregation cache.
 *
 * <p>Steps performed, in order:
 * <ol>
 *   <li>Loads the metrics and SSL configuration from {@link TimelineMetricConfiguration}.</li>
 *   <li>When the HTTP policy is {@code HTTPS_ONLY}, installs an Ignite SSL context built from the
 *       {@code ssl.server.keystore.*} / {@code ssl.server.truststore.*} properties.</li>
 *   <li>Reads the aggregation parameters (app ids, interpolation flag, time-slice interval,
 *       aggregator sleep interval) and collects the metric patterns to skip.</li>
 *   <li>Sets up TCP discovery from the configured Ignite node list or, when that property is
 *       absent, from the live collectors found in ZooKeeper. If neither yields any address the
 *       discovery SPI is left untouched on the {@link IgniteConfiguration}.</li>
 *   <li>Creates (or joins) the partitioned, transactional {@code metrics_cache} whose entries
 *       expire three aggregation intervals after creation.</li>
 * </ol>
 *
 * @param metricMetadataManager metadata manager stored on this instance
 * @throws MalformedURLException declared for the configuration lookups invoked here
 *         (presumably raised while locating configuration resources - confirm)
 * @throws URISyntaxException declared for the configuration lookups invoked here
 *         (presumably raised while locating configuration resources - confirm)
 * @throws IllegalStateException if {@code HTTPS_ONLY} is enabled but a keystore or truststore
 *         password property is not set
 */
public TimelineMetricsIgniteCache(TimelineMetricMetadataManager metricMetadataManager) throws MalformedURLException, URISyntaxException {
  TimelineMetricConfiguration timelineMetricConfiguration = TimelineMetricConfiguration.getInstance();
  Configuration metricConf = timelineMetricConfiguration.getMetricsConf();
  Configuration sslConf = timelineMetricConfiguration.getMetricsSslConf();

  IgniteConfiguration igniteConfiguration = new IgniteConfiguration();
  this.metricMetadataManager = metricMetadataManager;
  //TODO add config to disable logging

  //enable ssl for ignite requests
  // Constant-first comparison is null-safe, so a missing policy simply skips SSL setup.
  if ("HTTPS_ONLY".equalsIgnoreCase(metricConf.get(TIMELINE_SERVICE_HTTP_POLICY))) {
    igniteConfiguration.setSslContextFactory(buildIgniteSslContextFactory(sslConf));
  }

  //aggregation parameters
  appIdsToAggregate = timelineMetricConfiguration.getAppIdsForHostAggregation();
  interpolationEnabled = Boolean.parseBoolean(metricConf.get(TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED, "true"));
  // Time-slice interval is configured in seconds (default 30) and kept in milliseconds.
  cacheSliceIntervalMillis = SECONDS.toMillis(metricConf.getInt(CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 30));
  // Aggregator sleep interval (default 120); used below, in seconds, to derive the entry TTL.
  long aggregationInterval = metricConf.getLong(CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL, 120L);

  // Skip aggregation for metrics for which aggregating across hosts does not make sense.
  String filteredMetricPatterns = metricConf.get(TIMELINE_METRIC_AGGREGATION_SQL_FILTERS);
  if (StringUtils.isNotEmpty(filteredMetricPatterns)) {
    LOG.info("Skipping in memory cluster aggregation for metric patterns : " + filteredMetricPatterns);
    skipAggrPatternStrings.addAll(getJavaMetricPatterns(filteredMetricPatterns));
  }

  // Skip aggregation for those metrics that are meant to be of high volume and get differential treatment.
  String transientMetricPatterns = timelineMetricConfiguration.getTransientMetricPatterns();
  if (StringUtils.isNotEmpty(transientMetricPatterns)) {
    LOG.info("Skipping in memory cluster aggregation for transient metric patterns : " + transientMetricPatterns);
    skipAggrPatternStrings.addAll(getJavaMetricPatterns(transientMetricPatterns));
  }

  // An explicitly configured, comma-separated node list takes precedence over ZooKeeper lookup.
  String configuredIgniteNodes = metricConf.get(TIMELINE_METRICS_COLLECTOR_IGNITE_NODES);
  if (configuredIgniteNodes != null) {
    igniteConfiguration.setDiscoverySpi(
        buildStaticIpDiscoverySpi(Arrays.asList(configuredIgniteNodes.split(","))));
  } else {
    //get live nodes from ZK
    String zkClientPort = timelineMetricConfiguration.getClusterZKClientPort();
    String zkQuorum = timelineMetricConfiguration.getClusterZKQuorum();
    String zkConnectionURL = timelineMetricConfiguration.getZkConnectionUrl(zkClientPort, zkQuorum);
    // NOTE(review): 5 and 200 are presumably the retry count and retry sleep (ms) - confirm
    // against MetricCollectorHAHelper.
    MetricCollectorHAHelper metricCollectorHAHelper = new MetricCollectorHAHelper(zkConnectionURL, 5, 200);
    Collection<String> liveCollectors = metricCollectorHAHelper.findLiveCollectorHostsFromZNode();
    if (liveCollectors != null && !liveCollectors.isEmpty()) {
      igniteConfiguration.setDiscoverySpi(buildStaticIpDiscoverySpi(liveCollectors));
    }
  }

  //ignite cache configuration
  CacheConfiguration<TimelineClusterMetric, MetricClusterAggregate> cacheConfiguration = new CacheConfiguration<>();
  cacheConfiguration.setName("metrics_cache");
  //set cache mode to partitioned with # of backups
  cacheConfiguration.setCacheMode(CacheMode.PARTITIONED);
  cacheConfiguration.setBackups(metricConf.getInt(TIMELINE_METRICS_COLLECTOR_IGNITE_BACKUPS, 1));
  //disable throttling due to cpu impact
  cacheConfiguration.setRebalanceThrottle(0);
  //enable locks
  cacheConfiguration.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
  //expiry policy to remove lost keys, if any
  cacheConfiguration.setEagerTtl(true);
  cacheConfiguration.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, aggregationInterval * 3)));

  Ignite igniteNode = Ignition.start(igniteConfiguration);
  igniteCache = igniteNode.getOrCreateCache(cacheConfiguration);
}

/**
 * Builds the Ignite SSL context factory from the server keystore/truststore properties.
 *
 * @param sslConf configuration holding the {@code ssl.server.*} properties
 * @return factory populated with keystore and truststore locations and passwords
 * @throws IllegalStateException if either store password property is not set
 */
private SslContextFactory buildIgniteSslContextFactory(Configuration sslConf) {
  SslContextFactory sslContextFactory = new SslContextFactory();
  sslContextFactory.setKeyStoreFilePath(sslConf.get("ssl.server.keystore.location"));
  sslContextFactory.setKeyStorePassword(requiredSslPassword(sslConf, "ssl.server.keystore.password"));
  sslContextFactory.setTrustStoreFilePath(sslConf.get("ssl.server.truststore.location"));
  sslContextFactory.setTrustStorePassword(requiredSslPassword(sslConf, "ssl.server.truststore.password"));
  return sslContextFactory;
}

/**
 * Reads a mandatory SSL password property, failing with a descriptive error instead of a
 * {@link NullPointerException} when it is absent. The password value is never logged.
 *
 * @param sslConf configuration to read from
 * @param propertyName name of the password property
 * @return the password as a character array
 * @throws IllegalStateException if the property is not set
 */
private char[] requiredSslPassword(Configuration sslConf, String propertyName) {
  String password = sslConf.get(propertyName);
  if (password == null) {
    throw new IllegalStateException(
        "HTTPS_ONLY is enabled but required SSL property '" + propertyName + "' is not set");
  }
  return password.toCharArray();
}

/**
 * Creates a TCP discovery SPI backed by a static list of node addresses and logs the
 * addresses registered with the IP finder.
 *
 * @param nodeAddresses addresses handed to the IP finder as-is
 * @return discovery SPI using a {@link TcpDiscoveryVmIpFinder} over the given addresses
 */
private TcpDiscoverySpi buildStaticIpDiscoverySpi(Collection<String> nodeAddresses) {
  TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
  ipFinder.setAddresses(nodeAddresses);
  LOG.info("Setting ignite nodes to : " + ipFinder.getRegisteredAddresses());
  TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi();
  discoverySpi.setIpFinder(ipFinder);
  return discoverySpi;
}