public void init()

in ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java [160:234]


  /**
   * Initializes the Kafka timeline metrics sink from the supplied properties.
   *
   * <p>The whole body runs while holding {@code lock} and is skipped when
   * {@code initialized} is already true. NOTE(review): {@code initialized} is not
   * assigned in this method; presumably {@code initializeReporter()} sets it - confirm.
   *
   * <p>Steps, in order: resolve the local hostname, call {@code super.init()}, read
   * collector / instance / host-aggregation settings, create the metrics cache, load
   * the truststore when either protocol contains "https", parse the metric
   * exclusion / inclusion lists, initialize the reporter and, when
   * {@code TIMELINE_REPORTER_ENABLED_PROPERTY} is true, start it.
   *
   * @param props properties the sink configuration is read from
   * @throws RuntimeException if the local hostname cannot be resolved
   */
  public void init(VerifiableProperties props) {
    synchronized (lock) {
      if (initialized) {
        return;
      }
      LOG.info("Initializing Kafka Timeline Metrics Sink");
      try {
        // Resolve the local address once and reuse it for both lookups.
        InetAddress localHost = InetAddress.getLocalHost();
        hostname = localHost.getHostName();
        // If not FQDN (no dot in the name), ask for the canonical name instead.
        if ((hostname == null) || (!hostname.contains("."))) {
          hostname = localHost.getCanonicalHostName();
        }
      } catch (UnknownHostException e) {
        LOG.error("Could not identify hostname.");
        throw new RuntimeException("Could not identify hostname.", e);
      }
      // Initialize the collector write strategy
      super.init();

      KafkaMetricsConfig metricsConfig = new KafkaMetricsConfig(props);
      timeoutSeconds = props.getInt(METRICS_POST_TIMEOUT_SECONDS, DEFAULT_POST_TIMEOUT_SECONDS);
      int metricsSendInterval = props.getInt(TIMELINE_METRICS_SEND_INTERVAL_PROPERTY, MAX_EVICTION_TIME_MILLIS);
      int maxRowCacheSize = props.getInt(TIMELINE_METRICS_MAX_ROW_CACHE_SIZE_PROPERTY, MAX_RECS_PER_NAME_DEFAULT);

      // Prefer the collector-specific quorum; otherwise fall back to "zookeeper.connect".
      zookeeperQuorum = props.containsKey(COLLECTOR_ZOOKEEPER_QUORUM) ?
        props.getString(COLLECTOR_ZOOKEEPER_QUORUM) : props.getString("zookeeper.connect");

      metricCollectorPort = props.getString(TIMELINE_PORT_PROPERTY, TIMELINE_DEFAULT_PORT);
      collectorHosts = parseHostsStringIntoCollection(props.getString(TIMELINE_HOSTS_PROPERTY, TIMELINE_DEFAULT_HOST));
      metricCollectorProtocol = props.getString(TIMELINE_PROTOCOL_PROPERTY, TIMELINE_DEFAULT_PROTOCOL);

      instanceId = props.getString(TIMELINE_METRICS_KAFKA_INSTANCE_ID_PROPERTY, null);
      setInstanceId = props.getBoolean(TIMELINE_METRICS_KAFKA_SET_INSTANCE_ID_PROPERTY, false);

      hostInMemoryAggregationEnabled = props.getBoolean(TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, false);
      hostInMemoryAggregationPort = props.getInt(TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, 61888);
      hostInMemoryAggregationProtocol = props.getString(TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY, "http");
      setMetricsCache(new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval));

      // The truststore settings are only read when at least one endpoint uses https.
      // They are read without defaults; presumably a missing key makes getString throw - confirm.
      if (metricCollectorProtocol.contains("https") || hostInMemoryAggregationProtocol.contains("https")) {
        String trustStorePath = props.getString(TIMELINE_METRICS_SSL_KEYSTORE_PATH_PROPERTY).trim();
        String trustStoreType = props.getString(TIMELINE_METRICS_SSL_KEYSTORE_TYPE_PROPERTY).trim();
        String trustStorePwd = props.getString(TIMELINE_METRICS_SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
        loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
      }

      // Exclusion policy
      String excludedMetricsStr = props.getString(EXCLUDED_METRICS_PROPERTY, "");
      if (!StringUtils.isEmpty(excludedMetricsStr.trim())) {
        excludedMetricsPrefixes = splitCommaSeparated(excludedMetricsStr);
      }
      // Inclusion override (by prefix)
      String includedMetricsStr = props.getString(INCLUDED_METRICS_PROPERTY, "");
      if (!StringUtils.isEmpty(includedMetricsStr.trim())) {
        includedMetricsPrefixes = splitCommaSeparated(includedMetricsStr);
      }

      // Inclusion override (by regex)
      String includedMetricsRegexStr = props.getString(INCLUDED_METRICS_REGEX_PROPERTY, "");
      if (!StringUtils.isEmpty(includedMetricsRegexStr.trim())) {
        LOG.info("Including metrics which match the following regex patterns : " + includedMetricsRegexStr);
        includedMetricsRegex = splitCommaSeparated(includedMetricsRegexStr);
      }

      initializeReporter();
      if (props.getBoolean(TIMELINE_REPORTER_ENABLED_PROPERTY, false)) {
        startReporter(metricsConfig.pollingIntervalSecs());
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("MetricsSendInterval = " + metricsSendInterval);
        LOG.debug("MaxRowCacheSize = " + maxRowCacheSize);
        LOG.debug("Excluded metrics prefixes = " + excludedMetricsStr);
        LOG.debug("Included metrics prefixes = " + includedMetricsStr);
        LOG.debug("Included metrics regex = " + includedMetricsRegexStr);
      }
    }
  }

  /**
   * Splits a comma-separated configuration value into its entries, trimming
   * whitespace around every entry and dropping entries that are blank.
   *
   * <p>A plain {@code split(",")} keeps the space in {@code "a, b"} and produces an
   * empty entry for {@code ",a"} or {@code "a,,b"}; both are filtered out here.
   *
   * @param commaSeparated raw property value, must not be null
   * @return the non-blank, trimmed entries in their original order (possibly empty)
   */
  private static String[] splitCommaSeparated(String commaSeparated) {
    // Fully-qualified names: the file's import block is not visible from this method.
    java.util.List<String> entries = new java.util.ArrayList<String>();
    for (String rawEntry : commaSeparated.split(",")) {
      String entry = rawEntry.trim();
      if (!entry.isEmpty()) {
        entries.add(entry);
      }
    }
    return entries.toArray(new String[0]);
  }