private static DataType convertToDataType()

in flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java [260:375]


    private static DataType convertToDataType(Schema schema, boolean legacyMapping) {
        switch (schema.getType()) {
            case RECORD:
                final List<Schema.Field> schemaFields = schema.getFields();

                final DataTypes.Field[] fields = new DataTypes.Field[schemaFields.size()];
                for (int i = 0; i < schemaFields.size(); i++) {
                    final Schema.Field field = schemaFields.get(i);
                    fields[i] =
                            DataTypes.FIELD(
                                    field.name(), convertToDataType(field.schema(), legacyMapping));
                }
                return DataTypes.ROW(fields).notNull();
            case ENUM:
                return DataTypes.STRING().notNull();
            case ARRAY:
                return DataTypes.ARRAY(convertToDataType(schema.getElementType(), legacyMapping))
                        .notNull();
            case MAP:
                return DataTypes.MAP(
                                DataTypes.STRING().notNull(),
                                convertToDataType(schema.getValueType(), legacyMapping))
                        .notNull();
            case UNION:
                final Schema actualSchema;
                final boolean nullable;
                if (schema.getTypes().size() == 2
                        && schema.getTypes().get(0).getType() == Schema.Type.NULL) {
                    actualSchema = schema.getTypes().get(1);
                    nullable = true;
                } else if (schema.getTypes().size() == 2
                        && schema.getTypes().get(1).getType() == Schema.Type.NULL) {
                    actualSchema = schema.getTypes().get(0);
                    nullable = true;
                } else if (schema.getTypes().size() == 1) {
                    actualSchema = schema.getTypes().get(0);
                    nullable = false;
                } else {
                    // use Kryo for serialization
                    return new AtomicDataType(
                            new TypeInformationRawType<>(false, Types.GENERIC(Object.class)));
                }
                DataType converted = convertToDataType(actualSchema, legacyMapping);
                return nullable ? converted.nullable() : converted;
            case FIXED:
                // logical decimal type
                if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
                    final LogicalTypes.Decimal decimalType =
                            (LogicalTypes.Decimal) schema.getLogicalType();
                    return DataTypes.DECIMAL(decimalType.getPrecision(), decimalType.getScale())
                            .notNull();
                }
                // convert fixed size binary data to primitive byte arrays
                return DataTypes.VARBINARY(schema.getFixedSize()).notNull();
            case STRING:
                // convert Avro's Utf8/CharSequence to String
                return DataTypes.STRING().notNull();
            case BYTES:
                // logical decimal type
                if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
                    final LogicalTypes.Decimal decimalType =
                            (LogicalTypes.Decimal) schema.getLogicalType();
                    return DataTypes.DECIMAL(decimalType.getPrecision(), decimalType.getScale())
                            .notNull();
                }
                return DataTypes.BYTES().notNull();
            case INT:
                // logical date and time type
                final org.apache.avro.LogicalType logicalType = schema.getLogicalType();
                if (logicalType == LogicalTypes.date()) {
                    return DataTypes.DATE().notNull();
                } else if (logicalType == LogicalTypes.timeMillis()) {
                    return DataTypes.TIME(3).notNull();
                }
                return DataTypes.INT().notNull();
            case LONG:
                if (legacyMapping) {
                    // Avro logical timestamp types to Flink SQL timestamp types
                    if (schema.getLogicalType() == LogicalTypes.timestampMillis()) {
                        return DataTypes.TIMESTAMP(3).notNull();
                    } else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
                        return DataTypes.TIMESTAMP(6).notNull();
                    } else if (schema.getLogicalType() == LogicalTypes.timeMillis()) {
                        return DataTypes.TIME(3).notNull();
                    } else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
                        return DataTypes.TIME(6).notNull();
                    }
                } else {
                    // Avro logical timestamp types to Flink SQL timestamp types
                    if (schema.getLogicalType() == LogicalTypes.timestampMillis()) {
                        return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull();
                    } else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
                        return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6).notNull();
                    } else if (schema.getLogicalType() == LogicalTypes.timeMillis()) {
                        return DataTypes.TIME(3).notNull();
                    } else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
                        return DataTypes.TIME(6).notNull();
                    } else if (schema.getLogicalType() == LogicalTypes.localTimestampMillis()) {
                        return DataTypes.TIMESTAMP(3).notNull();
                    } else if (schema.getLogicalType() == LogicalTypes.localTimestampMicros()) {
                        return DataTypes.TIMESTAMP(6).notNull();
                    }
                }

                return DataTypes.BIGINT().notNull();
            case FLOAT:
                return DataTypes.FLOAT().notNull();
            case DOUBLE:
                return DataTypes.DOUBLE().notNull();
            case BOOLEAN:
                return DataTypes.BOOLEAN().notNull();
            case NULL:
                return DataTypes.NULL();
        }
        throw new IllegalArgumentException("Unsupported Avro type '" + schema.getType() + "'.");
    }