in seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/datatype/MaxComputeTypeConverter.java [110:312]
// Converts a MaxCompute type definition into a SeaTunnel Column.
//
// ARRAY, STRUCT and MAP native types are converted recursively through the private helpers
// below. DECIMAL and CHAR/VARCHAR are recognised by their native TypeInfo class, and every
// other type is resolved from the upper-cased data type name.
//
// The supplied typeDefine is only read, never modified, so converting the same definition
// twice yields the same column.
//
// Raises the error built by CommonError.convertToSeaTunnelTypeError for INTERVAL and any
// unrecognised scalar type, and the one built by CommonError.unsupportedDataType for an
// array whose element type has no matching SeaTunnel array type.
public Column convert(BasicTypeDefine<TypeInfo> typeDefine) {
    TypeInfo nativeType = typeDefine.getNativeType();
    if (nativeType instanceof ArrayTypeInfo) {
        return convertArrayColumn(typeDefine, (ArrayTypeInfo) nativeType);
    }
    if (nativeType instanceof StructTypeInfo) {
        return convertStructColumn(typeDefine, (StructTypeInfo) nativeType);
    }
    if (nativeType instanceof MapTypeInfo) {
        return convertMapColumn(typeDefine, (MapTypeInfo) nativeType);
    }

    PhysicalColumn.PhysicalColumnBuilder builder =
            PhysicalColumn.builder()
                    .name(typeDefine.getName())
                    .sourceType(typeDefine.getColumnType())
                    .nullable(typeDefine.isNullable())
                    .defaultValue(typeDefine.getDefaultValue())
                    .comment(typeDefine.getComment());

    if (nativeType instanceof DecimalTypeInfo) {
        DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) nativeType;
        DecimalType decimalType;
        if (decimalTypeInfo.getPrecision() > DEFAULT_PRECISION) {
            // Wider than the supported precision: fall back to the default precision/scale.
            log.warn("{} will probably cause value overflow.", DECIMAL);
            decimalType = new DecimalType(DEFAULT_PRECISION, DEFAULT_SCALE);
        } else {
            decimalType =
                    new DecimalType(decimalTypeInfo.getPrecision(), decimalTypeInfo.getScale());
        }
        builder.dataType(decimalType);
        builder.columnLength((long) decimalType.getPrecision());
        builder.scale(decimalType.getScale());
        return builder.build();
    }

    if (nativeType instanceof AbstractCharTypeInfo) {
        // CHAR(n) or VARCHAR(n)
        long charLength = (long) ((AbstractCharTypeInfo) nativeType).getLength();
        builder.columnLength(TypeDefineUtils.charTo4ByteLength(charLength));
        builder.dataType(BasicType.STRING_TYPE);
        return builder.build();
    }

    // Locale.ROOT keeps the name matching independent of the JVM default locale
    // (e.g. a lowercase 'i' is upper-cased to a dotted capital under a Turkish locale).
    String dataType = typeDefine.getDataType().toUpperCase(java.util.Locale.ROOT);
    switch (dataType) {
        case BOOLEAN:
            builder.dataType(BasicType.BOOLEAN_TYPE);
            break;
        case TINYINT:
            builder.dataType(BasicType.BYTE_TYPE);
            break;
        case SMALLINT:
            builder.dataType(BasicType.SHORT_TYPE);
            break;
        case INT:
            builder.dataType(BasicType.INT_TYPE);
            break;
        case BIGINT:
            builder.dataType(BasicType.LONG_TYPE);
            break;
        case FLOAT:
            builder.dataType(BasicType.FLOAT_TYPE);
            break;
        case DOUBLE:
            builder.dataType(BasicType.DOUBLE_TYPE);
            break;
        case STRING:
            // A missing or non-positive length falls back to MAX_VARBINARY_LENGTH.
            if (typeDefine.getLength() == null || typeDefine.getLength() <= 0) {
                builder.columnLength(MAX_VARBINARY_LENGTH);
            } else {
                builder.columnLength(typeDefine.getLength());
            }
            builder.dataType(BasicType.STRING_TYPE);
            break;
        case JSON:
            builder.dataType(BasicType.STRING_TYPE);
            break;
        case BINARY:
            // A missing or non-positive length falls back to MAX_VARBINARY_LENGTH.
            if (typeDefine.getLength() == null || typeDefine.getLength() <= 0) {
                builder.columnLength(MAX_VARBINARY_LENGTH);
            } else {
                builder.columnLength(typeDefine.getLength());
            }
            builder.dataType(PrimitiveByteArrayType.INSTANCE);
            break;
        case DATE:
            builder.dataType(LocalTimeType.LOCAL_DATE_TYPE);
            break;
        case DATETIME:
        case TIMESTAMP:
        case TIMESTAMP_NTZ:
            builder.dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE);
            builder.scale(typeDefine.getScale());
            break;
        case INTERVAL:
        default:
            throw CommonError.convertToSeaTunnelTypeError(
                    MaxcomputeBaseOptions.PLUGIN_NAME, dataType, typeDefine.getName());
    }
    return builder.build();
}

/**
 * Converts an array definition: the element type is converted first and the result is then
 * wrapped in the SeaTunnel array type matching the element's SQL type.
 *
 * <p>The element is described on a fresh {@link BasicTypeDefine} that inherits the column's
 * name, length, scale, nullability, default value and comment, so {@code typeDefine} itself
 * is not modified.
 */
private Column convertArrayColumn(
        BasicTypeDefine<TypeInfo> typeDefine, ArrayTypeInfo arrayTypeInfo) {
    TypeInfo elementTypeInfo = arrayTypeInfo.getElementTypeInfo();
    BasicTypeDefine<TypeInfo> elementDefine =
            newChildDefine(typeDefine.getName(), elementTypeInfo);
    elementDefine.setLength(typeDefine.getLength());
    elementDefine.setScale(typeDefine.getScale());
    elementDefine.setNullable(typeDefine.isNullable());
    elementDefine.setDefaultValue(typeDefine.getDefaultValue());
    elementDefine.setComment(typeDefine.getComment());

    Column elementColumn = convert(elementDefine);
    SeaTunnelDataType<?> arrayType;
    switch (elementColumn.getDataType().getSqlType()) {
        case STRING:
            arrayType = ArrayType.STRING_ARRAY_TYPE;
            break;
        case BOOLEAN:
            arrayType = ArrayType.BOOLEAN_ARRAY_TYPE;
            break;
        case TINYINT:
            arrayType = ArrayType.BYTE_ARRAY_TYPE;
            break;
        case SMALLINT:
            arrayType = ArrayType.SHORT_ARRAY_TYPE;
            break;
        case INT:
            arrayType = ArrayType.INT_ARRAY_TYPE;
            break;
        case BIGINT:
            arrayType = ArrayType.LONG_ARRAY_TYPE;
            break;
        case FLOAT:
            arrayType = ArrayType.FLOAT_ARRAY_TYPE;
            break;
        case DOUBLE:
            arrayType = ArrayType.DOUBLE_ARRAY_TYPE;
            break;
        case DATE:
            arrayType = ArrayType.LOCAL_DATE_ARRAY_TYPE;
            break;
        case TIME:
            arrayType = ArrayType.LOCAL_TIME_ARRAY_TYPE;
            break;
        case TIMESTAMP:
            arrayType = ArrayType.LOCAL_DATE_TIME_ARRAY_TYPE;
            break;
        default:
            throw CommonError.unsupportedDataType(
                    MaxcomputeBaseOptions.PLUGIN_NAME,
                    elementColumn.getDataType().getSqlType().toString(),
                    typeDefine.getName());
    }
    return new PhysicalColumn(
            elementColumn.getName(),
            arrayType,
            elementColumn.getColumnLength(),
            elementColumn.getScale(),
            elementColumn.isNullable(),
            elementColumn.getDefaultValue(),
            elementColumn.getComment(),
            "ARRAY<" + elementColumn.getSourceType() + ">",
            elementColumn.getOptions());
}

/**
 * Converts a struct definition into a column typed as a {@link SeaTunnelRowType} whose
 * fields are the recursively converted struct fields, in declaration order.
 */
private Column convertStructColumn(
        BasicTypeDefine<TypeInfo> typeDefine, StructTypeInfo structTypeInfo) {
    List<String> fieldNames = structTypeInfo.getFieldNames();
    List<SeaTunnelDataType<?>> fieldTypes = new ArrayList<>();
    for (TypeInfo fieldTypeInfo : structTypeInfo.getFieldTypeInfos()) {
        // fieldTypes.size() is the index of the field currently being converted.
        String fieldName = fieldNames.get(fieldTypes.size());
        fieldTypes.add(convert(newChildDefine(fieldName, fieldTypeInfo)).getDataType());
    }
    SeaTunnelRowType rowType =
            new SeaTunnelRowType(
                    fieldNames.toArray(new String[0]),
                    fieldTypes.toArray(new SeaTunnelDataType[0]));
    return new PhysicalColumn(
            typeDefine.getName(),
            rowType,
            typeDefine.getLength(),
            typeDefine.getScale(),
            typeDefine.isNullable(),
            typeDefine.getDefaultValue(),
            typeDefine.getComment(),
            structTypeInfo.getTypeName(),
            new HashMap<>());
}

/**
 * Converts a map definition into a column typed as a {@link MapType} built from the
 * recursively converted key and value types.
 */
private Column convertMapColumn(BasicTypeDefine<TypeInfo> typeDefine, MapTypeInfo mapTypeInfo) {
    Column keyColumn = convert(newChildDefine("key", mapTypeInfo.getKeyTypeInfo()));
    Column valueColumn = convert(newChildDefine("value", mapTypeInfo.getValueTypeInfo()));
    MapType mapType = new MapType(keyColumn.getDataType(), valueColumn.getDataType());
    return new PhysicalColumn(
            typeDefine.getName(),
            mapType,
            typeDefine.getLength(),
            typeDefine.getScale(),
            typeDefine.isNullable(),
            typeDefine.getDefaultValue(),
            typeDefine.getComment(),
            mapTypeInfo.getTypeName(),
            new HashMap<>());
}

/**
 * Builds the type definition of a nested type (array element, struct field, map key or map
 * value) from its name and native type info.
 */
private static BasicTypeDefine<TypeInfo> newChildDefine(String name, TypeInfo typeInfo) {
    BasicTypeDefine<TypeInfo> childDefine = new BasicTypeDefine<>();
    childDefine.setName(name);
    childDefine.setColumnType(typeInfo.getTypeName());
    childDefine.setDataType(typeInfo.getOdpsType().name());
    childDefine.setNativeType(typeInfo);
    return childDefine;
}