public void saveHostAggregateRecords()

in ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java [1369:1455]
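
Saves per-host aggregate metrics to the given Phoenix table. For each entry the method resolves the metric's UUID, binds the UUID, start time, sum, max, min and sample count to a single prepared UPSERT, and commits in chunks so the pending mutation state stays under PHOENIX_MAX_MUTATION_STATE_SIZE. Metrics whose UUID cannot be resolved are logged and skipped. Once the batch is committed, the same aggregates are forwarded to the external aggregator sink (if one is configured), and a save that takes longer than a minute is logged.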


  public void saveHostAggregateRecords(Map<TimelineMetric, MetricHostAggregate> hostAggregateMap,
                                       String phoenixTableName) throws SQLException {

    if (hostAggregateMap == null || hostAggregateMap.isEmpty()) {
      LOG.debug("Empty aggregate records.");
      return;
    }

    Connection conn = getConnection();
    PreparedStatement stmt = null;

    long start = System.currentTimeMillis();
    int rowCount = 0;

    try {
      stmt = conn.prepareStatement(
        String.format(UPSERT_AGGREGATE_RECORD_SQL, phoenixTableName));
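      // UPSERT_AGGREGATE_RECORD_SQL is defined elsewhere in this class; judging from the bindings
      // below it presumably expands to something like
      //   UPSERT INTO <table> (UUID, SERVER_TIME, METRIC_SUM, METRIC_MAX, METRIC_MIN, METRIC_COUNT)
      //   VALUES (?, ?, ?, ?, ?, ?)
      // (the column names are an assumption inferred from the parameter binding, not taken from this excerpt).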

      for (Map.Entry<TimelineMetric, MetricHostAggregate> metricAggregate :
        hostAggregateMap.entrySet()) {

        TimelineMetric metric = metricAggregate.getKey();
        MetricHostAggregate hostAggregate = metricAggregate.getValue();

        byte[] uuid = metadataManagerInstance.getUuid(metric, true);
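        // Skip this aggregate if no UUID can be resolved for the metric; the rest of the batch still proceeds.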
        if (uuid == null) {
          LOG.error("Error computing UUID for metric. Cannot write aggregate metric : " + metric.toString());
          continue;
        }
        rowCount++;
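        // Bind the metric UUID and start time, then the aggregated sum, max, min and sample count.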
        stmt.clearParameters();
        stmt.setBytes(1, uuid);
        stmt.setLong(2, metric.getStartTime());
        stmt.setDouble(3, hostAggregate.getSum());
        stmt.setDouble(4, hostAggregate.getMax());
        stmt.setDouble(5, hostAggregate.getMin());
        stmt.setDouble(6, hostAggregate.getNumberOfSamples());

        try {
          stmt.executeUpdate();
        } catch (SQLException sql) {
          LOG.error("Failed to upsert aggregate record.", sql);
        }
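        // Commit periodically so the pending mutations stay below PHOENIX_MAX_MUTATION_STATE_SIZE.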

        if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
          conn.commit();
          rowCount = 0;
        }

      }

      conn.commit();

    } finally {
      if (stmt != null) {
        try {
          stmt.close();
        } catch (SQLException e) {
          // Ignore
        }
      }
      if (conn != null) {
        try {
          conn.close();
        } catch (SQLException sql) {
          // Ignore
        }
      }
    }

    long end = System.currentTimeMillis();

    if ((end - start) > 60000L) {
      LOG.info("Time to save map: " + (end - start) + " ms, " +
        "thread = " + Thread.currentThread().getName());
    }
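    // Forward the same aggregates to the external metrics sink, if one is configured.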
    if (aggregatorSink != null) {
      try {
        aggregatorSink.saveHostAggregateRecords(hostAggregateMap,
            getTablePrecision(phoenixTableName));
      } catch (Exception e) {
        LOG.warn("Error writing host aggregate records to external sink.", e);
      }
    }
  }
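
Usage sketch (illustrative only): a caller builds the map keyed by TimelineMetric and passes it
together with the target aggregate table. The table name, the MetricHostAggregate setters and the
accessor variable below are assumptions inferred from the getters used above, not taken from this
excerpt.

  Map<TimelineMetric, MetricHostAggregate> hostAggregateMap = new HashMap<>();

  TimelineMetric metric = new TimelineMetric();
  metric.setMetricName("cpu_user");
  metric.setAppId("HOST");
  metric.setHostName("host1.example.com");
  metric.setStartTime(System.currentTimeMillis());

  // Mutators assumed to mirror the getters read by saveHostAggregateRecords().
  MetricHostAggregate aggregate = new MetricHostAggregate();
  aggregate.setSum(27.5);
  aggregate.setMax(15.0);
  aggregate.setMin(12.5);
  aggregate.setNumberOfSamples(2);

  hostAggregateMap.put(metric, aggregate);

  // "METRIC_RECORD_HOURLY" stands in for whichever aggregate table the caller targets.
  hBaseAccessor.saveHostAggregateRecords(hostAggregateMap, "METRIC_RECORD_HOURLY");

Because the method commits in chunks and skips metrics without a resolvable UUID, one bad entry
does not abort the rest of the batch; callers only need to handle the SQLException surfaced by
connection or commit failures.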