in ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java [160:234]
public void init(VerifiableProperties props) {
synchronized (lock) {
if (!initialized) {
LOG.info("Initializing Kafka Timeline Metrics Sink");
try {
hostname = InetAddress.getLocalHost().getHostName();
//If not FQDN , call DNS
if ((hostname == null) || (!hostname.contains("."))) {
hostname = InetAddress.getLocalHost().getCanonicalHostName();
}
} catch (UnknownHostException e) {
LOG.error("Could not identify hostname.");
throw new RuntimeException("Could not identify hostname.", e);
}
// Initialize the collector write strategy
super.init();
KafkaMetricsConfig metricsConfig = new KafkaMetricsConfig(props);
timeoutSeconds = props.getInt(METRICS_POST_TIMEOUT_SECONDS, DEFAULT_POST_TIMEOUT_SECONDS);
int metricsSendInterval = props.getInt(TIMELINE_METRICS_SEND_INTERVAL_PROPERTY, MAX_EVICTION_TIME_MILLIS);
int maxRowCacheSize = props.getInt(TIMELINE_METRICS_MAX_ROW_CACHE_SIZE_PROPERTY, MAX_RECS_PER_NAME_DEFAULT);
zookeeperQuorum = props.containsKey(COLLECTOR_ZOOKEEPER_QUORUM) ?
props.getString(COLLECTOR_ZOOKEEPER_QUORUM) : props.getString("zookeeper.connect");
metricCollectorPort = props.getString(TIMELINE_PORT_PROPERTY, TIMELINE_DEFAULT_PORT);
collectorHosts = parseHostsStringIntoCollection(props.getString(TIMELINE_HOSTS_PROPERTY, TIMELINE_DEFAULT_HOST));
metricCollectorProtocol = props.getString(TIMELINE_PROTOCOL_PROPERTY, TIMELINE_DEFAULT_PROTOCOL);
instanceId = props.getString(TIMELINE_METRICS_KAFKA_INSTANCE_ID_PROPERTY, null);
setInstanceId = props.getBoolean(TIMELINE_METRICS_KAFKA_SET_INSTANCE_ID_PROPERTY, false);
hostInMemoryAggregationEnabled = props.getBoolean(TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, false);
hostInMemoryAggregationPort = props.getInt(TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, 61888);
hostInMemoryAggregationProtocol = props.getString(TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY, "http");
setMetricsCache(new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval));
if (metricCollectorProtocol.contains("https") || hostInMemoryAggregationProtocol.contains("https")) {
String trustStorePath = props.getString(TIMELINE_METRICS_SSL_KEYSTORE_PATH_PROPERTY).trim();
String trustStoreType = props.getString(TIMELINE_METRICS_SSL_KEYSTORE_TYPE_PROPERTY).trim();
String trustStorePwd = props.getString(TIMELINE_METRICS_SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
}
// Exclusion policy
String excludedMetricsStr = props.getString(EXCLUDED_METRICS_PROPERTY, "");
if (!StringUtils.isEmpty(excludedMetricsStr.trim())) {
excludedMetricsPrefixes = excludedMetricsStr.trim().split(",");
}
// Inclusion override
String includedMetricsStr = props.getString(INCLUDED_METRICS_PROPERTY, "");
if (!StringUtils.isEmpty(includedMetricsStr.trim())) {
includedMetricsPrefixes = includedMetricsStr.trim().split(",");
}
// Inclusion override
String includedMetricsRegexStr = props.getString(INCLUDED_METRICS_REGEX_PROPERTY, "");
if (!StringUtils.isEmpty(includedMetricsRegexStr.trim())) {
LOG.info("Including metrics which match the following regex patterns : " + includedMetricsRegexStr);
includedMetricsRegex = includedMetricsRegexStr.trim().split(",");
}
initializeReporter();
if (props.getBoolean(TIMELINE_REPORTER_ENABLED_PROPERTY, false)) {
startReporter(metricsConfig.pollingIntervalSecs());
}
if (LOG.isDebugEnabled()) {
LOG.debug("MetricsSendInterval = " + metricsSendInterval);
LOG.debug("MaxRowCacheSize = " + maxRowCacheSize);
LOG.debug("Excluded metrics prefixes = " + excludedMetricsStr);
LOG.debug("Included metrics prefixes = " + includedMetricsStr);
}
}
}
}