in flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java [2461:2685]
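/**
 * Generates the plan for a Hive UDTF, called either directly in SELECT (e.g. {@code SELECT
 * explode(arr) FROM t}) or in a LATERAL VIEW clause. The UDTF becomes a table function scan
 * that is joined with the input rel, or correlated with it if the call references input
 * columns; for the SELECT case a trailing project keeps only the UDTF's output columns.
 */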
private RelNode genUDTFPlan(
SqlOperator sqlOperator,
String genericUDTFName,
String outputTableAlias,
List<String> colAliases,
HiveParserQB qb,
List<RexNode> operands,
List<ColumnInfo> opColInfos,
RelNode input,
boolean inSelect,
boolean isOuter)
throws SemanticException {
Preconditions.checkState(!isOuter || !inSelect, "OUTER is not supported for SELECT UDTF");
// No GROUP BY / DISTRIBUTE BY / SORT BY / CLUSTER BY
HiveParserQBParseInfo qbp = qb.getParseInfo();
if (inSelect && !qbp.getDestToGroupBy().isEmpty()) {
throw new SemanticException(ErrorMsg.UDTF_NO_GROUP_BY.getMsg());
}
if (inSelect && !qbp.getDestToDistributeBy().isEmpty()) {
throw new SemanticException(ErrorMsg.UDTF_NO_DISTRIBUTE_BY.getMsg());
}
if (inSelect && !qbp.getDestToSortBy().isEmpty()) {
throw new SemanticException(ErrorMsg.UDTF_NO_SORT_BY.getMsg());
}
if (inSelect && !qbp.getDestToClusterBy().isEmpty()) {
throw new SemanticException(ErrorMsg.UDTF_NO_CLUSTER_BY.getMsg());
}
if (inSelect && !qbp.getAliasToLateralViews().isEmpty()) {
throw new SemanticException(ErrorMsg.UDTF_LATERAL_VIEW.getMsg());
}
LOG.debug("Table alias: {} Col aliases: {}", outputTableAlias, colAliases);
// Infer the UDTF's return type from the operands and create the object inspector for its
// output columns
RelDataType relDataType =
HiveParserUtils.inferReturnTypeForOperands(
sqlOperator, operands, cluster.getTypeFactory());
DataType dataType = HiveParserUtils.toDataType(relDataType);
StructObjectInspector outputOI =
(StructObjectInspector)
HiveInspectors.getObjectInspector(
HiveTypeUtil.toHiveTypeInfo(dataType, false));
// make up a table alias if it's not present, so that we can properly generate a combined RR;
// this should only happen for a SELECT UDTF
if (outputTableAlias == null) {
Preconditions.checkState(inSelect, "Table alias not specified for lateral view");
String prefix = "select_" + genericUDTFName + "_alias_";
int i = 0;
while (qb.getAliases().contains(prefix + i)) {
i++;
}
outputTableAlias = prefix + i;
}
if (colAliases.isEmpty()) {
// user did not specify alias names, infer names from outputOI
for (StructField field : outputOI.getAllStructFieldRefs()) {
colAliases.add(field.getFieldName());
}
}
// Make sure that the number of column aliases in the AS clause matches the number of
// columns output by the UDTF
int numOutputCols = outputOI.getAllStructFieldRefs().size();
int numSuppliedAliases = colAliases.size();
if (numOutputCols != numSuppliedAliases) {
throw new SemanticException(
ErrorMsg.UDTF_ALIAS_MISMATCH.getMsg(
"expected " + numOutputCols + " aliases but got " + numSuppliedAliases));
}
// Generate the output column infos / row resolver using internal names.
List<ColumnInfo> udtfOutputCols = new ArrayList<>();
Iterator<String> colAliasesIter = colAliases.iterator();
for (StructField sf : outputOI.getAllStructFieldRefs()) {
String colAlias = colAliasesIter.next();
assert (colAlias != null);
// Since the UDTF operator feeds into a LVJ (lateral view join) operator that will rename
// all the internal names, we can just use the field name from the UDTF's OI as the
// internal name
ColumnInfo col =
new ColumnInfo(
sf.getFieldName(),
TypeInfoUtils.getTypeInfoFromObjectInspector(
sf.getFieldObjectInspector()),
outputTableAlias,
false);
udtfOutputCols.add(col);
}
// Create the row resolver for the table function scan
HiveParserRowResolver udtfOutRR = new HiveParserRowResolver();
for (int i = 0; i < udtfOutputCols.size(); i++) {
udtfOutRR.put(outputTableAlias, colAliases.get(i), udtfOutputCols.get(i));
}
// Build the output row type from the field <name, type> pairs
RelDataType retType = HiveParserTypeConverter.getType(cluster, udtfOutRR, null);
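// convert the Hive types of the UDTF arguments to Calcite types for the function lookup below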
List<RelDataType> argTypes = new ArrayList<>();
RelDataTypeFactory dtFactory = cluster.getRexBuilder().getTypeFactory();
for (ColumnInfo ci : opColInfos) {
argTypes.add(HiveParserUtils.toRelDataType(ci.getType(), dtFactory));
}
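// wrap the Hive UDTF in a Calcite operator and build a call to it over the operands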
SqlOperator calciteOp =
HiveParserSqlFunctionConverter.getCalciteFn(
genericUDTFName, argTypes, retType, false, funcConverter);
RexNode rexNode = cluster.getRexBuilder().makeCall(calciteOp, operands);
// convert the RexCall so that the Hive UDTF is translated to Flink's bridging SqlFunction
TableFunctionConverter udtfConverter =
new TableFunctionConverter(
cluster,
input,
frameworkConfig.getOperatorTable(),
catalogReader.nameMatcher());
RexCall convertedCall = (RexCall) rexNode.accept(udtfConverter);
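// the converter is expected to have rewritten the call to use Flink's bridging SqlFunction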
SqlOperator convertedOperator = convertedCall.getOperator();
Preconditions.checkState(
HiveParserUtils.isBridgingSqlFunction(convertedOperator),
"Expect operator to be "
+ HiveParserUtils.BRIDGING_SQL_FUNCTION_CLZ_NAME
+ ", actually got "
+ convertedOperator.getClass().getSimpleName());
// TODO: how to decide this?
Type elementType = Object[].class;
// create LogicalTableFunctionScan
RelNode tableFunctionScan =
LogicalTableFunctionScan.create(
input.getCluster(),
Collections.emptyList(),
convertedCall,
elementType,
retType,
null);
// remember the table alias for the UDTF so that we can reference the cols later
qb.addAlias(outputTableAlias);
RelNode correlRel;
RexBuilder rexBuilder = cluster.getRexBuilder();
// find correlation in the converted call
Pair<List<CorrelationId>, ImmutableBitSet> correlUse = getCorrelationUse(convertedCall);
// create correlate node
if (correlUse == null) {
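// the UDTF call doesn't reference any input column, so a plain join (LEFT for OUTER lateral
// view) on a TRUE condition is enough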
correlRel =
calciteContext
.createRelBuilder()
.push(input)
.push(tableFunctionScan)
.join(
isOuter ? JoinRelType.LEFT : JoinRelType.INNER,
rexBuilder.makeLiteral(true))
.build();
} else {
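// the UDTF call references input columns via correlation variables; if multiple variables
// were created, rewrite them into a single one before building the correlate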
if (correlUse.left.size() > 1) {
tableFunctionScan =
DeduplicateCorrelateVariables.go(
rexBuilder,
correlUse.left.get(0),
Util.skip(correlUse.left),
tableFunctionScan);
}
correlRel =
LogicalCorrelate.create(
input,
tableFunctionScan,
correlUse.left.get(0),
correlUse.right,
isOuter ? JoinRelType.LEFT : JoinRelType.INNER);
}
// Add new rel & its RR to the maps
relToHiveColNameCalcitePosMap.put(
tableFunctionScan, buildHiveToCalciteColumnMap(udtfOutRR));
relToRowResolver.put(tableFunctionScan, udtfOutRR);
HiveParserRowResolver correlRR =
HiveParserRowResolver.getCombinedRR(
relToRowResolver.get(input), relToRowResolver.get(tableFunctionScan));
relToHiveColNameCalcitePosMap.put(correlRel, buildHiveToCalciteColumnMap(correlRR));
relToRowResolver.put(correlRel, correlRR);
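// for a lateral view, the combined row (input columns + UDTF output) is the result; for a
// SELECT UDTF, only the UDTF output columns are projected out below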
if (!inSelect) {
return correlRel;
}
// create a project that keeps only the UDTF output columns
List<RexNode> projects = new ArrayList<>();
HiveParserRowResolver projectRR = new HiveParserRowResolver();
int j = 0;
for (int i = input.getRowType().getFieldCount();
i < correlRel.getRowType().getFieldCount();
i++) {
projects.add(cluster.getRexBuilder().makeInputRef(correlRel, i));
ColumnInfo inputColInfo = correlRR.getRowSchema().getSignature().get(i);
String colAlias = inputColInfo.getAlias();
ColumnInfo colInfo =
new ColumnInfo(
getColumnInternalName(j++),
inputColInfo.getObjectInspector(),
null,
false);
projectRR.put(null, colAlias, colInfo);
}
RelNode projectNode =
LogicalProject.create(
correlRel,
Collections.emptyList(),
projects,
tableFunctionScan.getRowType());
relToHiveColNameCalcitePosMap.put(projectNode, buildHiveToCalciteColumnMap(projectRR));
relToRowResolver.put(projectNode, projectRR);
return projectNode;
}