public void commitMetrics()

in ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java [322:414]


  public void commitMetrics(Collection<TimelineMetrics> timelineMetricsCollection) {
    LOG.debug("Committing metrics to store");
    Connection conn = null;
    PreparedStatement metricRecordStmt = null;
    List<TimelineMetric> transientMetrics = new ArrayList<>();
    int rowCount = 0;

    try {
      conn = getConnection();
      metricRecordStmt = conn.prepareStatement(String.format(
              UPSERT_METRICS_SQL, METRICS_RECORD_TABLE_NAME));
      for (TimelineMetrics timelineMetrics : timelineMetricsCollection) {
        for (TimelineMetric metric : timelineMetrics.getMetrics()) {

          if (metadataManagerInstance.isTransientMetric(metric.getMetricName(), metric.getAppId())) {
            transientMetrics.add(metric);
            continue;
          }
          metricRecordStmt.clearParameters();

          if (LOG.isTraceEnabled()) {
            LOG.trace("host: " + metric.getHostName() + ", " +
                    "metricName = " + metric.getMetricName() + ", " +
                    "values: " + metric.getMetricValues());
          }
          double[] aggregates = AggregatorUtils.calculateAggregates(
                  metric.getMetricValues());

          if (aggregates[3] != 0.0) {
            rowCount++;
            byte[] uuid = metadataManagerInstance.getUuid(metric, true);
            if (uuid == null) {
              LOG.error("Error computing UUID for metric. Cannot write metrics : " + metric.toString());
              continue;
            }
            metricRecordStmt.setBytes(1, uuid);
            metricRecordStmt.setLong(2, metric.getStartTime());
            metricRecordStmt.setDouble(3, aggregates[0]);
            metricRecordStmt.setDouble(4, aggregates[1]);
            metricRecordStmt.setDouble(5, aggregates[2]);
            metricRecordStmt.setLong(6, (long) aggregates[3]);
            String json = TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues());
            metricRecordStmt.setString(7, json);

            try {
              int rows = metricRecordStmt.executeUpdate();
            } catch (SQLException | NumberFormatException ex) {
              LOG.warn("Failed on insert records to store : " + ex.getMessage());
              LOG.warn("Metric that cannot be stored : [" + metric.getMetricName() + "," + metric.getAppId() + "]" +
                metric.getMetricValues().toString());
              continue;
            }

            if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
              conn.commit();
              rowCount = 0;
            }

          } else {
            LOG.debug("Discarding empty metric record for : [" + metric.getMetricName() + "," +
              metric.getAppId() + "," +
              metric.getHostName() + "," +
              metric.getInstanceId() + "]");
          }

        }
      }
      if (CollectionUtils.isNotEmpty(transientMetrics)) {
        commitTransientMetrics(conn, transientMetrics);
      }

      // commit() blocked if HBase unavailable
      conn.commit();
    } catch (Exception exception){
      exception.printStackTrace();
    }
    finally {
      if (metricRecordStmt != null) {
        try {
          metricRecordStmt.close();
        } catch (SQLException e) {
          // Ignore
        }
      }
      if (conn != null) {
        try {
          conn.close();
        } catch (SQLException sql) {
          // Ignore
        }
      }
    }
  }