in flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java [787:887]
private RelNode genTableLogicalPlan(String tableAlias, HiveParserQB qb)
throws SemanticException {
HiveParserRowResolver rowResolver = new HiveParserRowResolver();
try {
            // 1a. If the table has a split sample that isn't TABLESAMPLE (n ROWS), throw an
            // exception
            // 1b. If the table has a bucket sample, throw an exception
SplitSample splitSample = semanticAnalyzer.getNameToSplitSampleMap().get(tableAlias);
if ((splitSample != null
&& (splitSample.getPercent() != null
|| splitSample.getTotalLength() != null))
|| qb.getParseInfo().needTableSample(tableAlias)) {
throw new UnsupportedOperationException("Only TABLESAMPLE (n ROWS) is supported.");
}
// 2. Get Table Metadata
if (qb.getValuesTableToData().containsKey(tableAlias)) {
                // A temp table has been created for VALUES; convert it to a LogicalValues node
Tuple2<ResolvedCatalogTable, List<List<String>>> tableValueTuple =
qb.getValuesTableToData().get(tableAlias);
RelNode values =
genValues(
tableAlias,
tableValueTuple.f0,
rowResolver,
cluster,
tableValueTuple.f1);
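                // Register the RowResolver and the Hive-to-Calcite column mapping for the
                // generated values node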
relToRowResolver.put(values, rowResolver);
relToHiveColNameCalcitePosMap.put(values, buildHiveToCalciteColumnMap(rowResolver));
return values;
} else {
// 3. Get Table Logical Schema (Row Type)
// NOTE: Table logical schema = Non Partition Cols + Partition Cols + Virtual Cols
Tuple2<String, CatalogTable> nameAndTableTuple =
qb.getMetaData().getSrcForAlias(tableAlias);
String tableName = nameAndTableTuple.f0;
ResolvedCatalogTable resolvedCatalogTable =
(ResolvedCatalogTable) nameAndTableTuple.f1;
ResolvedSchema resolvedSchema = resolvedCatalogTable.getResolvedSchema();
String[] fieldNames = resolvedSchema.getColumnNames().toArray(new String[0]);
                // 3.1 Add column info
for (String fieldName : fieldNames) {
Optional<DataType> dataType =
resolvedSchema.getColumn(fieldName).map(Column::getDataType);
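                    // Translate the Flink DataType into the equivalent Hive TypeInfo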
TypeInfo hiveType =
HiveTypeUtil.toHiveTypeInfo(
dataType.orElseThrow(
() ->
new SemanticException(
String.format(
"Can't get data type for column %s of table %s.",
fieldName, tableName))),
false);
                    ColumnInfo colInfo = new ColumnInfo(fieldName, hiveType, tableAlias, false);
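                    // Flag columns listed in the table's skewed-column spec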
colInfo.setSkewedCol(HiveParserUtils.isSkewedCol(tableAlias, qb, fieldName));
rowResolver.put(tableAlias, fieldName, colInfo);
}
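                // 3.2 Resolve the (possibly compound) table name into a fully qualified
                // ObjectIdentifier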
ObjectIdentifier tableIdentifier =
HiveParserBaseSemanticAnalyzer.parseCompoundName(
catalogRegistry, tableName);
                // 4. Build the Hive table scan RelNode
RelNode tableRel =
catalogReader
.getTable(
Arrays.asList(
tableIdentifier.getCatalogName(),
tableIdentifier.getDatabaseName(),
tableIdentifier.getObjectName()))
.toRel(
ViewExpanders.toRelContext(
calciteContext.createToRelContext(), cluster));
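                // 5. TABLESAMPLE (n ROWS): apply the sampled row count as a fetch-only
                // LogicalSort, i.e. a plain LIMIT with no ordering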
if (splitSample != null) {
tableRel =
LogicalSort.create(
tableRel,
cluster.traitSet().canonize(RelCollations.EMPTY),
null,
cluster.getRexBuilder()
.makeExactLiteral(
BigDecimal.valueOf(splitSample.getRowCount())));
}
                // 6. Add the schema (RowResolver) to the RelNode-to-schema maps
Map<String, Integer> hiveToCalciteColMap = buildHiveToCalciteColumnMap(rowResolver);
relToRowResolver.put(tableRel, rowResolver);
relToHiveColNameCalcitePosMap.put(tableRel, hiveToCalciteColMap);
return tableRel;
}
        } catch (SemanticException e) {
            throw e;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
}