in hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/parquet/ParquetSchemaConverter.java [362:537]
/**
 * Converts a Flink {@link TypeInformation} into the corresponding Parquet {@link Type},
 * recursing into map values, array components and row fields.
 *
 * <p>Supported inputs are {@code BasicTypeInfo}, {@code MapTypeInfo} (String keys only),
 * {@code ObjectArrayTypeInfo}, {@code BasicArrayTypeInfo}, {@code SqlTimeTypeInfo} and
 * {@code RowTypeInfo}. Anything else is rejected with an
 * {@link UnsupportedOperationException} instead of silently yielding {@code null} or
 * failing on a blind cast.
 *
 * @param fieldName name given to the produced Parquet type; when {@code null} and
 *     {@code typeInfo} is a row, a {@code MessageType} named {@code MESSAGE_ROOT} is
 *     produced instead of a named group
 * @param typeInfo Flink type information to convert
 * @param inheritRepetition repetition applied to the produced type; {@code null} means
 *     {@code OPTIONAL}. For a row, the same repetition is also passed on to each of its
 *     fields
 * @param legacyMode only affects {@code BasicArrayTypeInfo}: when {@code true} the element
 *     is wrapped in an extra repeated group, otherwise a repeated primitive is placed
 *     directly inside the LIST group; the flag is forwarded unchanged on every recursion
 * @return the Parquet type describing {@code typeInfo}
 * @throws UnsupportedOperationException if {@code typeInfo} (or a nested type) is a basic
 *     type without a mapping, a map whose key is not a String, an unsupported
 *     {@code SqlTimeTypeInfo}, or any other type information that is not a row
 */
private static Type convertField(
        String fieldName,
        TypeInformation<?> typeInfo,
        Type.Repetition inheritRepetition,
        boolean legacyMode) {
    Type.Repetition repetition =
            inheritRepetition == null ? Type.Repetition.OPTIONAL : inheritRepetition;
    Type fieldType;
    if (typeInfo instanceof BasicTypeInfo) {
        BasicTypeInfo<?> basicTypeInfo = (BasicTypeInfo<?>) typeInfo;
        if (basicTypeInfo.equals(BasicTypeInfo.BIG_DEC_TYPE_INFO)
                || basicTypeInfo.equals(BasicTypeInfo.BIG_INT_TYPE_INFO)) {
            fieldType =
                    Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition)
                            .as(OriginalType.DECIMAL)
                            .named(fieldName);
        } else if (basicTypeInfo.equals(BasicTypeInfo.INT_TYPE_INFO)) {
            fieldType =
                    Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
                            .as(OriginalType.INT_32)
                            .named(fieldName);
        } else if (basicTypeInfo.equals(BasicTypeInfo.DOUBLE_TYPE_INFO)) {
            fieldType =
                    Types.primitive(PrimitiveType.PrimitiveTypeName.DOUBLE, repetition)
                            .named(fieldName);
        } else if (basicTypeInfo.equals(BasicTypeInfo.FLOAT_TYPE_INFO)) {
            fieldType =
                    Types.primitive(PrimitiveType.PrimitiveTypeName.FLOAT, repetition)
                            .named(fieldName);
        } else if (basicTypeInfo.equals(BasicTypeInfo.LONG_TYPE_INFO)) {
            fieldType =
                    Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
                            .as(OriginalType.INT_64)
                            .named(fieldName);
        } else if (basicTypeInfo.equals(BasicTypeInfo.SHORT_TYPE_INFO)) {
            fieldType =
                    Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
                            .as(OriginalType.INT_16)
                            .named(fieldName);
        } else if (basicTypeInfo.equals(BasicTypeInfo.BYTE_TYPE_INFO)) {
            fieldType =
                    Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
                            .as(OriginalType.INT_8)
                            .named(fieldName);
        } else if (basicTypeInfo.equals(BasicTypeInfo.BOOLEAN_TYPE_INFO)) {
            fieldType =
                    Types.primitive(PrimitiveType.PrimitiveTypeName.BOOLEAN, repetition)
                            .named(fieldName);
        } else if (basicTypeInfo.equals(BasicTypeInfo.CHAR_TYPE_INFO)
                || basicTypeInfo.equals(BasicTypeInfo.DATE_TYPE_INFO)
                || basicTypeInfo.equals(BasicTypeInfo.STRING_TYPE_INFO)) {
            // Char, date and string basic types are all written as UTF8-annotated binary.
            fieldType =
                    Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition)
                            .as(OriginalType.UTF8)
                            .named(fieldName);
        } else {
            // Fail fast: previously such types fell through and produced a null field type.
            throw new UnsupportedOperationException("Unsupported BasicTypeInfo " + typeInfo);
        }
    } else if (typeInfo instanceof MapTypeInfo) {
        MapTypeInfo<?, ?> mapTypeInfo = (MapTypeInfo<?, ?>) typeInfo;
        if (!mapTypeInfo.getKeyTypeInfo().equals(BasicTypeInfo.STRING_TYPE_INFO)) {
            throw new UnsupportedOperationException(
                    String.format(
                            "Can not convert Flink MapTypeInfo %s to Parquet"
                                    + " Map type as key has to be String",
                            typeInfo));
        }
        // Only the value type is set explicitly; the key type is left to the map builder.
        fieldType =
                Types.map(repetition)
                        .value(
                                convertField(
                                        MAP_VALUE,
                                        mapTypeInfo.getValueTypeInfo(),
                                        Type.Repetition.OPTIONAL,
                                        legacyMode))
                        .named(fieldName);
    } else if (typeInfo instanceof ObjectArrayTypeInfo) {
        ObjectArrayTypeInfo<?, ?> objectArrayTypeInfo = (ObjectArrayTypeInfo<?, ?>) typeInfo;
        // Get all required sub fields.
        // NOTE: the cast requires the component to convert to a group type (e.g. a row).
        GroupType componentGroup =
                (GroupType)
                        convertField(
                                LIST_ELEMENT,
                                objectArrayTypeInfo.getComponentInfo(),
                                Type.Repetition.REQUIRED,
                                legacyMode);
        // Re-home the component's fields under a repeated group named LIST_ELEMENT.
        GroupType elementGroup = Types.repeatedGroup().named(LIST_ELEMENT);
        elementGroup = elementGroup.withNewFields(componentGroup.getFields());
        fieldType =
                Types.buildGroup(repetition)
                        .addField(elementGroup)
                        .as(OriginalType.LIST)
                        .named(fieldName);
    } else if (typeInfo instanceof BasicArrayTypeInfo) {
        BasicArrayTypeInfo<?, ?> basicArrayType = (BasicArrayTypeInfo<?, ?>) typeInfo;
        if (legacyMode) {
            // Add extra layer of Group according to Parquet's standard
            Type listGroup =
                    Types.repeatedGroup()
                            .addField(
                                    convertField(
                                            LIST_ELEMENT,
                                            basicArrayType.getComponentInfo(),
                                            Type.Repetition.REQUIRED,
                                            legacyMode))
                            .named(LIST_GROUP_NAME);
            fieldType =
                    Types.buildGroup(repetition)
                            .addField(listGroup)
                            .as(OriginalType.LIST)
                            .named(fieldName);
        } else {
            // Convert the component only to learn its primitive type and annotation, then
            // declare it as a repeated primitive directly inside the LIST group.
            PrimitiveType primitiveType =
                    convertField(
                                    fieldName,
                                    basicArrayType.getComponentInfo(),
                                    Type.Repetition.REQUIRED,
                                    legacyMode)
                            .asPrimitiveType();
            fieldType =
                    Types.buildGroup(repetition)
                            .repeated(primitiveType.getPrimitiveTypeName())
                            .as(primitiveType.getOriginalType())
                            .named(LIST_ARRAY_TYPE)
                            .as(OriginalType.LIST)
                            .named(fieldName);
        }
    } else if (typeInfo instanceof SqlTimeTypeInfo) {
        if (typeInfo.equals(SqlTimeTypeInfo.DATE)) {
            fieldType =
                    Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
                            .as(OriginalType.DATE)
                            .named(fieldName);
        } else if (typeInfo.equals(SqlTimeTypeInfo.TIME)) {
            fieldType =
                    Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
                            .as(OriginalType.TIME_MILLIS)
                            .named(fieldName);
        } else if (typeInfo.equals(SqlTimeTypeInfo.TIMESTAMP)) {
            fieldType =
                    Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
                            .as(OriginalType.TIMESTAMP_MILLIS)
                            .named(fieldName);
        } else {
            throw new UnsupportedOperationException(
                    "Unsupported SqlTimeTypeInfo " + typeInfo.toString());
        }
    } else if (typeInfo instanceof RowTypeInfo) {
        RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo;
        String[] fieldNames = rowTypeInfo.getFieldNames();
        TypeInformation<?>[] fieldTypes = rowTypeInfo.getFieldTypes();
        int arity = rowTypeInfo.getArity();
        List<Type> types = new ArrayList<>(arity);
        for (int i = 0; i < arity; i++) {
            // Child fields inherit the repetition resolved for this row.
            types.add(convertField(fieldNames[i], fieldTypes[i], repetition, legacyMode));
        }
        if (fieldName == null) {
            // A row without a name is the schema root.
            fieldType = new MessageType(MESSAGE_ROOT, types);
        } else {
            fieldType = new GroupType(repetition, fieldName, types);
        }
    } else {
        // Previously this fell into an unconditional RowTypeInfo cast and surfaced as a
        // ClassCastException (or NullPointerException for null input).
        throw new UnsupportedOperationException("Unsupported TypeInformation " + typeInfo);
    }
    return fieldType;
}