in fastmodel-transform/fastmodel-transform-flink/src/main/java/com/aliyun/fastmodel/transform/flink/parser/visitor/FlinkParserAstBuilder.java [204:274]
/**
 * Converts a parsed SQL type name spec into the matching {@link BaseDataType}.
 *
 * <p>The spec kinds are checked in this order:
 * <ol>
 *   <li>{@code ExtendedSqlRowTypeNameSpec}: every field is converted recursively and wrapped
 *       in a {@code FlinkRowDataType};</li>
 *   <li>{@code ExtendedSqlCollectionTypeNameSpec}: a generic type with the converted element
 *       type as its single type parameter;</li>
 *   <li>{@code SqlMapTypeNameSpec}: a generic type with the converted key and value types as
 *       its two type parameters;</li>
 *   <li>{@code SqlRawTypeNameSpec}: a {@code FlinkRawDataType} built from the spec's class name
 *       and serializer string, both read reflectively;</li>
 *   <li>{@code SqlAlienSystemTypeNameSpec}: a parameterless generic type named
 *       {@code FlinkDataTypeName.STRING};</li>
 *   <li>{@code SqlBasicTypeNameSpec}: a generic type whose parameters are the precision and
 *       scale, each included only when greater than {@code -1}.</li>
 * </ol>
 *
 * @param typeNameSpec the type name spec to convert
 * @return the converted data type
 * @throws ParseException if the private fields of a RAW spec cannot be read, or if the spec
 *                        is of a kind that none of the branches above handles
 */
private BaseDataType visitColumnType(SqlTypeNameSpec typeNameSpec) {
    if (typeNameSpec instanceof ExtendedSqlRowTypeNameSpec) {
        ExtendedSqlRowTypeNameSpec rowSpec = (ExtendedSqlRowTypeNameSpec) typeNameSpec;
        List<SqlIdentifier> columnNames = rowSpec.getFieldNames();
        List<SqlDataTypeSpec> dataTypes = rowSpec.getFieldTypes();
        List<Field> fields = new ArrayList<>(columnNames.size());
        // Field names and field types are parallel lists: index i describes one ROW field.
        for (int i = 0; i < columnNames.size(); i++) {
            Identifier fieldName = new Identifier(columnNames.get(i).getSimple());
            BaseDataType fieldType = visitColumnType(dataTypes.get(i).getTypeNameSpec());
            fields.add(new Field(fieldName, fieldType, null));
        }
        return new FlinkRowDataType(fields);
    }

    if (typeNameSpec instanceof ExtendedSqlCollectionTypeNameSpec) {
        ExtendedSqlCollectionTypeNameSpec collectionSpec = (ExtendedSqlCollectionTypeNameSpec) typeNameSpec;
        BaseDataType elementType = visitColumnType(collectionSpec.getElementTypeName());
        return new FlinkGenericDataType(
            new NodeLocation(collectionSpec.getParserPos().getLineNum(),
                collectionSpec.getParserPos().getColumnNum()),
            collectionSpec.toString(),
            collectionSpec.getTypeName().getSimple(),
            ImmutableList.of(new TypeParameter(elementType)));
    }

    if (typeNameSpec instanceof SqlMapTypeNameSpec) {
        SqlMapTypeNameSpec mapSpec = (SqlMapTypeNameSpec) typeNameSpec;
        BaseDataType keyType = visitColumnType(mapSpec.getKeyType().getTypeNameSpec());
        BaseDataType valueType = visitColumnType(mapSpec.getValType().getTypeNameSpec());
        return new FlinkGenericDataType(
            new NodeLocation(mapSpec.getParserPos().getLineNum(),
                mapSpec.getParserPos().getColumnNum()),
            mapSpec.toString(),
            mapSpec.getTypeName().getSimple(),
            ImmutableList.of(new TypeParameter(keyType), new TypeParameter(valueType)));
    }

    if (typeNameSpec instanceof SqlRawTypeNameSpec) {
        SqlRawTypeNameSpec rawSpec = (SqlRawTypeNameSpec) typeNameSpec;
        // SqlRawTypeNameSpec exposes no accessor for its internal fields,
        // so reflection is used here for now.
        try {
            java.lang.reflect.Field classNameField = SqlRawTypeNameSpec.class.getDeclaredField("className");
            classNameField.setAccessible(true);
            SqlNode className = (SqlNode) classNameField.get(rawSpec);
            java.lang.reflect.Field serializerStringField = SqlRawTypeNameSpec.class.getDeclaredField("serializerString");
            serializerStringField.setAccessible(true);
            SqlNode serializerString = (SqlNode) serializerStringField.get(rawSpec);
            return new FlinkRawDataType(Lists.newArrayList(
                QualifiedName.of(StripUtils.strip(className.toString())),
                QualifiedName.of(StripUtils.strip(serializerString.toString()))));
        } catch (NoSuchFieldException | IllegalAccessException e) {
            // Keep the original reflective failure (and its stack trace) reachable from the
            // ParseException instead of discarding it.
            // NOTE(review): only the message-only ParseException constructor is visible from
            // here; if a (message, cause) constructor exists, prefer it over addSuppressed.
            ParseException parseException = new ParseException(e.getMessage());
            parseException.addSuppressed(e);
            throw parseException;
        }
    }

    if (typeNameSpec instanceof SqlAlienSystemTypeNameSpec) {
        // Alien system types are reported as STRING with no parameters; the original text
        // of the spec is still passed along as the second constructor argument.
        return new FlinkGenericDataType(
            new NodeLocation(typeNameSpec.getParserPos().getLineNum(),
                typeNameSpec.getParserPos().getColumnNum()),
            typeNameSpec.toString(), FlinkDataTypeName.STRING.getValue(), Collections.emptyList());
    }

    String dataTypeName = typeNameSpec.getTypeName().getSimple();
    IDataTypeName dataType = FlinkDataTypeName.getByValue(dataTypeName);
    // Only basic specs carry precision/scale. Fail with a descriptive parse error rather than
    // a bare ClassCastException when some other spec kind reaches this point.
    if (!(typeNameSpec instanceof SqlBasicTypeNameSpec)) {
        throw new ParseException("unsupported type name spec: "
            + typeNameSpec.getClass().getName() + " for type " + typeNameSpec);
    }
    SqlBasicTypeNameSpec basicSpec = (SqlBasicTypeNameSpec) typeNameSpec;
    List<DataTypeParameter> parameters = Lists.newArrayList();
    // Precision and scale are included only when greater than -1.
    if (basicSpec.getPrecision() > -1) {
        parameters.add(new NumericParameter(String.valueOf(basicSpec.getPrecision())));
    }
    if (basicSpec.getScale() > -1) {
        parameters.add(new NumericParameter(String.valueOf(basicSpec.getScale())));
    }
    return new FlinkGenericDataType(
        new NodeLocation(typeNameSpec.getParserPos().getLineNum(),
            typeNameSpec.getParserPos().getColumnNum()),
        typeNameSpec.toString(), dataType.getValue(), parameters);
}