in ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java [275:381]
/**
 * Converts one metrics record into timeline metrics, pushes them through the
 * metrics cache, and emits whatever the cache returns.
 *
 * <p>Every metric name is assembled as
 * {@code <context>.[<processName>.]<recordName><prefix>.[<rpcSuffix>.]<metricName>}:
 * <ul>
 *   <li>an {@code ipc.<port>} context is rewritten to {@code ipc.<suffix>} when the
 *       port has an entry in {@code rpcPortSuffixes};</li>
 *   <li>for the {@code jvm} context, a {@code processName} tag equal to
 *       {@code RegionServer} or {@code Master} is inserted after the context;</li>
 *   <li>{@code appendPrefix} contributes the part directly after the record name;</li>
 *   <li>for contexts containing {@code rpc}, the suffix mapped to a matching
 *       {@code port} tag is appended.</li>
 * </ul>
 *
 * <p>Records whose context is {@code container} are handed to
 * {@code emitContainerMetrics} instead. A {@code skipAggregation} tag with value
 * {@code "true"} is copied into the metadata of every metric built from the record.
 *
 * <p>An {@code UnableToConnectException} is logged as a warning and not rethrown.
 *
 * @param record the metrics record to convert and publish
 */
public void putMetrics(MetricsRecord record) {
  try {
    String recordName = record.name();
    String contextName = record.context();
    StringBuilder sb = new StringBuilder();
    boolean skipAggregation = false;

    // Transform ipc.8020 -> ipc.client, ipc.8040 -> ipc.datanode, etc.
    if (contextName.startsWith("ipc.")) {
      // The prefix is a literal, so slice it off rather than treating "ipc." as a regex.
      String portNumber = contextName.substring("ipc.".length());
      if (rpcPortSuffixes.containsKey(portNumber)) {
        contextName = "ipc." + rpcPortSuffixes.get(portNumber);
      }
    }

    sb.append(contextName);
    sb.append('.');

    if (record.tags() != null) {
      for (MetricsTag tag : record.tags()) {
        // Comparisons are written constant-first so that a tag with a null name or
        // value is simply skipped instead of throwing out of this method.
        if ("skipAggregation".equals(tag.name())) {
          skipAggregation = "true".equals(tag.value());
        }

        // Similar to GangliaContext adding processName to distinguish jvm
        // metrics for co-hosted daemons. We only do this for HBase since the
        // appId is shared for Master and RS.
        if (contextName.equals("jvm") && "processName".equalsIgnoreCase(tag.info().name()) &&
            ("RegionServer".equals(tag.value()) || "Master".equals(tag.value()))) {
          sb.append(tag.value());
          sb.append('.');
        }
      }
    }

    sb.append(recordName);
    appendPrefix(record, sb);
    sb.append('.');

    // Add port tag for rpc metrics to distinguish rpc calls based on port
    if (!rpcPortSuffixes.isEmpty() && contextName.contains("rpc")) {
      if (record.tags() != null) {
        for (MetricsTag tag : record.tags()) {
          String tagValue = tag.value();
          // A tag without a value cannot identify a port; skip it before the lookup.
          if (tagValue != null && "port".equalsIgnoreCase(tag.info().name()) &&
              rpcPortSuffixes.containsKey(tagValue)) {
            sb.append(rpcPortSuffixes.get(tagValue));
            sb.append('.');
          }
        }
      }
    }

    // Container records take a separate emission path.
    if (record.context().equals("container")) {
      emitContainerMetrics(record);
      return;
    }

    // Everything in sb up to here is shared by all metrics of this record; the
    // builder is truncated back to this length after each metric name is built.
    int sbBaseLen = sb.length();
    List<TimelineMetric> metricList = new ArrayList<>();

    HashMap<String, String> metadata = null;
    if (skipAggregation) {
      metadata = new HashMap<>();
      metadata.put("skipAggregation", "true");
    }

    long startTime = record.timestamp();

    for (AbstractMetric metric : record.metrics()) {
      sb.append(metric.name());
      String name = sb.toString();
      Number value = metric.value();

      TimelineMetric timelineMetric = new TimelineMetric();
      timelineMetric.setMetricName(name);
      timelineMetric.setHostName(hostName);
      timelineMetric.setAppId(serviceName);
      if (setInstanceId) {
        timelineMetric.setInstanceId(instanceId);
      }
      timelineMetric.setStartTime(startTime);
      timelineMetric.setType(metric.type() != null ? metric.type().name() : null);
      timelineMetric.getMetricValues().put(startTime, value.doubleValue());
      if (metadata != null) {
        // The same map instance is attached to every metric built from this record.
        timelineMetric.setMetadata(metadata);
      }

      // Put intermediate values into the cache until it is time to send
      boolean isCounter = MetricType.COUNTER == metric.type();
      metricsCache.putTimelineMetric(timelineMetric, isCounter);

      // Retrieve all values from cache if it is time to send
      TimelineMetric cachedMetric = metricsCache.getTimelineMetric(name);
      if (cachedMetric != null) {
        metricList.add(cachedMetric);
      }

      sb.setLength(sbBaseLen);
    }

    TimelineMetrics timelineMetrics = new TimelineMetrics();
    timelineMetrics.setMetrics(metricList);

    // Only call the emitter when the cache released at least one metric.
    if (!metricList.isEmpty()) {
      emitMetrics(timelineMetrics);
    }
  } catch (UnableToConnectException uce) {
    LOG.warn("Unable to send metrics to collector by address:" + uce.getConnectUrl());
  }
}