in ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java [322:414]
public void commitMetrics(Collection<TimelineMetrics> timelineMetricsCollection) {
LOG.debug("Committing metrics to store");
Connection conn = null;
PreparedStatement metricRecordStmt = null;
List<TimelineMetric> transientMetrics = new ArrayList<>();
int rowCount = 0;
try {
conn = getConnection();
metricRecordStmt = conn.prepareStatement(String.format(
UPSERT_METRICS_SQL, METRICS_RECORD_TABLE_NAME));
for (TimelineMetrics timelineMetrics : timelineMetricsCollection) {
for (TimelineMetric metric : timelineMetrics.getMetrics()) {
if (metadataManagerInstance.isTransientMetric(metric.getMetricName(), metric.getAppId())) {
transientMetrics.add(metric);
continue;
}
metricRecordStmt.clearParameters();
if (LOG.isTraceEnabled()) {
LOG.trace("host: " + metric.getHostName() + ", " +
"metricName = " + metric.getMetricName() + ", " +
"values: " + metric.getMetricValues());
}
double[] aggregates = AggregatorUtils.calculateAggregates(
metric.getMetricValues());
if (aggregates[3] != 0.0) {
rowCount++;
byte[] uuid = metadataManagerInstance.getUuid(metric, true);
if (uuid == null) {
LOG.error("Error computing UUID for metric. Cannot write metrics : " + metric.toString());
continue;
}
metricRecordStmt.setBytes(1, uuid);
metricRecordStmt.setLong(2, metric.getStartTime());
metricRecordStmt.setDouble(3, aggregates[0]);
metricRecordStmt.setDouble(4, aggregates[1]);
metricRecordStmt.setDouble(5, aggregates[2]);
metricRecordStmt.setLong(6, (long) aggregates[3]);
String json = TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues());
metricRecordStmt.setString(7, json);
try {
int rows = metricRecordStmt.executeUpdate();
} catch (SQLException | NumberFormatException ex) {
LOG.warn("Failed on insert records to store : " + ex.getMessage());
LOG.warn("Metric that cannot be stored : [" + metric.getMetricName() + "," + metric.getAppId() + "]" +
metric.getMetricValues().toString());
continue;
}
if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
conn.commit();
rowCount = 0;
}
} else {
LOG.debug("Discarding empty metric record for : [" + metric.getMetricName() + "," +
metric.getAppId() + "," +
metric.getHostName() + "," +
metric.getInstanceId() + "]");
}
}
}
if (CollectionUtils.isNotEmpty(transientMetrics)) {
commitTransientMetrics(conn, transientMetrics);
}
// commit() blocked if HBase unavailable
conn.commit();
} catch (Exception exception){
exception.printStackTrace();
}
finally {
if (metricRecordStmt != null) {
try {
metricRecordStmt.close();
} catch (SQLException e) {
// Ignore
}
}
if (conn != null) {
try {
conn.close();
} catch (SQLException sql) {
// Ignore
}
}
}
}