public TimelineMetricsIgniteCache()

in ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricsIgniteCache.java [87:173]


  public TimelineMetricsIgniteCache(TimelineMetricMetadataManager metricMetadataManager) throws MalformedURLException, URISyntaxException {
    TimelineMetricConfiguration timelineMetricConfiguration = TimelineMetricConfiguration.getInstance();
    Configuration metricConf = timelineMetricConfiguration.getMetricsConf();
    Configuration sslConf = timelineMetricConfiguration.getMetricsSslConf();

    IgniteConfiguration igniteConfiguration = new IgniteConfiguration();
    this.metricMetadataManager = metricMetadataManager;

    //TODO add config to disable logging

    //enable ssl for ignite requests
    if (metricConf.get(TIMELINE_SERVICE_HTTP_POLICY) != null && metricConf.get(TIMELINE_SERVICE_HTTP_POLICY).equalsIgnoreCase("HTTPS_ONLY")) {
      SslContextFactory sslContextFactory = new SslContextFactory();
      String keyStorePath = sslConf.get("ssl.server.keystore.location");
      String keyStorePassword = sslConf.get("ssl.server.keystore.password");
      String trustStorePath = sslConf.get("ssl.server.truststore.location");
      String trustStorePassword = sslConf.get("ssl.server.truststore.password");

      sslContextFactory.setKeyStoreFilePath(keyStorePath);
      sslContextFactory.setKeyStorePassword(keyStorePassword.toCharArray());
      sslContextFactory.setTrustStoreFilePath(trustStorePath);
      sslContextFactory.setTrustStorePassword(trustStorePassword.toCharArray());
      igniteConfiguration.setSslContextFactory(sslContextFactory);
    }

    //aggregation parameters
    appIdsToAggregate = timelineMetricConfiguration.getAppIdsForHostAggregation();
    interpolationEnabled = Boolean.parseBoolean(metricConf.get(TIMELINE_METRICS_CLUSTER_AGGREGATOR_INTERPOLATION_ENABLED, "true"));
    cacheSliceIntervalMillis = SECONDS.toMillis(metricConf.getInt(CLUSTER_AGGREGATOR_TIMESLICE_INTERVAL, 30));
    Long aggregationInterval = metricConf.getLong(CLUSTER_AGGREGATOR_SECOND_SLEEP_INTERVAL, 120L);

    // Skip aggregation for metrics for which aggregating across hosts does not make sense.
    String filteredMetricPatterns = metricConf.get(TIMELINE_METRIC_AGGREGATION_SQL_FILTERS);
    if (StringUtils.isNotEmpty(filteredMetricPatterns)) {
      LOG.info("Skipping in memory cluster aggregation for metric patterns : " + filteredMetricPatterns);
      skipAggrPatternStrings.addAll(getJavaMetricPatterns(filteredMetricPatterns));
    }

    // Skip aggregation for those metrics that are meant to be of high volume and get differential treatment.
    String transientMetricPatterns = timelineMetricConfiguration.getTransientMetricPatterns();
    if (StringUtils.isNotEmpty(transientMetricPatterns)) {
      LOG.info("Skipping in memory cluster aggregation for transient metric patterns : " + transientMetricPatterns);
      skipAggrPatternStrings.addAll(getJavaMetricPatterns(transientMetricPatterns));
    }

    if (metricConf.get(TIMELINE_METRICS_COLLECTOR_IGNITE_NODES) != null) {
      TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi();
      TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
      ipFinder.setAddresses(Arrays.asList(metricConf.get(TIMELINE_METRICS_COLLECTOR_IGNITE_NODES).split(",")));
      LOG.info("Setting ignite nodes to : " + ipFinder.getRegisteredAddresses());
      discoverySpi.setIpFinder(ipFinder);
      igniteConfiguration.setDiscoverySpi(discoverySpi);
    } else {
      //get live nodes from ZK
      String zkClientPort = timelineMetricConfiguration.getClusterZKClientPort();
      String zkQuorum = timelineMetricConfiguration.getClusterZKQuorum();
      String zkConnectionURL = timelineMetricConfiguration.getZkConnectionUrl(zkClientPort, zkQuorum);
      MetricCollectorHAHelper metricCollectorHAHelper = new MetricCollectorHAHelper(zkConnectionURL, 5, 200);
      Collection<String> liveCollectors = metricCollectorHAHelper.findLiveCollectorHostsFromZNode();
      if (liveCollectors != null && !liveCollectors.isEmpty()) {
        TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi();
        TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
        ipFinder.setAddresses(liveCollectors);
        LOG.info("Setting ignite nodes to : " + ipFinder.getRegisteredAddresses());
        discoverySpi.setIpFinder(ipFinder);
        igniteConfiguration.setDiscoverySpi(discoverySpi);
      }
    }


    //ignite cache configuration
    CacheConfiguration<TimelineClusterMetric, MetricClusterAggregate> cacheConfiguration = new CacheConfiguration<>();
    cacheConfiguration.setName("metrics_cache");
    //set cache mode to partitioned with # of backups
    cacheConfiguration.setCacheMode(CacheMode.PARTITIONED);
    cacheConfiguration.setBackups(metricConf.getInt(TIMELINE_METRICS_COLLECTOR_IGNITE_BACKUPS, 1));
    //disable throttling due to cpu impact
    cacheConfiguration.setRebalanceThrottle(0);
    //enable locks
    cacheConfiguration.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
    //expiry policy to remove lost keys, if any
    cacheConfiguration.setEagerTtl(true);
    cacheConfiguration.setExpiryPolicyFactory(CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, aggregationInterval * 3)));

    Ignite igniteNode = Ignition.start(igniteConfiguration);
    igniteCache = igniteNode.getOrCreateCache(cacheConfiguration);
  }