public void saveClusterAggregateRecordsSecond()

in ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java [1553:1638]


  /**
   * Persists per-time-period cluster aggregate records (sum, sample count, max, min)
   * into the given Phoenix aggregate table, committing in batches bounded by
   * {@code PHOENIX_MAX_MUTATION_STATE_SIZE} to keep the Phoenix mutation state small.
   * Records whose metric UUID cannot be resolved are skipped (logged as errors), and
   * individual upsert failures are logged but do not abort the batch (best-effort write).
   * After the Phoenix write, records are forwarded to the external aggregator sink, if
   * one is configured; sink failures are logged as warnings and never propagated.
   *
   * @param records   aggregate records keyed by cluster metric; no-op if null or empty
   * @param tableName target aggregate table, substituted into the upsert SQL template
   * @throws SQLException if preparing the statement, committing, or closing resources fails
   */
  public void saveClusterAggregateRecordsSecond(Map<TimelineClusterMetric, MetricHostAggregate> records,
                                                String tableName) throws SQLException {
    if (records == null || records.isEmpty()) {
      LOG.debug("Empty aggregate records.");
      return;
    }

    long start = System.currentTimeMillis();

    // try-with-resources guarantees statement and connection are closed even on failure
    try (Connection conn = getConnection();
         PreparedStatement stmt =
             conn.prepareStatement(String.format(UPSERT_CLUSTER_AGGREGATE_TIME_SQL, tableName))) {
      int rowCount = 0;

      for (Map.Entry<TimelineClusterMetric, MetricHostAggregate> aggregateEntry : records.entrySet()) {
        TimelineClusterMetric clusterMetric = aggregateEntry.getKey();
        MetricHostAggregate aggregate = aggregateEntry.getValue();

        if (LOG.isTraceEnabled()) {
          LOG.trace("clusterMetric = " + clusterMetric + ", " +
            "aggregate = " + aggregate);
        }

        byte[] uuid = metadataManagerInstance.getUuid(clusterMetric, true);
        if (uuid == null) {
          LOG.error("Error computing UUID for metric. Cannot write metric : " + clusterMetric.toString());
          continue;
        }

        stmt.clearParameters();
        stmt.setBytes(1, uuid);
        stmt.setLong(2, clusterMetric.getTimestamp());
        stmt.setDouble(3, aggregate.getSum());
        stmt.setLong(4, aggregate.getNumberOfSamples());
        stmt.setDouble(5, aggregate.getMax());
        stmt.setDouble(6, aggregate.getMin());

        try {
          stmt.executeUpdate();
          // Only successful upserts count toward the pending-mutation threshold.
          rowCount++;
        } catch (SQLException sql) {
          // Best-effort write: log with the failing metric for diagnosability and continue.
          LOG.error("Failed to upsert cluster aggregate for metric : " + clusterMetric, sql);
        }

        // Flush before the Phoenix mutation state limit is reached.
        if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
          conn.commit();
          rowCount = 0;
        }
      }

      conn.commit();
    }

    long end = System.currentTimeMillis();
    // Flag unusually slow saves (> 1 minute) for operator attention.
    if ((end - start) > 60000L) {
      LOG.info("Time to save: " + (end - start) + ", " +
        "thread = " + Thread.currentThread().getName());
    }
    if (aggregatorSink != null) {
      try {
        aggregatorSink.saveClusterTimeAggregateRecords(records,
            getTablePrecision(tableName));
      } catch (Exception e) {
        LOG.warn(
            "Error writing cluster time aggregate records metrics to external sink. "
                + e);
      }
    }
  }