in flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java [260:375]
/**
 * Converts an Avro {@link Schema} into the corresponding Flink {@link DataType}.
 *
 * <p>Every produced type is NOT NULL unless nullability comes from the schema itself:
 *
 * <ul>
 *   <li>A two-branch union with a {@code null} branch yields the other branch's type, made
 *       nullable.
 *   <li>The Avro {@code null} type maps to {@code DataTypes.NULL()}.
 *   <li>A single-branch union yields its branch's type unchanged.
 *   <li>Any other union shape falls back to a RAW type over {@code Object}, which is
 *       serialized with Kryo.
 * </ul>
 *
 * @param schema the Avro schema to convert; nested schemas are converted recursively
 * @param legacyMapping selects how Avro timestamp logical types on {@code long} are mapped:
 *     <ul>
 *       <li>{@code true}: {@code timestamp-millis}/{@code timestamp-micros} map to {@code
 *           TIMESTAMP(3)}/{@code TIMESTAMP(6)}, and {@code local-timestamp-*} stays {@code
 *           BIGINT}.
 *       <li>{@code false}: {@code timestamp-*} maps to {@code TIMESTAMP_WITH_LOCAL_TIME_ZONE},
 *           and {@code local-timestamp-*} maps to {@code TIMESTAMP}.
 *     </ul>
 *
 * @return the converted Flink data type
 * @throws IllegalArgumentException if the Avro type is not one of the handled cases
 */
private static DataType convertToDataType(Schema schema, boolean legacyMapping) {
    switch (schema.getType()) {
        case RECORD: {
            final List<Schema.Field> avroFields = schema.getFields();
            final DataTypes.Field[] rowFields = new DataTypes.Field[avroFields.size()];
            int pos = 0;
            for (final Schema.Field avroField : avroFields) {
                final DataType fieldType = convertToDataType(avroField.schema(), legacyMapping);
                rowFields[pos++] = DataTypes.FIELD(avroField.name(), fieldType);
            }
            return DataTypes.ROW(rowFields).notNull();
        }
        case ENUM:
        case STRING:
            // enum symbols and Avro's Utf8/CharSequence values both surface as Flink strings
            return DataTypes.STRING().notNull();
        case ARRAY: {
            final DataType elementType =
                    convertToDataType(schema.getElementType(), legacyMapping);
            return DataTypes.ARRAY(elementType).notNull();
        }
        case MAP: {
            // keys are always converted to a non-null STRING; only the value type varies
            final DataType valueType = convertToDataType(schema.getValueType(), legacyMapping);
            return DataTypes.MAP(DataTypes.STRING().notNull(), valueType).notNull();
        }
        case UNION: {
            final List<Schema> branches = schema.getTypes();
            Schema nonNullBranch = null;
            boolean hasNullBranch = false;
            if (branches.size() == 1) {
                nonNullBranch = branches.get(0);
            } else if (branches.size() == 2) {
                final Schema first = branches.get(0);
                final Schema second = branches.get(1);
                if (first.getType() == Schema.Type.NULL) {
                    nonNullBranch = second;
                    hasNullBranch = true;
                } else if (second.getType() == Schema.Type.NULL) {
                    nonNullBranch = first;
                    hasNullBranch = true;
                }
            }
            if (nonNullBranch == null) {
                // any other union shape cannot be expressed in Flink SQL: use Kryo for serialization
                return new AtomicDataType(
                        new TypeInformationRawType<>(false, Types.GENERIC(Object.class)));
            }
            final DataType branchType = convertToDataType(nonNullBranch, legacyMapping);
            return hasNullBranch ? branchType.nullable() : branchType;
        }
        case FIXED:
        case BYTES: {
            final org.apache.avro.LogicalType binaryLogical = schema.getLogicalType();
            if (binaryLogical instanceof LogicalTypes.Decimal) {
                // both fixed and bytes may carry the decimal logical type
                final LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) binaryLogical;
                return DataTypes.DECIMAL(decimal.getPrecision(), decimal.getScale()).notNull();
            }
            if (schema.getType() == Schema.Type.FIXED) {
                // fixed-size binary data becomes a length-bounded byte array
                return DataTypes.VARBINARY(schema.getFixedSize()).notNull();
            }
            return DataTypes.BYTES().notNull();
        }
        case INT: {
            // int may carry the date or time-millis logical types
            final org.apache.avro.LogicalType intLogical = schema.getLogicalType();
            if (intLogical == LogicalTypes.date()) {
                return DataTypes.DATE().notNull();
            }
            if (intLogical == LogicalTypes.timeMillis()) {
                return DataTypes.TIME(3).notNull();
            }
            return DataTypes.INT().notNull();
        }
        case LONG: {
            final org.apache.avro.LogicalType longLogical = schema.getLogicalType();
            // time logical types map the same way in both modes
            if (longLogical == LogicalTypes.timeMillis()) {
                return DataTypes.TIME(3).notNull();
            }
            if (longLogical == LogicalTypes.timeMicros()) {
                return DataTypes.TIME(6).notNull();
            }
            // timestamp logical types depend on the requested mapping mode
            if (longLogical == LogicalTypes.timestampMillis()) {
                final DataType millisType =
                        legacyMapping
                                ? DataTypes.TIMESTAMP(3)
                                : DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3);
                return millisType.notNull();
            }
            if (longLogical == LogicalTypes.timestampMicros()) {
                final DataType microsType =
                        legacyMapping
                                ? DataTypes.TIMESTAMP(6)
                                : DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6);
                return microsType.notNull();
            }
            // local-timestamp logical types are only recognized by the non-legacy mapping
            if (!legacyMapping) {
                if (longLogical == LogicalTypes.localTimestampMillis()) {
                    return DataTypes.TIMESTAMP(3).notNull();
                }
                if (longLogical == LogicalTypes.localTimestampMicros()) {
                    return DataTypes.TIMESTAMP(6).notNull();
                }
            }
            return DataTypes.BIGINT().notNull();
        }
        case FLOAT:
            return DataTypes.FLOAT().notNull();
        case DOUBLE:
            return DataTypes.DOUBLE().notNull();
        case BOOLEAN:
            return DataTypes.BOOLEAN().notNull();
        case NULL:
            return DataTypes.NULL();
    }
    throw new IllegalArgumentException("Unsupported Avro type '" + schema.getType() + "'.");
}