in fastmodel-transform/fastmodel-transform-flink/src/main/java/com/aliyun/fastmodel/transform/flink/parser/visitor/FlinkAstBuilder.java [168:214]
public BaseDataType visitColumnType(ColumnTypeContext ctx) {
if (ctx.KW_ARRAY() != null) {
return new FlinkGenericDataType(
getLocation(ctx),
getOrigin(ctx),
ctx.KW_ARRAY().getText(),
ImmutableList.of(new TypeParameter((BaseDataType) visit(ctx.lengthOneTypeDimension().columnType()))));
} else if (ctx.KW_MAP() != null) {
return new FlinkGenericDataType(
getLocation(ctx),
getOrigin(ctx),
ctx.KW_MAP().getText(),
ImmutableList.of(
new TypeParameter((BaseDataType) visit(ctx.mapTypeDimension().columnType(0))),
new TypeParameter((BaseDataType) visit(ctx.mapTypeDimension().columnType(1)))));
} else if (ctx.KW_ROW() != null) {
List<Identifier> columnNames = ParserHelper.visit(this, ctx.rowTypeDimension().columnName(), Identifier.class);
List<BaseDataType> dataTypes = ParserHelper.visit(this, ctx.rowTypeDimension().columnType(), BaseDataType.class);
List<Field> fields = new ArrayList<>();
for (int i = 0; i < columnNames.size(); i++) {
Identifier columnName = columnNames.get(i);
BaseDataType dataType = dataTypes.get(i);
fields.add(new Field(columnName, dataType, null));
}
return new FlinkRowDataType(fields);
} else if (ctx.KW_RAW() != null) {
List<StringLiteral> columnNames = ParserHelper.visit(this, ctx.lengthTwoStringDimension().stringLiteral(), StringLiteral.class);
List<QualifiedName> fields = columnNames.stream()
.map(columnName -> QualifiedName.of(columnName.getValue()))
.collect(Collectors.toList());
return new FlinkRawDataType(fields);
} else {
Token name = ctx.typeName;
IDataTypeName byValue = FlinkDataTypeName.getByValue(name.getText());
List<DataTypeParameter> list = Lists.newArrayList();
if (ctx.lengthOneDimension() != null) {
NumericParameter numericParameter = (NumericParameter) visit(ctx.lengthOneDimension().decimalLiteral());
list.add(numericParameter);
}
if (ctx.lengthTwoOptionalDimension() != null) {
List<NumericParameter> numericParameters = ParserHelper.visit(this,
ctx.lengthTwoOptionalDimension().decimalLiteral(), NumericParameter.class);
list.addAll(numericParameters);
}
return new FlinkGenericDataType(getLocation(ctx), getOrigin(ctx), byValue.getValue(), list);
}
}