in hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java [63:176]
/**
 * Converts an Avro schema into the corresponding Flink {@link DataType}.
 *
 * <p>Every Avro type except {@code NULL} and {@code UNION} is mapped to a NOT NULL data type.
 * A union is mapped as follows:
 * <ul>
 *   <li>a single-branch union converts to the type of that branch;</li>
 *   <li>a two-branch union with a {@code null} branch converts to the nullable form of the
 *       other branch;</li>
 *   <li>any other union converts to a raw generic {@code Object} type, wrapped in a one-field
 *       row named {@code wrapper} when {@code recordTypesOfSameNumFields} accepts the non-null
 *       branches, and made nullable when the union contains a {@code null} branch.</li>
 * </ul>
 *
 * @param schema the Avro schema to convert
 * @return the Flink data type matching {@code schema}
 * @throws IllegalArgumentException if the Avro type of {@code schema} is not handled
 */
public static DataType convertToDataType(Schema schema) {
  final Schema.Type avroType = schema.getType();
  switch (avroType) {
    case RECORD: {
      // one row field per record field, in declaration order, converted recursively
      final DataTypes.Field[] rowFields = schema.getFields().stream()
          .map(avroField -> DataTypes.FIELD(avroField.name(), convertToDataType(avroField.schema())))
          .toArray(DataTypes.Field[]::new);
      return DataTypes.ROW(rowFields).notNull();
    }
    case ENUM:
    case STRING: {
      // Avro's Utf8/CharSequence (and enum symbols) are represented as String
      return DataTypes.STRING().notNull();
    }
    case ARRAY: {
      final DataType elementType = convertToDataType(schema.getElementType());
      return DataTypes.ARRAY(elementType).notNull();
    }
    case MAP: {
      // the key type is always a non-null string; only the value type is converted
      final DataType keyType = DataTypes.STRING().notNull();
      final DataType valueType = convertToDataType(schema.getValueType());
      return DataTypes.MAP(keyType, valueType).notNull();
    }
    case UNION: {
      final List<Schema> branches = schema.getTypes();
      final int branchCount = branches.size();
      if (branchCount == 1) {
        // single-branch union: behaves exactly like the branch itself
        return convertToDataType(branches.get(0));
      }
      if (branchCount == 2) {
        // [null, T] or [T, null]: the nullable form of T
        final Schema first = branches.get(0);
        final Schema second = branches.get(1);
        if (first.getType() == Schema.Type.NULL) {
          return convertToDataType(second).nullable();
        }
        if (second.getType() == Schema.Type.NULL) {
          return convertToDataType(first).nullable();
        }
      }
      // general union: nullable only if at least one branch is the null type
      final List<Schema> nonNullBranches = branches.stream()
          .filter(branch -> branch.getType() != Schema.Type.NULL)
          .collect(Collectors.toList());
      final boolean containsNull = nonNullBranches.size() < branchCount;
      // raw generic Object type; use Kryo for serialization
      final DataType genericObjectType = new AtomicDataType(
          new TypeInformationRawType<>(false, Types.GENERIC(Object.class)))
          .notNull();
      final DataType unionType = recordTypesOfSameNumFields(nonNullBranches)
          ? DataTypes.ROW(DataTypes.FIELD("wrapper", genericObjectType)).notNull()
          : genericObjectType;
      return containsNull ? unionType.nullable() : unionType;
    }
    case FIXED:
    case BYTES: {
      // both binary types may carry the logical decimal type
      final org.apache.avro.LogicalType binaryLogicalType = schema.getLogicalType();
      if (binaryLogicalType instanceof LogicalTypes.Decimal) {
        final LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) binaryLogicalType;
        return DataTypes.DECIMAL(decimal.getPrecision(), decimal.getScale()).notNull();
      }
      // without a decimal annotation: fixed keeps its declared size, bytes is unbounded
      return avroType == Schema.Type.FIXED
          ? DataTypes.VARBINARY(schema.getFixedSize()).notNull()
          : DataTypes.BYTES().notNull();
    }
    case INT: {
      // logical date and time-of-day types are carried by int
      final org.apache.avro.LogicalType intLogicalType = schema.getLogicalType();
      if (intLogicalType == LogicalTypes.date()) {
        return DataTypes.DATE().notNull();
      }
      if (intLogicalType == LogicalTypes.timeMillis()) {
        return DataTypes.TIME(3).notNull();
      }
      return DataTypes.INT().notNull();
    }
    case LONG: {
      // logical timestamp and time-of-day types are carried by long
      final org.apache.avro.LogicalType longLogicalType = schema.getLogicalType();
      if (longLogicalType == LogicalTypes.timestampMillis()) {
        return DataTypes.TIMESTAMP(3).notNull();
      }
      if (longLogicalType == LogicalTypes.localTimestampMillis()) {
        return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull();
      }
      if (longLogicalType == LogicalTypes.timestampMicros()) {
        return DataTypes.TIMESTAMP(6).notNull();
      }
      if (longLogicalType == LogicalTypes.localTimestampMicros()) {
        return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6).notNull();
      }
      // NOTE(review): the Avro spec defines time-millis on int, not long; this check is kept
      // as-is to preserve behaviour -- confirm whether it can ever match a long schema.
      if (longLogicalType == LogicalTypes.timeMillis()) {
        return DataTypes.TIME(3).notNull();
      }
      if (longLogicalType == LogicalTypes.timeMicros()) {
        return DataTypes.TIME(6).notNull();
      }
      return DataTypes.BIGINT().notNull();
    }
    case FLOAT: {
      return DataTypes.FLOAT().notNull();
    }
    case DOUBLE: {
      return DataTypes.DOUBLE().notNull();
    }
    case BOOLEAN: {
      return DataTypes.BOOLEAN().notNull();
    }
    case NULL: {
      return DataTypes.NULL();
    }
    default:
      throw new IllegalArgumentException("Unsupported Avro type '" + avroType + "'.");
  }
}