protected void initMetricSchema()

in ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java [514:616]


  /**
   * Creates the metrics tables in HBase through Phoenix.
   *
   * <p>Tables are issued in this order: container metrics, host-level precision
   * and aggregate tables (minute, hourly, daily), cluster-level aggregate tables
   * (base, minute, hourly, daily) and, only when {@code TRANSIENT_METRIC_PATTERNS}
   * is configured with a non-empty value, the transient metrics table. Every
   * statement is formatted with the configured encoding scheme, the per-table
   * TTL taken from {@code tableTTL} and the configured compression scheme.
   *
   * <p>The precision, host-minute and cluster aggregate tables are created via
   * {@code prepareCreateMetricsTableStatement}, which receives the split points
   * produced by {@link TimelineMetricSplitPointComputer}; the remaining tables
   * are created with a plain {@link Statement}.
   *
   * <p>All JDBC resources opened here are released before the method returns.
   * Failures while closing are logged and otherwise ignored so that they cannot
   * mask the outcome of the schema creation itself.
   *
   * @throws MetricsSystemInitializationException if obtaining the connection or
   *         executing any of the statements fails with a {@link SQLException},
   *         or if the thread is interrupted while obtaining the connection; in
   *         the latter case the thread's interrupt status is restored first
   */
  protected void initMetricSchema() {
    Connection conn = null;
    Statement stmt = null;
    PreparedStatement pStmt = null;
    TimelineMetricSplitPointComputer splitPointComputer = new TimelineMetricSplitPointComputer(
      metricsConf, hbaseConf, metadataManagerInstance);
    splitPointComputer.computeSplitPoints();

    String encoding = metricsConf.get(HBASE_ENCODING_SCHEME, DEFAULT_ENCODING);
    String compression = metricsConf.get(HBASE_COMPRESSION_SCHEME, DEFAULT_TABLE_COMPRESSION);

    try {
      LOG.info("Initializing metrics schema...");
      conn = getConnectionRetryingOnException();
      stmt = conn.createStatement();

      // Container Metrics
      stmt.executeUpdate(String.format(CREATE_CONTAINER_METRICS_TABLE_SQL,
        encoding, tableTTL.get(CONTAINER_METRICS_TABLE_NAME), compression));

      // Host level
      String precisionSql = String.format(CREATE_METRICS_TABLE_SQL,
        encoding, tableTTL.get(METRICS_RECORD_TABLE_NAME), compression);
      pStmt = prepareCreateMetricsTableStatement(conn, precisionSql, splitPointComputer.getPrecisionSplitPoints());
      pStmt.executeUpdate();
      // Release this statement now: pStmt is reused below and only the last
      // instance would otherwise be closed by the finally block.
      closeSchemaResourceQuietly(pStmt);

      String hostMinuteAggregrateSql = String.format(CREATE_METRICS_AGGREGATE_TABLE_SQL,
        METRICS_AGGREGATE_MINUTE_TABLE_NAME, encoding,
        tableTTL.get(METRICS_AGGREGATE_MINUTE_TABLE_NAME),
        compression);
      pStmt = prepareCreateMetricsTableStatement(conn, hostMinuteAggregrateSql, splitPointComputer.getHostAggregateSplitPoints());
      pStmt.executeUpdate();
      closeSchemaResourceQuietly(pStmt);

      stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_TABLE_SQL,
        METRICS_AGGREGATE_HOURLY_TABLE_NAME, encoding,
        tableTTL.get(METRICS_AGGREGATE_HOURLY_TABLE_NAME),
        compression));
      stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_TABLE_SQL,
        METRICS_AGGREGATE_DAILY_TABLE_NAME, encoding,
        tableTTL.get(METRICS_AGGREGATE_DAILY_TABLE_NAME),
        compression));

      // Cluster level
      String aggregateSql = String.format(CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL,
        METRICS_CLUSTER_AGGREGATE_TABLE_NAME, encoding,
        tableTTL.get(METRICS_CLUSTER_AGGREGATE_TABLE_NAME),
        compression);
      pStmt = prepareCreateMetricsTableStatement(conn, aggregateSql, splitPointComputer.getClusterAggregateSplitPoints());
      pStmt.executeUpdate();
      // The last prepared statement is closed by the finally block.

      stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL,
        METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME, encoding,
        tableTTL.get(METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME),
        compression));
      stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL,
        METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME, encoding,
        tableTTL.get(METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME),
        compression));
      stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL,
        METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME, encoding,
        tableTTL.get(METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME),
        compression));

      // Metrics Transient Table (only when transient metric patterns are configured)
      String transientMetricPatterns = metricsConf.get(TRANSIENT_METRIC_PATTERNS, StringUtils.EMPTY);
      if (StringUtils.isNotEmpty(transientMetricPatterns)) {
        String transientMetricsTableSql = String.format(CREATE_TRANSIENT_METRICS_TABLE_SQL,
          encoding, tableTTL.get(METRIC_TRANSIENT_TABLE_NAME), compression);
        stmt.executeUpdate(transientMetricsTableSql);
      }

      conn.commit();

      LOG.info("Metrics schema initialized.");
    } catch (SQLException | InterruptedException sql) {
      if (sql instanceof InterruptedException) {
        // Keep the interrupt visible to callers further up the stack; the
        // wrapping exception below would otherwise hide it.
        Thread.currentThread().interrupt();
      }
      LOG.error("Error creating Metrics Schema in HBase using Phoenix.", sql);
      throw new MetricsSystemInitializationException(
        "Error creating Metrics Schema in HBase using Phoenix.", sql);
    } finally {
      // Closing an already-closed JDBC statement is a no-op, so closing pStmt
      // here is safe even when it was already released above.
      closeSchemaResourceQuietly(stmt);
      closeSchemaResourceQuietly(pStmt);
      closeSchemaResourceQuietly(conn);
    }
  }

  /**
   * Closes a JDBC resource used during schema initialization on a best-effort
   * basis: a {@code null} argument is ignored and any failure raised by
   * {@code close()} is logged instead of being propagated.
   *
   * @param resource the resource to close, may be {@code null}
   */
  private void closeSchemaResourceQuietly(AutoCloseable resource) {
    if (resource == null) {
      return;
    }
    try {
      resource.close();
    } catch (Exception e) {
      LOG.warn("Failed to close JDBC resource after metrics schema initialization.", e);
    }
  }