in seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java [287:404]
/**
 * Converts a Parquet schema {@link Type} into the corresponding SeaTunnel data type.
 *
 * <p>Primitive columns are mapped from their physical type, refined by the original type or
 * logical type annotation where one is present. Group columns without a logical annotation
 * become a {@link SeaTunnelRowType}. Groups annotated as MAP become a {@link MapType}, and groups
 * annotated as LIST become one of the predefined {@link ArrayType} constants.
 *
 * @param type the Parquet type to convert
 * @param name the field name reported in conversion errors; nested fields (struct members, map
 *     keys/values, list elements) are reported under this same enclosing name
 * @return the SeaTunnel data type equivalent to {@code type}
 * @throws RuntimeException the error built by {@code CommonError.convertToSeaTunnelTypeError}
 *     when {@code type} (or a nested type) has no supported SeaTunnel equivalent
 */
private SeaTunnelDataType<?> parquetType2SeaTunnelType(Type type, String name) {
    if (type.isPrimitive()) {
        OriginalType originalType = type.asPrimitiveType().getOriginalType();
        switch (type.asPrimitiveType().getPrimitiveTypeName()) {
            case INT32:
                if (originalType == null) {
                    return BasicType.INT_TYPE;
                }
                switch (originalType) {
                    case INT_8:
                        return BasicType.BYTE_TYPE;
                    case INT_16:
                        return BasicType.SHORT_TYPE;
                    case INT_32:
                        return BasicType.INT_TYPE;
                    case DATE:
                        return LocalTimeType.LOCAL_DATE_TYPE;
                    default:
                        throw CommonError.convertToSeaTunnelTypeError(
                                PARQUET, type.toString(), name);
                }
            case INT64:
                if (originalType == OriginalType.TIMESTAMP_MILLIS) {
                    return LocalTimeType.LOCAL_DATE_TIME_TYPE;
                }
                return BasicType.LONG_TYPE;
            case INT96:
                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
            case BINARY:
                // Un-annotated binaries stay raw bytes; any annotation is read as a string.
                if (originalType == null) {
                    return PrimitiveByteArrayType.INSTANCE;
                }
                return BasicType.STRING_TYPE;
            case FLOAT:
                return BasicType.FLOAT_TYPE;
            case DOUBLE:
                return BasicType.DOUBLE_TYPE;
            case BOOLEAN:
                return BasicType.BOOLEAN_TYPE;
            case FIXED_LEN_BYTE_ARRAY:
                LogicalTypeAnnotation fixedAnnotation = type.getLogicalTypeAnnotation();
                if (fixedAnnotation == null) {
                    // Existing behavior: un-annotated fixed-length binaries map to timestamps.
                    // NOTE(review): presumably for legacy timestamp encodings — confirm.
                    return LocalTimeType.LOCAL_DATE_TIME_TYPE;
                }
                // Read precision/scale from the annotation itself instead of parsing its
                // toString(); non-decimal annotations (e.g. UUID) previously crashed with a
                // NumberFormatException here.
                if (fixedAnnotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
                    LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalAnnotation =
                            (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) fixedAnnotation;
                    return new DecimalType(
                            decimalAnnotation.getPrecision(), decimalAnnotation.getScale());
                }
                throw CommonError.convertToSeaTunnelTypeError(PARQUET, type.toString(), name);
            default:
                throw CommonError.convertToSeaTunnelTypeError(PARQUET, type.toString(), name);
        }
    }

    GroupType groupType = type.asGroupType();
    LogicalTypeAnnotation logicalTypeAnnotation = groupType.getLogicalTypeAnnotation();
    if (logicalTypeAnnotation == null) {
        // struct type: convert every member field recursively, preserving field order
        List<Type> fields = groupType.getFields();
        String[] fieldNames = new String[fields.size()];
        SeaTunnelDataType<?>[] fieldTypes = new SeaTunnelDataType<?>[fields.size()];
        for (int i = 0; i < fields.size(); i++) {
            Type field = fields.get(i);
            fieldNames[i] = field.getName();
            fieldTypes[i] = parquetType2SeaTunnelType(field, name);
        }
        return new SeaTunnelRowType(fieldNames, fieldTypes);
    }

    OriginalType groupOriginalType = logicalTypeAnnotation.toOriginalType();
    if (groupOriginalType == null) {
        // Guard against switching on null (would otherwise surface as a bare NPE).
        throw CommonError.convertToSeaTunnelTypeError(PARQUET, type.toString(), name);
    }
    switch (groupOriginalType) {
        case MAP:
            // The MAP group wraps a single repeated key/value group: key first, value second.
            GroupType keyValueType = groupType.getType(0).asGroupType();
            SeaTunnelDataType<?> keyType =
                    parquetType2SeaTunnelType(keyValueType.getType(0), name);
            SeaTunnelDataType<?> valueType =
                    parquetType2SeaTunnelType(keyValueType.getType(1), name);
            return new MapType<>(keyType, valueType);
        case LIST:
            // Standard 3-level layout: LIST group -> repeated group -> element.
            // Legacy 2-level layout: the repeated child of the LIST group is itself the
            // element (a primitive, or a group that cannot be unwrapped further).
            Type repeatedType = groupType.getType(0);
            Type elementParquetType =
                    repeatedType.isPrimitive() || repeatedType.asGroupType().getFieldCount() == 0
                            ? repeatedType
                            : repeatedType.asGroupType().getType(0);
            SeaTunnelDataType<?> elementType =
                    parquetType2SeaTunnelType(elementParquetType, name);
            switch (elementType.getSqlType()) {
                case STRING:
                    return ArrayType.STRING_ARRAY_TYPE;
                case BOOLEAN:
                    return ArrayType.BOOLEAN_ARRAY_TYPE;
                case TINYINT:
                    return ArrayType.BYTE_ARRAY_TYPE;
                case SMALLINT:
                    return ArrayType.SHORT_ARRAY_TYPE;
                case INT:
                    return ArrayType.INT_ARRAY_TYPE;
                case BIGINT:
                    return ArrayType.LONG_ARRAY_TYPE;
                case FLOAT:
                    return ArrayType.FLOAT_ARRAY_TYPE;
                case DOUBLE:
                    return ArrayType.DOUBLE_ARRAY_TYPE;
                default:
                    throw CommonError.convertToSeaTunnelTypeError(
                            PARQUET, type.toString(), name);
            }
        default:
            throw CommonError.convertToSeaTunnelTypeError(PARQUET, type.toString(), name);
    }
}