public static RowDataToAvroConverter createConverter()

in flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RowDataToAvroConverters.java [78:254]


    /**
     * Builds a {@link RowDataToAvroConverter} for the given logical type.
     *
     * <p>The type-specific converter selected by the switch below is never returned directly; it
     * is wrapped in a converter that returns {@code null} for {@code null} input and, when handed
     * a two-branch union schema with one {@code NULL} branch, passes the other branch to the
     * type-specific converter.
     *
     * @param type logical type whose type root selects the conversion
     * @param legacyTimestampMapping selects which conversion is used for {@code
     *     TIMESTAMP_WITHOUT_TIME_ZONE}; when {@code true}, {@code TIMESTAMP_WITH_LOCAL_TIME_ZONE}
     *     is rejected. The flag is also forwarded to the array, row and map converter factories.
     * @return a converter that tolerates {@code null} input values
     * @throws UnsupportedOperationException for {@code RAW}, for any type root not listed in the
     *     switch, and for {@code TIMESTAMP_WITH_LOCAL_TIME_ZONE} combined with the legacy mapping
     */
    public static RowDataToAvroConverter createConverter(
            LogicalType type, boolean legacyTimestampMapping) {
        final RowDataToAvroConverter typeConverter;
        switch (type.getTypeRoot()) {
            case NULL:
                // The input value is ignored; the result is always null.
                typeConverter =
                        new RowDataToAvroConverter() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Object convert(Schema schema, Object object) {
                                return null;
                            }
                        };
                break;
            case TINYINT:
                // Byte input is widened to an int.
                typeConverter =
                        new RowDataToAvroConverter() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Object convert(Schema schema, Object object) {
                                final Byte tinyValue = (Byte) object;
                                return tinyValue.intValue();
                            }
                        };
                break;
            case SMALLINT:
                // Short input is widened to an int.
                typeConverter =
                        new RowDataToAvroConverter() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Object convert(Schema schema, Object object) {
                                final Short smallValue = (Short) object;
                                return smallValue.intValue();
                            }
                        };
                break;
            case BOOLEAN:
            case INTEGER:
            case INTERVAL_YEAR_MONTH:
            case BIGINT:
            case INTERVAL_DAY_TIME:
            case FLOAT:
            case DOUBLE:
            case TIME_WITHOUT_TIME_ZONE:
            case DATE:
                // Values of these types are handed back exactly as received.
                typeConverter =
                        new RowDataToAvroConverter() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Object convert(Schema schema, Object object) {
                                return object;
                            }
                        };
                break;
            case CHAR:
            case VARCHAR:
                // Character data is re-wrapped as an Avro Utf8 built from its string form.
                typeConverter =
                        new RowDataToAvroConverter() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Object convert(Schema schema, Object object) {
                                final String text = object.toString();
                                return new Utf8(text);
                            }
                        };
                break;
            case BINARY:
            case VARBINARY:
                // The byte array is wrapped (not copied) into a ByteBuffer.
                typeConverter =
                        new RowDataToAvroConverter() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Object convert(Schema schema, Object object) {
                                final byte[] payload = (byte[]) object;
                                return ByteBuffer.wrap(payload);
                            }
                        };
                break;
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                if (legacyTimestampMapping) {
                    // Legacy mapping: epoch milliseconds taken from the timestamp's instant.
                    typeConverter =
                            new RowDataToAvroConverter() {
                                private static final long serialVersionUID = 1L;

                                @Override
                                public Object convert(Schema schema, Object object) {
                                    final TimestampData timestamp = (TimestampData) object;
                                    return timestamp.toInstant().toEpochMilli();
                                }
                            };
                } else {
                    // Non-legacy mapping: the local date-time is read at UTC and then turned
                    // into epoch milliseconds.
                    typeConverter =
                            new RowDataToAvroConverter() {
                                private static final long serialVersionUID = 1L;

                                @Override
                                public Object convert(Schema schema, Object object) {
                                    final TimestampData timestamp = (TimestampData) object;
                                    return timestamp
                                            .toLocalDateTime()
                                            .toInstant(ZoneOffset.UTC)
                                            .toEpochMilli();
                                }
                            };
                }
                break;
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                // This type is only available when the legacy mapping is switched off.
                if (legacyTimestampMapping) {
                    throw new UnsupportedOperationException("Unsupported type: " + type);
                }
                typeConverter =
                        new RowDataToAvroConverter() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Object convert(Schema schema, Object object) {
                                final TimestampData timestamp = (TimestampData) object;
                                return timestamp.toInstant().toEpochMilli();
                            }
                        };
                break;
            case DECIMAL:
                // The decimal's unscaled bytes are wrapped into a ByteBuffer.
                typeConverter =
                        new RowDataToAvroConverter() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Object convert(Schema schema, Object object) {
                                final byte[] unscaled = ((DecimalData) object).toUnscaledBytes();
                                return ByteBuffer.wrap(unscaled);
                            }
                        };
                break;
            case ARRAY:
                typeConverter = createArrayConverter((ArrayType) type, legacyTimestampMapping);
                break;
            case ROW:
                typeConverter = createRowConverter((RowType) type, legacyTimestampMapping);
                break;
            case MAP:
            case MULTISET:
                // Both type roots are served by the same map converter factory.
                typeConverter = createMapConverter(type, legacyTimestampMapping);
                break;
            case RAW:
            default:
                throw new UnsupportedOperationException("Unsupported type: " + type);
        }

        // Shield the type-specific converter with null handling and nullable-union unwrapping.
        return new RowDataToAvroConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Schema schema, Object object) {
                if (object == null) {
                    return null;
                }

                // A non-union schema is passed to the type-specific converter untouched.
                if (schema.getType() != Schema.Type.UNION) {
                    return typeConverter.convert(schema, object);
                }

                // Only a union of exactly two branches, one of them NULL, is accepted; the
                // non-NULL branch becomes the schema used for the actual conversion.
                final List<Schema> branches = schema.getTypes();
                final boolean hasTwoBranches = branches.size() == 2;
                if (hasTwoBranches && branches.get(1).getType() == Schema.Type.NULL) {
                    return typeConverter.convert(branches.get(0), object);
                }
                if (hasTwoBranches && branches.get(0).getType() == Schema.Type.NULL) {
                    return typeConverter.convert(branches.get(1), object);
                }
                throw new IllegalArgumentException(
                        "The Avro schema is not a nullable type: " + schema.toString());
            }
        };
    }