in ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java [1553:1638]
/**
 * Upserts the given cluster aggregate records into {@code tableName} and then
 * forwards the same records to the external aggregator sink, if one is set.
 *
 * <p>Each entry is written with {@code UPSERT_CLUSTER_AGGREGATE_TIME_SQL}
 * (formatted with {@code tableName}). Entries for which no UUID can be
 * obtained are logged and skipped. A failure of an individual upsert is
 * logged and does not abort the remaining entries. The connection is
 * committed whenever the number of attempted rows since the last commit
 * reaches {@code PHOENIX_MAX_MUTATION_STATE_SIZE - 1}, and once more after
 * the loop.
 *
 * @param records   cluster metrics mapped to their aggregates; a
 *                  {@code null} or empty map is a no-op
 * @param tableName table name substituted into the upsert statement and
 *                  passed to {@code getTablePrecision} for the sink call
 * @throws SQLException if the statement cannot be prepared or a commit fails
 */
public void saveClusterAggregateRecordsSecond(Map<TimelineClusterMetric, MetricHostAggregate> records,
                                              String tableName) throws SQLException {
  if (records == null || records.isEmpty()) {
    LOG.debug("Empty aggregate records.");
    return;
  }

  long start = System.currentTimeMillis();
  Connection conn = getConnection();
  PreparedStatement stmt = null;
  try {
    stmt = conn.prepareStatement(String.format(UPSERT_CLUSTER_AGGREGATE_TIME_SQL, tableName));
    // Rows attempted since the last commit; used to bound the pending mutation batch.
    int rowCount = 0;

    for (Map.Entry<TimelineClusterMetric, MetricHostAggregate> aggregateEntry : records.entrySet()) {
      TimelineClusterMetric clusterMetric = aggregateEntry.getKey();
      MetricHostAggregate aggregate = aggregateEntry.getValue();

      if (LOG.isTraceEnabled()) {
        LOG.trace("clusterMetric = " + clusterMetric + ", " +
          "aggregate = " + aggregate);
      }

      byte[] uuid = metadataManagerInstance.getUuid(clusterMetric, true);
      if (uuid == null) {
        LOG.error("Error computing UUID for metric. Cannot write metric : " + clusterMetric.toString());
        continue;
      }

      rowCount++;
      stmt.clearParameters();
      stmt.setBytes(1, uuid);
      stmt.setLong(2, clusterMetric.getTimestamp());
      stmt.setDouble(3, aggregate.getSum());
      stmt.setLong(4, aggregate.getNumberOfSamples());
      stmt.setDouble(5, aggregate.getMax());
      stmt.setDouble(6, aggregate.getMin());

      try {
        stmt.executeUpdate();
      } catch (SQLException sql) {
        // Best effort: keep writing the remaining records, but record which
        // metric failed and keep the stack trace for diagnosis.
        LOG.error("Failed to upsert cluster aggregate record into " + tableName
          + " for metric : " + clusterMetric, sql);
      }

      if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
        conn.commit();
        rowCount = 0;
      }
    }

    // Flush whatever is left from the last partial batch.
    conn.commit();
  } finally {
    // Close failures must not mask an exception from the body or fail an
    // otherwise successful save, so they are only logged.
    if (stmt != null) {
      try {
        stmt.close();
      } catch (SQLException e) {
        LOG.debug("Error closing statement after saving cluster aggregate records.", e);
      }
    }
    if (conn != null) {
      try {
        conn.close();
      } catch (SQLException sql) {
        LOG.debug("Error closing connection after saving cluster aggregate records.", sql);
      }
    }
  }

  long end = System.currentTimeMillis();
  // Only report saves that took longer than one minute.
  if ((end - start) > 60000L) {
    LOG.info("Time to save: " + (end - start) + ", " +
      "thread = " + Thread.currentThread().getName());
  }

  if (aggregatorSink != null) {
    try {
      aggregatorSink.saveClusterTimeAggregateRecords(records,
        getTablePrecision(tableName));
    } catch (Exception e) {
      // A sink failure must not fail the save; log with the stack trace.
      LOG.warn("Error writing cluster time aggregate records metrics to external sink.", e);
    }
  }
}