public void saveClusterAggregateRecords()

in ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java [1462:1545]


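  /**
   * Persists cluster-level aggregates into the Phoenix cluster aggregate table.
   * Records whose metric UUID cannot be resolved are skipped, upserts are
   * committed in batches bounded by PHOENIX_MAX_MUTATION_STATE_SIZE, and the
   * same records are forwarded to the external aggregator sink if one is
   * configured.
   *
   * @param records cluster aggregates keyed by TimelineClusterMetric
   * @throws SQLException on failures other than individual row upserts, which
   *                      are logged and skipped
   */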
  public void saveClusterAggregateRecords(Map<TimelineClusterMetric, MetricClusterAggregate> records)
      throws SQLException {

    if (records == null || records.isEmpty()) {
      LOG.debug("Empty aggregate records.");
      return;
    }

    long start = System.currentTimeMillis();
    String sqlStr = String.format(UPSERT_CLUSTER_AGGREGATE_SQL, METRICS_CLUSTER_AGGREGATE_TABLE_NAME);
    Connection conn = getConnection();
    PreparedStatement stmt = null;
    try {
      stmt = conn.prepareStatement(sqlStr);
      int rowCount = 0;

      for (Map.Entry<TimelineClusterMetric, MetricClusterAggregate>
        aggregateEntry : records.entrySet()) {
        TimelineClusterMetric clusterMetric = aggregateEntry.getKey();
        MetricClusterAggregate aggregate = aggregateEntry.getValue();

        if (LOG.isTraceEnabled()) {
          LOG.trace("clusterMetric = " + clusterMetric + ", " +
            "aggregate = " + aggregate);
        }

        rowCount++;
        byte[] uuid = metadataManagerInstance.getUuid(clusterMetric, true);
        if (uuid == null) {
          LOG.error("Error computing UUID for metric. Cannot write metric: " + clusterMetric);
          continue;
        }
        stmt.clearParameters();
        stmt.setBytes(1, uuid);
        stmt.setLong(2, clusterMetric.getTimestamp());
        stmt.setDouble(3, aggregate.getSum());
        stmt.setInt(4, aggregate.getNumberOfHosts());
        stmt.setDouble(5, aggregate.getMax());
        stmt.setDouble(6, aggregate.getMin());

        try {
          stmt.executeUpdate();
        } catch (SQLException sql) {
          // With auto-commit disabled, Phoenix buffers upserts client-side until
          // commit, so success here does not guarantee the row was written.
          // Log the failure and continue with the rest of the batch.
          LOG.error("Failed to upsert cluster aggregate for metric: " + clusterMetric, sql);
        }

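        // Commit in batches to keep pending mutations under Phoenix's max mutation state size.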
        if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
          conn.commit();
          rowCount = 0;
        }
      }

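      // Commit whatever remains from the last partial batch.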
      conn.commit();

    } finally {
      if (stmt != null) {
        try {
          stmt.close();
        } catch (SQLException e) {
          // Ignore
        }
      }
      if (conn != null) {
        try {
          conn.close();
        } catch (SQLException sql) {
          // Ignore
        }
      }
    }
    long end = System.currentTimeMillis();
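    // Flag unusually slow saves (over one minute) in the logs.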
    if ((end - start) > 60000L) {
      LOG.info("Time to save: " + (end - start) + " ms, " +
        "thread = " + Thread.currentThread().getName());
    }
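    // Forward the same records to the external aggregator sink, if configured;
    // sink failures are logged but do not affect the Phoenix write.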
    if (aggregatorSink != null) {
      try {
        aggregatorSink.saveClusterAggregateRecords(records);
      } catch (Exception e) {
        LOG.warn("Error writing cluster aggregate records metrics to external sink. ", e);
      }
    }
  }
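
A minimal usage sketch follows, assuming a caller that already holds a PhoenixHBaseAccessor instance and uses java.util.Map/java.util.HashMap. The TimelineClusterMetric and MetricClusterAggregate constructor signatures shown are illustrative assumptions, not verified against the actual classes.

  // Hedged sketch (not part of the original file): hand one cluster-level
  // aggregate to the accessor. Constructor argument lists below are assumed.
  void saveOneClusterAggregateExample(PhoenixHBaseAccessor hBaseAccessor) throws SQLException {
    long windowEndTime = System.currentTimeMillis();

    Map<TimelineClusterMetric, MetricClusterAggregate> records = new HashMap<>();
    TimelineClusterMetric clusterMetric =
        new TimelineClusterMetric("cpu_user", "HOST", null, windowEndTime);   // assumed ctor: name, appId, instanceId, timestamp
    MetricClusterAggregate aggregate =
        new MetricClusterAggregate(42.0, 3, null, 21.0, 7.0);                 // assumed ctor: sum, numHosts, deviation, max, min
    records.put(clusterMetric, aggregate);

    // One call upserts the whole map, committing in batches, and then
    // forwards the records to the external sink if one is configured.
    hBaseAccessor.saveClusterAggregateRecords(records);
  }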