in src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/TableAnalyzeJob.java [77:154]
/**
 * Samples the given table, derives per-column statistics from the sampling result and saves them,
 * together with the sampled rows, into the table's extended descriptor.
 *
 * <p>The array returned by {@code TableAnalyzeExec#analyzeTable()} is read as follows:
 * <ul>
 * <li>row 0 holds the aggregates: cell 0 is the total row count, followed by one group of
 * {@code TABLE_STATS_METRICS.size()} cells per table column, in {@code TABLE_STATS_METRICS} order;</li>
 * <li>every following row is stored as sample data, each cell converted to a string.</li>
 * </ul>
 *
 * @param tableDesc the table to analyze
 * @param project   the project used to look up the table metadata manager and to scope the transaction
 * @param rowCount  passed through to {@code TableAnalyzeExec}
 * @param ss        the Spark session passed through to {@code TableAnalyzeExec}
 * @throws NumberFormatException    if the total count, or a COUNT / COUNT_DISTINCT cell, is not a number
 * @throws IllegalArgumentException if {@code TABLE_STATS_METRICS} contains an unsupported metric
 */
public void analyzeTable(TableDesc tableDesc, String project, int rowCount, SparkSession ss) {
    long start = System.currentTimeMillis();
    Row[] rows = new TableAnalyzeExec(tableDesc, project, rowCount, ss, jobId).analyzeTable();
    logger.info("sampling rows from table {} takes {}s", tableDesc.getIdentity(),
            (System.currentTimeMillis() - start) / 1000);

    val tableMetadataManager = NTableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
    var tableExtDesc = tableMetadataManager.getOrCreateTableExt(tableDesc);

    final Row aggregates = rows[0];
    final long totalRows = Long.parseLong(aggregates.get(0).toString());
    final int metricLen = TABLE_STATS_METRICS.size();
    final ColumnDesc[] columns = tableDesc.getColumns();
    final List<TableExtDesc.ColumnStats> columnStatsList = new ArrayList<>(tableDesc.getColumnCount());
    for (int colIdx = 0; colIdx < tableDesc.getColumnCount(); colIdx++) {
        final ColumnDesc columnDesc = columns[colIdx];
        if (columnDesc.isComputedColumn()) {
            continue;
        }
        // Reuse the stats already recorded for this column, if any.
        // NOTE(review): a reused object comes from tableExtDesc, which was read outside the
        // transaction below and is updated in place here - confirm this is intended.
        TableExtDesc.ColumnStats colStats = tableExtDesc.getColumnStatsByName(columnDesc.getName());
        if (colStats == null) {
            colStats = new TableExtDesc.ColumnStats();
            colStats.setColumnName(columnDesc.getName());
        }
        // The offset uses the column's position in tableDesc.getColumns(), so a skipped computed
        // column still occupies a slot. NOTE(review): confirm TableAnalyzeExec emits the aggregates
        // with the same column layout.
        final int firstCell = 1 + metricLen * colIdx;
        for (int i = 0; i < metricLen; i++) {
            final Object cell = aggregates.get(firstCell + i);
            final String value = cell == null ? null : cell.toString();
            applyTableStatsMetric(colStats, TABLE_STATS_METRICS.get(i), value, totalRows);
        }
        columnStatsList.add(colStats);
    }

    // Everything after the aggregate row is kept as sample data.
    List<String[]> sampleData = Lists.newArrayList();
    for (int i = 1; i < rows.length; i++) {
        sampleData.add(toSampleRow(rows[i]));
    }

    UnitOfWork.doInTransactionWithRetry(() -> {
        // Fetch the manager and the table ext again inside the transaction and write to a copy.
        val tableMetadataManagerForUpdate = NTableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv(),
                project);
        var tableExt = tableMetadataManagerForUpdate.getOrCreateTableExt(tableDesc);
        tableExt = tableMetadataManagerForUpdate.copyForWrite(tableExt);
        tableExt.setTotalRows(totalRows);
        tableExt.setColumnStats(columnStatsList);
        tableExt.setSampleRows(sampleData);
        tableExt.setJodID(jobId);
        tableMetadataManagerForUpdate.saveTableExt(tableExt);
        return null;
    }, project);
    logger.info("Table {} analysis finished, update table ext desc done.", tableDesc.getName());
}

/** Stores one aggregate cell into {@code colStats} according to the metric it belongs to. */
private static void applyTableStatsMetric(TableExtDesc.ColumnStats colStats, String metric, String value,
        long totalRows) {
    switch (metric) {
    case "COUNT":
        // The cell is the non-null count of the column, so the difference is the null count.
        colStats.setNullCount(totalRows - Long.parseLong(value));
        break;
    case "MAX":
        colStats.setMaxValue(value);
        break;
    case "MIN":
        colStats.setMinValue(value);
        break;
    case "COUNT_DISTINCT":
        colStats.setCardinality(Long.parseLong(value));
        break;
    default:
        throw new IllegalArgumentException("not support this metric " + metric + " in table Sampling");
    }
}

/** Converts one sampled row into an array holding the string form of each of its cells. */
private static String[] toSampleRow(Row sampledRow) {
    return IntStream.range(0, sampledRow.length())
            .mapToObj(j -> sampleCellToString(sampledRow.get(j)))
            .toArray(String[]::new);
}

/** Renders a sampled cell: null stays null, timestamps go through {@code DateFormat}, the rest use toString(). */
private static String sampleCellToString(Object cell) {
    if (cell == null) {
        return null;
    }
    if (cell instanceof Timestamp) {
        return DateFormat.castTimestampToString(((Timestamp) cell).getTime());
    }
    return cell.toString();
}