private BaseDataType visitColumnType()

in fastmodel-transform/fastmodel-transform-flink/src/main/java/com/aliyun/fastmodel/transform/flink/parser/visitor/FlinkParserAstBuilder.java [204:274]


    /**
     * Converts a parser type specification into the matching FastModel data type.
     *
     * <p>The specification is dispatched on its runtime class:
     * <ul>
     *   <li>{@code ExtendedSqlRowTypeNameSpec}: a {@code FlinkRowDataType} whose fields are converted
     *       recursively (field comments are not carried over; {@code null} is passed);</li>
     *   <li>{@code ExtendedSqlCollectionTypeNameSpec}: a {@code FlinkGenericDataType} with the element
     *       type as its single type parameter;</li>
     *   <li>{@code SqlMapTypeNameSpec}: a {@code FlinkGenericDataType} with key and value type parameters;</li>
     *   <li>{@code SqlRawTypeNameSpec}: a {@code FlinkRawDataType} built from the class name and the
     *       serializer string;</li>
     *   <li>{@code SqlAlienSystemTypeNameSpec}: a {@code FlinkGenericDataType} named
     *       {@code FlinkDataTypeName.STRING};</li>
     *   <li>{@code SqlBasicTypeNameSpec}: a {@code FlinkGenericDataType} carrying precision and scale
     *       when they are set (greater than -1).</li>
     * </ul>
     *
     * @param typeNameSpec the type specification produced by the SQL parser
     * @return the converted data type
     * @throws ParseException if the RAW type internals cannot be read reflectively, or if the
     *                        specification is of a kind this method does not handle
     */
    private BaseDataType visitColumnType(SqlTypeNameSpec typeNameSpec) {
        if (typeNameSpec instanceof ExtendedSqlRowTypeNameSpec) {
            ExtendedSqlRowTypeNameSpec rowSpec = (ExtendedSqlRowTypeNameSpec) typeNameSpec;
            List<SqlIdentifier> columnNames = rowSpec.getFieldNames();
            List<SqlDataTypeSpec> dataTypes = rowSpec.getFieldTypes();
            List<Field> fields = new ArrayList<>(columnNames.size());
            // Names and types are parallel lists: entry i of one describes entry i of the other.
            for (int i = 0; i < columnNames.size(); i++) {
                fields.add(new Field(new Identifier(columnNames.get(i).getSimple()),
                    visitColumnType(dataTypes.get(i).getTypeNameSpec()), null));
            }
            return new FlinkRowDataType(fields);
        }

        if (typeNameSpec instanceof ExtendedSqlCollectionTypeNameSpec) {
            ExtendedSqlCollectionTypeNameSpec collectionSpec = (ExtendedSqlCollectionTypeNameSpec) typeNameSpec;
            return new FlinkGenericDataType(
                typeSpecLocation(collectionSpec),
                collectionSpec.toString(),
                collectionSpec.getTypeName().getSimple(),
                ImmutableList.of(new TypeParameter(visitColumnType(collectionSpec.getElementTypeName()))));
        }

        if (typeNameSpec instanceof SqlMapTypeNameSpec) {
            SqlMapTypeNameSpec mapSpec = (SqlMapTypeNameSpec) typeNameSpec;
            return new FlinkGenericDataType(
                typeSpecLocation(mapSpec),
                mapSpec.toString(),
                mapSpec.getTypeName().getSimple(),
                ImmutableList.of(
                    new TypeParameter(visitColumnType(mapSpec.getKeyType().getTypeNameSpec())),
                    new TypeParameter(visitColumnType(mapSpec.getValType().getTypeNameSpec()))));
        }

        if (typeNameSpec instanceof SqlRawTypeNameSpec) {
            SqlRawTypeNameSpec rawSpec = (SqlRawTypeNameSpec) typeNameSpec;
            // SqlRawTypeNameSpec exposes no accessor for its internal fields, so they are read
            // reflectively for now.
            try {
                SqlNode className = readRawTypeField(rawSpec, "className");
                SqlNode serializerString = readRawTypeField(rawSpec, "serializerString");
                return new FlinkRawDataType(Lists.newArrayList(
                    QualifiedName.of(StripUtils.strip(className.toString())),
                    QualifiedName.of(StripUtils.strip(serializerString.toString()))));
            } catch (NoSuchFieldException | IllegalAccessException e) {
                // The bare message of a NoSuchFieldException is only the field name, so describe the
                // failure and keep the original exception (with its stack trace) attached.
                // NOTE(review): only ParseException(String) is visible from here; prefer a
                // cause-accepting constructor if ParseException offers one.
                ParseException failure = new ParseException(
                    "unable to read the RAW type definition via reflection: " + e);
                failure.addSuppressed(e);
                throw failure;
            }
        }

        if (typeNameSpec instanceof SqlAlienSystemTypeNameSpec) {
            // NOTE(review): every alien system type is reported as STRING; confirm this is intended
            // for aliases other than STRING.
            return new FlinkGenericDataType(typeSpecLocation(typeNameSpec),
                typeNameSpec.toString(), FlinkDataTypeName.STRING.getValue(), Collections.emptyList());
        }

        String dataTypeName = typeNameSpec.getTypeName().getSimple();
        IDataTypeName dataType = FlinkDataTypeName.getByValue(dataTypeName);
        if (dataType == null) {
            // Defensive: covers a lookup miss in case getByValue reports it by returning null.
            throw new ParseException("unsupported Flink data type: " + dataTypeName);
        }
        if (!(typeNameSpec instanceof SqlBasicTypeNameSpec)) {
            // Precision and scale are only read from basic type specs; fail with a readable message
            // instead of a ClassCastException for any other kind of specification.
            throw new ParseException("unsupported type specification "
                + typeNameSpec.getClass().getName() + ": " + typeNameSpec);
        }

        SqlBasicTypeNameSpec basicSpec = (SqlBasicTypeNameSpec) typeNameSpec;
        List<DataTypeParameter> parameters = Lists.newArrayList();
        // Values of -1 or below are treated as "not specified" and produce no parameter.
        if (basicSpec.getPrecision() > -1) {
            parameters.add(new NumericParameter(String.valueOf(basicSpec.getPrecision())));
        }
        if (basicSpec.getScale() > -1) {
            parameters.add(new NumericParameter(String.valueOf(basicSpec.getScale())));
        }
        return new FlinkGenericDataType(typeSpecLocation(typeNameSpec),
            typeNameSpec.toString(), dataType.getValue(), parameters);
    }

    /**
     * Builds the node location (line and column) from the parser position of a type specification.
     */
    private NodeLocation typeSpecLocation(SqlTypeNameSpec typeNameSpec) {
        return new NodeLocation(typeNameSpec.getParserPos().getLineNum(),
            typeNameSpec.getParserPos().getColumnNum());
    }

    /**
     * Reads a private {@code SqlNode} field declared on {@code SqlRawTypeNameSpec} via reflection.
     */
    private SqlNode readRawTypeField(SqlRawTypeNameSpec rawSpec, String fieldName)
        throws NoSuchFieldException, IllegalAccessException {
        java.lang.reflect.Field field = SqlRawTypeNameSpec.class.getDeclaredField(fieldName);
        field.setAccessible(true);
        return (SqlNode) field.get(rawSpec);
    }