in ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java [1369:1455]
public void saveHostAggregateRecords(Map<TimelineMetric, MetricHostAggregate> hostAggregateMap,
String phoenixTableName) throws SQLException {
if (hostAggregateMap == null || hostAggregateMap.isEmpty()) {
LOG.debug("Empty aggregate records.");
return;
}
Connection conn = getConnection();
PreparedStatement stmt = null;
long start = System.currentTimeMillis();
int rowCount = 0;
try {
stmt = conn.prepareStatement(
String.format(UPSERT_AGGREGATE_RECORD_SQL, phoenixTableName));
for (Map.Entry<TimelineMetric, MetricHostAggregate> metricAggregate :
hostAggregateMap.entrySet()) {
TimelineMetric metric = metricAggregate.getKey();
MetricHostAggregate hostAggregate = metricAggregate.getValue();
byte[] uuid = metadataManagerInstance.getUuid(metric, true);
if (uuid == null) {
LOG.error("Error computing UUID for metric. Cannot write aggregate metric : " + metric.toString());
continue;
}
rowCount++;
stmt.clearParameters();
stmt.setBytes(1, uuid);
stmt.setLong(2, metric.getStartTime());
stmt.setDouble(3, hostAggregate.getSum());
stmt.setDouble(4, hostAggregate.getMax());
stmt.setDouble(5, hostAggregate.getMin());
stmt.setDouble(6, hostAggregate.getNumberOfSamples());
try {
stmt.executeUpdate();
} catch (SQLException sql) {
LOG.error(sql);
}
if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
conn.commit();
rowCount = 0;
}
}
conn.commit();
} finally {
if (stmt != null) {
try {
stmt.close();
} catch (SQLException e) {
// Ignore
}
}
if (conn != null) {
try {
conn.close();
} catch (SQLException sql) {
// Ignore
}
}
}
long end = System.currentTimeMillis();
if ((end - start) > 60000l) {
LOG.info("Time to save map: " + (end - start) + ", " +
"thread = " + Thread.currentThread().getClass());
}
if (aggregatorSink != null) {
try {
aggregatorSink.saveHostAggregateRecords(hostAggregateMap,
getTablePrecision(phoenixTableName));
} catch (Exception e) {
LOG.warn(
"Error writing host aggregate records metrics to external sink. "
+ e);
}
}
}