public static List generateProjectionColumns()

in flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java [274:383]


    /**
     * Turns a projection expression into the list of {@link ProjectionColumn}s it describes.
     *
     * <p>After wildcard expansion, every item of the select list must be either a plain column
     * identifier or an {@code <EXPR> AS <IDENTIFIER>} clause. When several items produce the same
     * output column name, the later item replaces the earlier one while keeping the earlier
     * item's position in the result.
     *
     * @param projectionExpression the projection expression to parse; a null or whitespace-only
     *     value yields an empty list
     * @param columns the columns used to expand wildcards, to build the relational plan, and to
     *     look up identifiers by name
     * @param udfDescriptors user-defined function descriptors, forwarded to plan building and to
     *     the Janino expression translation
     * @param supportedMetadataColumns metadata columns; their names are rejected as aliases
     * @return a new list with one entry per distinct output column name
     * @throws ParseException if a select item is neither an identifier nor a {@link SqlBasicCall}
     */
    public static List<ProjectionColumn> generateProjectionColumns(
            String projectionExpression,
            List<Column> columns,
            List<UserDefinedFunctionDescriptor> udfDescriptors,
            SupportedMetadataColumn[] supportedMetadataColumns) {
        if (isNullOrWhitespaceOnly(projectionExpression)) {
            return new ArrayList<>();
        }
        SqlSelect parsedSelect = parseProjectionExpression(projectionExpression);
        if (parsedSelect.getSelectList().isEmpty()) {
            return new ArrayList<>();
        }

        expandWildcard(parsedSelect, columns);
        RelNode plan = sqlToRel(columns, parsedSelect, udfDescriptors, supportedMetadataColumns);

        // Collect the plan's output types; they are indexed below by select-list position.
        List<RelDataTypeField> outputFields = plan.getRowType().getFieldList();
        RelDataType[] outputTypes = new RelDataType[outputFields.size()];
        for (int pos = 0; pos < outputTypes.length; pos++) {
            outputTypes[pos] = outputFields.get(pos).getType();
        }

        Map<String, Column> columnsByName =
                columns.stream().collect(Collectors.toMap(Column::getName, col -> col));
        List<ProjectionColumn> result = new ArrayList<>();
        // Output column name -> index of that column inside `result`.
        Map<String, Integer> positionByName = new HashMap<>();

        SqlNodeList selectItems = parsedSelect.getSelectList();
        for (int i = 0; i < selectItems.size(); i++) {
            SqlNode item = selectItems.get(i);
            RelDataType itemType = outputTypes[i];
            ProjectionColumn resolved;

            if (item instanceof SqlIdentifier) {
                // A bare reference to an existing column: output name equals source name.
                SqlIdentifier identifier = (SqlIdentifier) item;
                String name = identifier.names.get(identifier.names.size() - 1);
                resolved =
                        resolveProjectionColumnFromIdentifier(
                                itemType, columnsByName, name, name, supportedMetadataColumns);
            } else if (item instanceof SqlBasicCall) {
                // The only call accepted at the top level is <EXPR> AS <IDENTIFIER>.
                SqlBasicCall call = (SqlBasicCall) item;
                List<SqlNode> operands = call.getOperandList();
                boolean isAliasClause =
                        SqlKind.AS.equals(call.getOperator().kind)
                                && operands.size() == 2
                                && operands.get(1) instanceof SqlIdentifier;
                Preconditions.checkArgument(
                        isAliasClause,
                        "Unrecognized projection expression: "
                                + call
                                + ". Should be <EXPR> AS <IDENTIFIER>");

                // Second operand: the alias, i.e. the output column name.
                SqlIdentifier aliasIdentifier = (SqlIdentifier) operands.get(1);
                String alias = aliasIdentifier.names.get(aliasIdentifier.names.size() - 1);

                Preconditions.checkArgument(
                        !isMetadataColumn(alias, supportedMetadataColumns),
                        "Column name %s is reserved and shading it is not allowed.",
                        alias);

                // First operand: the expression that produces the column's value.
                SqlNode expression = operands.get(0);

                if (!(expression instanceof SqlIdentifier)) {
                    // A computed column: record the referenced columns and compile the
                    // expression into its Janino form.
                    List<String> referencedColumns = parseColumnNameList(expression);
                    Map<String, String> referencedNameMap =
                            generateColumnNameMap(referencedColumns);
                    resolved =
                            ProjectionColumn.ofCalculated(
                                    alias,
                                    DataTypeConverter.convertCalciteRelDataTypeToDataType(
                                            itemType),
                                    expression.toString(),
                                    JaninoCompiler.translateSqlNodeToJaninoExpression(
                                            expression, udfDescriptors, referencedNameMap),
                                    referencedColumns,
                                    referencedNameMap);
                } else {
                    // A plain rename such as col_a AS col_b. Forward the source column so that
                    // metadata info like comments and default expressions is not lost.
                    SqlIdentifier sourceIdentifier = (SqlIdentifier) expression;
                    String sourceName =
                            sourceIdentifier.names.get(sourceIdentifier.names.size() - 1);
                    resolved =
                            resolveProjectionColumnFromIdentifier(
                                    itemType,
                                    columnsByName,
                                    sourceName,
                                    alias,
                                    supportedMetadataColumns);
                }
            } else {
                throw new ParseException("Unrecognized projection: " + item.toString());
            }

            // A later projection column overrides an earlier one that has the same name.
            String outputName = resolved.getColumnName();
            Integer existingPosition = positionByName.get(outputName);
            if (existingPosition == null) {
                // First occurrence: append it and remember where it landed.
                positionByName.put(outputName, result.size());
                result.add(resolved);
            } else {
                // Seen before: replace in place so the original position is kept.
                result.set(existingPosition, resolved);
            }
        }
        return result;
    }