in ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java [514:616]
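
/**
 * Creates the Phoenix tables that back the metrics schema: container metrics,
 * host-level precision and aggregate tables, cluster-level aggregate tables,
 * and an optional transient metrics table. Heavily written tables are
 * pre-split using computed split points.
 */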
protected void initMetricSchema() {
  Connection conn = null;
  Statement stmt = null;
  PreparedStatement pStmt = null;
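
  // Compute HBase region split points up front so that the heavily written
  // tables can be pre-split at creation time.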
  TimelineMetricSplitPointComputer splitPointComputer = new TimelineMetricSplitPointComputer(
    metricsConf, hbaseConf, metadataManagerInstance);
  splitPointComputer.computeSplitPoints();
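
  // Column encoding and compression schemes are configurable; TTLs are
  // looked up per table from the tableTTL map.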
  String encoding = metricsConf.get(HBASE_ENCODING_SCHEME, DEFAULT_ENCODING);
  String compression = metricsConf.get(HBASE_COMPRESSION_SCHEME, DEFAULT_TABLE_COMPRESSION);

  try {
    LOG.info("Initializing metrics schema...");
    conn = getConnectionRetryingOnException();
    stmt = conn.createStatement();

    // Container Metrics
    stmt.executeUpdate(String.format(CREATE_CONTAINER_METRICS_TABLE_SQL,
      encoding, tableTTL.get(CONTAINER_METRICS_TABLE_NAME), compression));

    // Host level
    String precisionSql = String.format(CREATE_METRICS_TABLE_SQL,
      encoding, tableTTL.get(METRICS_RECORD_TABLE_NAME), compression);
    pStmt = prepareCreateMetricsTableStatement(conn, precisionSql, splitPointComputer.getPrecisionSplitPoints());
    pStmt.executeUpdate();

    String hostMinuteAggregateSql = String.format(CREATE_METRICS_AGGREGATE_TABLE_SQL,
      METRICS_AGGREGATE_MINUTE_TABLE_NAME, encoding,
      tableTTL.get(METRICS_AGGREGATE_MINUTE_TABLE_NAME),
      compression);
    pStmt = prepareCreateMetricsTableStatement(conn, hostMinuteAggregateSql, splitPointComputer.getHostAggregateSplitPoints());
    pStmt.executeUpdate();
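
    // Hourly and daily host-level aggregate tables are created without
    // split points.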
    stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_TABLE_SQL,
      METRICS_AGGREGATE_HOURLY_TABLE_NAME, encoding,
      tableTTL.get(METRICS_AGGREGATE_HOURLY_TABLE_NAME),
      compression));
    stmt.executeUpdate(String.format(CREATE_METRICS_AGGREGATE_TABLE_SQL,
      METRICS_AGGREGATE_DAILY_TABLE_NAME, encoding,
      tableTTL.get(METRICS_AGGREGATE_DAILY_TABLE_NAME),
      compression));

    // Cluster level
    String aggregateSql = String.format(CREATE_METRICS_CLUSTER_AGGREGATE_TABLE_SQL,
      METRICS_CLUSTER_AGGREGATE_TABLE_NAME, encoding,
      tableTTL.get(METRICS_CLUSTER_AGGREGATE_TABLE_NAME),
      compression);
    pStmt = prepareCreateMetricsTableStatement(conn, aggregateSql, splitPointComputer.getClusterAggregateSplitPoints());
    pStmt.executeUpdate();
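
    // Grouped cluster aggregate tables (minute, hourly, daily) are created
    // without split points.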
    stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL,
      METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME, encoding,
      tableTTL.get(METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME),
      compression));
    stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL,
      METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME, encoding,
      tableTTL.get(METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME),
      compression));
    stmt.executeUpdate(String.format(CREATE_METRICS_CLUSTER_AGGREGATE_GROUPED_TABLE_SQL,
      METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME, encoding,
      tableTTL.get(METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME),
      compression));

    // Transient metrics table, created only when transient metric patterns
    // are configured.
    String transientMetricPatterns = metricsConf.get(TRANSIENT_METRIC_PATTERNS, StringUtils.EMPTY);
    if (StringUtils.isNotEmpty(transientMetricPatterns)) {
      String transientMetricsTableSql = String.format(CREATE_TRANSIENT_METRICS_TABLE_SQL,
        encoding, tableTTL.get(METRIC_TRANSIENT_TABLE_NAME), compression);
      stmt.executeUpdate(transientMetricsTableSql);
    }

    conn.commit();
    LOG.info("Metrics schema initialized.");
  } catch (SQLException | InterruptedException e) {
    LOG.error("Error creating Metrics Schema in HBase using Phoenix.", e);
    throw new MetricsSystemInitializationException(
      "Error creating Metrics Schema in HBase using Phoenix.", e);
  } finally {
    if (stmt != null) {
      try {
        stmt.close();
      } catch (SQLException e) {
        // Ignore
      }
    }
    if (pStmt != null) {
      try {
        pStmt.close();
      } catch (Exception e) {
        // Ignore
      }
    }
    if (conn != null) {
      try {
        conn.close();
      } catch (SQLException e) {
        // Ignore
      }
    }
  }
}
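
The null-guarded finally block above predates try-with-resources; a minimal sketch of the same flow written with try-with-resources (hypothetical, assuming the same helpers and constants as in this class) could look like the following. Because pStmt is reassigned for each pre-split table, each PreparedStatement needs its own resource scope:

protected void initMetricSchema() {
  TimelineMetricSplitPointComputer splitPointComputer = new TimelineMetricSplitPointComputer(
    metricsConf, hbaseConf, metadataManagerInstance);
  splitPointComputer.computeSplitPoints();

  String encoding = metricsConf.get(HBASE_ENCODING_SCHEME, DEFAULT_ENCODING);
  String compression = metricsConf.get(HBASE_COMPRESSION_SCHEME, DEFAULT_TABLE_COMPRESSION);

  try (Connection conn = getConnectionRetryingOnException();
       Statement stmt = conn.createStatement()) {
    // Pre-split precision table: the PreparedStatement gets its own
    // try-with-resources scope because one is created per table.
    String precisionSql = String.format(CREATE_METRICS_TABLE_SQL,
      encoding, tableTTL.get(METRICS_RECORD_TABLE_NAME), compression);
    try (PreparedStatement pStmt = prepareCreateMetricsTableStatement(
           conn, precisionSql, splitPointComputer.getPrecisionSplitPoints())) {
      pStmt.executeUpdate();
    }
    // ... remaining CREATE TABLE statements as above ...
    conn.commit();
  } catch (SQLException | InterruptedException e) {
    LOG.error("Error creating Metrics Schema in HBase using Phoenix.", e);
    throw new MetricsSystemInitializationException(
      "Error creating Metrics Schema in HBase using Phoenix.", e);
  }
}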