in ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java [81:190]
public void init(SubsetConfiguration conf) {
this.conf = conf;
LOG.info("Initializing Timeline metrics sink.");
// Take the hostname from the DNS class.
if (conf.getString("slave.host.name") != null) {
hostName = conf.getString("slave.host.name");
} else {
try {
hostName = DNS.getDefaultHost(
conf.getString("dfs.datanode.dns.interface", "default"),
conf.getString("dfs.datanode.dns.nameserver", "default"));
} catch (UnknownHostException uhe) {
LOG.error(uhe);
hostName = "UNKNOWN.example.com";
}
}
serviceName = getServiceName(conf);
instanceId = conf.getString(INSTANCE_ID_PROPERTY, null);
setInstanceId = conf.getBoolean(SET_INSTANCE_ID_PROPERTY, false);
LOG.info("Identified hostname = " + hostName + ", serviceName = " + serviceName);
// Initialize the collector write strategy
super.init();
// Load collector configs
protocol = conf.getString(COLLECTOR_PROTOCOL, "http");
String collectorHostStr = conf.getString(COLLECTOR_HOSTS_PROPERTY);
String[] collectorHostArr = null;
if(collectorHostStr !=null) {
collectorHostArr = collectorHostStr.split(",");
}
collectorHosts = parseHostsStringArrayIntoCollection(collectorHostArr);
port = conf.getString(COLLECTOR_PORT, "6188");
hostInMemoryAggregationEnabled = conf.getBoolean(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, false);
hostInMemoryAggregationPort = conf.getInt(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, 61888);
hostInMemoryAggregationProtocol = conf.getString(HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY, "http");
if (collectorHosts.isEmpty()) {
LOG.error("No Metric collector configured.");
} else {
if (protocol.contains("https") || hostInMemoryAggregationProtocol.contains("https")) {
String trustStorePath = conf.getString(SSL_KEYSTORE_PATH_PROPERTY).trim();
String trustStoreType = conf.getString(SSL_KEYSTORE_TYPE_PROPERTY).trim();
String trustStorePwd = conf.getString(SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
}
String preferredCollectorHost = findPreferredCollectHost();
collectorUri = constructTimelineMetricUri(protocol, preferredCollectorHost, port);
containerMetricsUri = constructContainerMetricUri(protocol, preferredCollectorHost, port);
if (StringUtils.isNotEmpty(preferredCollectorHost)) {
LOG.info("Collector Uri: " + collectorUri);
LOG.info("Container Metrics Uri: " + containerMetricsUri);
} else {
LOG.info("No suitable collector found.");
}
}
timeoutSeconds = conf.getInt(METRICS_POST_TIMEOUT_SECONDS, DEFAULT_POST_TIMEOUT_SECONDS);
int maxRowCacheSize = conf.getInt(MAX_METRIC_ROW_CACHE_SIZE,
TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT);
int metricsSendInterval = conf.getInt(METRICS_SEND_INTERVAL,
TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS); // ~ 1 min
// Skip aggregation of counter values by calculating derivative
metricsCache = new TimelineMetricsCache(maxRowCacheSize,
metricsSendInterval, conf.getBoolean(SKIP_COUNTER_TRANSFROMATION, true));
conf.setListDelimiterHandler(new DefaultListDelimiterHandler(','));
Iterator<String> it = (Iterator<String>) conf.getKeys();
while (it.hasNext()) {
String propertyName = it.next();
if (propertyName != null) {
if (propertyName.startsWith(TAGS_FOR_PREFIX_PROPERTY_PREFIX)) {
String contextName = propertyName.substring(TAGS_FOR_PREFIX_PROPERTY_PREFIX.length());
String[] tags = conf.getStringArray(propertyName);
boolean useAllTags = false;
Set<String> set = null;
if (tags.length > 0) {
set = new HashSet<String>();
for (String tag : tags) {
tag = tag.trim();
useAllTags |= tag.equals("*");
if (tag.length() > 0) {
set.add(tag);
}
}
if (useAllTags) {
set = null;
}
}
useTagsMap.put(contextName, set);
}
// Customized RPC ports
if (propertyName.startsWith(RPC_METRIC_PREFIX)) {
// metric.rpc.client.port
int beginIdx = RPC_METRIC_PREFIX.length() + 1;
String suffixStr = propertyName.substring(beginIdx); // client.port
String configPrefix = suffixStr.substring(0, suffixStr.indexOf(".")); // client
rpcPortSuffixes.put(conf.getString(propertyName).trim(), configPrefix.trim());
}
}
}
if (!rpcPortSuffixes.isEmpty()) {
LOG.info("RPC port properties configured: " + rpcPortSuffixes);
}
}