in ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/AbstractTimelineAggregator.java [257:319]
public boolean doWork(long startTime, long endTime) {
LOG.info("Start aggregation cycle @ " + new Date() + ", " +
"startTime = " + new Date(startTime) + ", endTime = " + new Date(endTime));
boolean success = true;
Condition condition = prepareMetricQueryCondition(startTime, endTime);
Connection conn = null;
PreparedStatement stmt = null;
ResultSet rs = null;
try {
conn = hBaseAccessor.getConnection();
// FLUME 2. aggregate and ignore the instance
stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
LOG.debug("Query issued @: " + new Date());
if (condition.doUpdate()) {
conn.setAutoCommit(true);
int rows = stmt.executeUpdate();
conn.commit();
conn.setAutoCommit(false);
LOG.info(rows + " row(s) updated in aggregation.");
//TODO : Fix downsampling after UUID change.
//downsample(conn, startTime, endTime);
} else {
rs = stmt.executeQuery();
}
LOG.debug("Query returned @: " + new Date());
aggregate(rs, startTime, endTime);
} catch (Exception e) {
LOG.error("Exception during aggregating metrics.", e);
success = false;
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException e) {
// Ignore
}
}
if (stmt != null) {
try {
stmt.close();
} catch (SQLException e) {
// Ignore
}
}
if (conn != null) {
try {
conn.close();
} catch (SQLException sql) {
// Ignore
}
}
}
LOG.info("End aggregation cycle @ " + new Date());
return success;
}