public static RowDataToAvroConverter createConverter()

in seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/convert/RowDataToAvroConverters.java [68:220]


    public static RowDataToAvroConverter createConverter(SeaTunnelDataType<?> dataType) {
        final RowDataToAvroConverter converter;
        switch (dataType.getSqlType()) {
            case TINYINT:
                converter =
                        new RowDataToAvroConverter() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Object convert(Schema schema, Object object) {
                                return ((Byte) object).intValue();
                            }
                        };
                break;
            case SMALLINT:
                converter =
                        new RowDataToAvroConverter() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Object convert(Schema schema, Object object) {
                                return ((Short) object).intValue();
                            }
                        };
                break;
            case BOOLEAN: // boolean
            case INT: // int
            case BIGINT: // long
            case FLOAT: // float
            case DOUBLE: // double
                converter =
                        new RowDataToAvroConverter() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Object convert(Schema schema, Object object) {
                                return object;
                            }
                        };
                break;
            case TIME: // int
                converter =
                        new RowDataToAvroConverter() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Object convert(Schema schema, Object object) {
                                return ((LocalTime) object).get(ChronoField.MILLI_OF_DAY);
                            }
                        };
                break;
            case DATE: // int
                converter =
                        new RowDataToAvroConverter() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Object convert(Schema schema, Object object) {
                                return ((int) ((LocalDate) object).toEpochDay());
                            }
                        };
                break;
            case STRING:
                converter =
                        new RowDataToAvroConverter() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Object convert(Schema schema, Object object) {
                                return new Utf8(object.toString());
                            }
                        };
                break;
            case BYTES:
                converter =
                        new RowDataToAvroConverter() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Object convert(Schema schema, Object object) {
                                return ByteBuffer.wrap((byte[]) object);
                            }
                        };
                break;
            case TIMESTAMP:
                converter =
                        new RowDataToAvroConverter() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Object convert(Schema schema, Object object) {
                                return ((LocalDateTime) object)
                                        .toInstant(java.time.ZoneOffset.UTC)
                                        .toEpochMilli();
                            }
                        };
                break;
            case DECIMAL:
                converter =
                        new RowDataToAvroConverter() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Object convert(Schema schema, Object object) {
                                BigDecimal javaDecimal = (BigDecimal) object;
                                return DECIMAL_CONVERSION.toFixed(
                                        javaDecimal, schema, schema.getLogicalType());
                            }
                        };
                break;
            case ARRAY:
                converter = createArrayConverter((ArrayType<?, ?>) dataType);
                break;
            case ROW:
                converter = createRowConverter((SeaTunnelRowType) dataType);
                break;
            case MAP:
                converter = createMapConverter(dataType);
                break;
            default:
                throw new UnsupportedOperationException("Unsupported type: " + dataType);
        }

        // wrap into nullable converter
        return new RowDataToAvroConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Schema schema, Object object) {
                if (object == null) {
                    return null;
                }

                // get actual schema if it is a nullable schema
                Schema actualSchema;
                if (schema.getType() == Schema.Type.UNION) {
                    List<Schema> types = schema.getTypes();
                    int size = types.size();
                    if (size == 2 && types.get(1).getType() == Schema.Type.NULL) {
                        actualSchema = types.get(0);
                    } else if (size == 2 && types.get(0).getType() == Schema.Type.NULL) {
                        actualSchema = types.get(1);
                    } else {
                        throw new IllegalArgumentException(
                                "The Avro schema is not a nullable type: " + schema);
                    }
                } else {
                    actualSchema = schema;
                }
                return converter.convert(actualSchema, object);
            }
        };
    }