in flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java [274:383]
/**
 * Builds the list of {@link ProjectionColumn}s described by a projection expression.
 *
 * <p>The expression is parsed into a {@code SqlSelect}, wildcards are expanded against {@code
 * columns}, and the select is converted to a {@code RelNode} whose row type supplies the type
 * of each select item (matched by position). Each select item must be either {@code <EXPR> AS
 * <IDENTIFIER>} or a bare identifier. When several select items produce the same column name,
 * the later one replaces the earlier one at the earlier one's position.
 *
 * @param projectionExpression the projection expression text
 * @param columns the upstream columns the projection is evaluated against
 * @param udfDescriptors descriptors of user-defined functions usable in expressions
 * @param supportedMetadataColumns metadata columns whose names may not be used as aliases
 * @return the projection columns in output order; a new empty list if the expression is null
 *     or whitespace-only (per {@code isNullOrWhitespaceOnly}) or its select list is empty
 * @throws ParseException if a select item is neither a call nor an identifier
 * @throws IllegalArgumentException presumably raised by {@code Preconditions.checkArgument}
 *     when a call is not {@code <EXPR> AS <IDENTIFIER>} or an alias is a reserved metadata
 *     column name
 */
public static List<ProjectionColumn> generateProjectionColumns(
        String projectionExpression,
        List<Column> columns,
        List<UserDefinedFunctionDescriptor> udfDescriptors,
        SupportedMetadataColumn[] supportedMetadataColumns) {
    if (isNullOrWhitespaceOnly(projectionExpression)) {
        return new ArrayList<>();
    }
    SqlSelect sqlSelect = parseProjectionExpression(projectionExpression);
    if (sqlSelect.getSelectList().isEmpty()) {
        return new ArrayList<>();
    }
    expandWildcard(sqlSelect, columns);
    RelNode relNode = sqlToRel(columns, sqlSelect, udfDescriptors, supportedMetadataColumns);
    // Field types of the converted node, indexed in the same order as the select list.
    RelDataType[] relDataTypes =
            relNode.getRowType().getFieldList().stream()
                    .map(RelDataTypeField::getType)
                    .toArray(RelDataType[]::new);
    Map<String, Column> originalColumnMap =
            columns.stream().collect(Collectors.toMap(Column::getName, column -> column));

    List<ProjectionColumn> projectionColumns = new ArrayList<>();
    // Maps an already emitted column name to its index in projectionColumns.
    Map<String, Integer> addedProjectionColumnNames = new HashMap<>();
    SqlNodeList selectExpressionList = sqlSelect.getSelectList();
    for (int i = 0; i < selectExpressionList.size(); i++) {
        SqlNode sqlNode = selectExpressionList.get(i);
        RelDataType relDataType = relDataTypes[i];
        ProjectionColumn projectionColumn;
        if (sqlNode instanceof SqlBasicCall) {
            // A projection column could be <EXPR> AS <IDENTIFIER>...
            projectionColumn =
                    buildAliasedProjectionColumn(
                            (SqlBasicCall) sqlNode,
                            relDataType,
                            originalColumnMap,
                            udfDescriptors,
                            supportedMetadataColumns);
        } else if (sqlNode instanceof SqlIdentifier) {
            // ... or an existing column's name identifier.
            String columnName = lastNameOf((SqlIdentifier) sqlNode);
            projectionColumn =
                    resolveProjectionColumnFromIdentifier(
                            relDataType,
                            originalColumnMap,
                            columnName,
                            columnName,
                            supportedMetadataColumns);
        } else {
            throw new ParseException("Unrecognized projection: " + sqlNode.toString());
        }

        // Projection columns that come later override previous ones with the same name.
        String projectionColumnName = projectionColumn.getColumnName();
        Integer existingIndex = addedProjectionColumnNames.get(projectionColumnName);
        if (existingIndex != null) {
            // A column with an identical name was already emitted: replace it in place.
            projectionColumns.set(existingIndex, projectionColumn);
        } else {
            // Otherwise append it at the end and remember its index.
            projectionColumns.add(projectionColumn);
            addedProjectionColumnNames.put(projectionColumnName, projectionColumns.size() - 1);
        }
    }
    return projectionColumns;
}

/**
 * Builds the projection column for a select item of the form {@code <EXPR> AS <IDENTIFIER>}.
 */
private static ProjectionColumn buildAliasedProjectionColumn(
        SqlBasicCall sqlBasicCall,
        RelDataType relDataType,
        Map<String, Column> originalColumnMap,
        List<UserDefinedFunctionDescriptor> udfDescriptors,
        SupportedMetadataColumn[] supportedMetadataColumns) {
    List<SqlNode> operandList = sqlBasicCall.getOperandList();
    // Template form keeps the call from being stringified unless the check actually fails.
    Preconditions.checkArgument(
            SqlKind.AS.equals(sqlBasicCall.getOperator().kind)
                    && operandList.size() == 2
                    && operandList.get(1) instanceof SqlIdentifier,
            "Unrecognized projection expression: %s. Should be <EXPR> AS <IDENTIFIER>",
            sqlBasicCall);

    // It's the identifier node for aliased column.
    String columnName = lastNameOf((SqlIdentifier) operandList.get(1));
    Preconditions.checkArgument(
            !isMetadataColumn(columnName, supportedMetadataColumns),
            "Column name %s is reserved and shading it is not allowed.",
            columnName);

    // This is the actual expression node of this projection column.
    SqlNode exprNode = operandList.get(0);
    if (exprNode instanceof SqlIdentifier) {
        // This is a simple column rename like col_a AS col_b. Simply forward it to
        // avoid losing metadata info like comments and default expressions.
        String originalName = lastNameOf((SqlIdentifier) exprNode);
        return resolveProjectionColumnFromIdentifier(
                relDataType,
                originalColumnMap,
                originalName,
                columnName,
                supportedMetadataColumns);
    }

    // Any other expression becomes a calculated column compiled to a Janino expression.
    List<String> originalColumnNames = parseColumnNameList(exprNode);
    Map<String, String> columnNameMap = generateColumnNameMap(originalColumnNames);
    return ProjectionColumn.ofCalculated(
            columnName,
            DataTypeConverter.convertCalciteRelDataTypeToDataType(relDataType),
            exprNode.toString(),
            JaninoCompiler.translateSqlNodeToJaninoExpression(
                    exprNode, udfDescriptors, columnNameMap),
            originalColumnNames,
            columnNameMap);
}

/** Returns the last name component of a (possibly qualified) identifier. */
private static String lastNameOf(SqlIdentifier identifier) {
    return identifier.names.get(identifier.names.size() - 1);
}