private FieldConverter createConverterForField()

in fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/PojoToRowConverter.java [194:304]


    /**
     * Builds the converter that reads {@code field} from a POJO instance and maps its value to
     * the representation selected by the type root of {@code fieldType}.
     *
     * <p>The field is made accessible up front. Each returned converter passes {@code null}
     * field values through as {@code null}. For the temporal and decimal type roots, a value of
     * an unexpected Java class is reported with a warning and converted to {@code null} rather
     * than raising an error. A type root that has no dedicated branch is reported once, when the
     * converter is created, and yields a converter that always returns {@code null}.
     *
     * @param fieldType the declared data type of the target column
     * @param field the reflective handle of the POJO field to read
     * @return a converter that extracts and converts the value of {@code field}
     */
    private FieldConverter createConverterForField(DataType fieldType, Field field) {
        field.setAccessible(true);

        switch (fieldType.getTypeRoot()) {
            case BOOLEAN:
            case TINYINT:
            case SMALLINT:
            case INTEGER:
            case BIGINT:
            case FLOAT:
            case DOUBLE:
            case BINARY:
            case BYTES:
                // These values are handed over exactly as read from the field.
                return pojo -> field.get(pojo);
            case CHAR:
            case STRING:
                return pojo -> {
                    Object raw = field.get(pojo);
                    if (raw == null) {
                        return null;
                    }
                    // Any non-null value is rendered through toString() before wrapping.
                    return BinaryString.fromString(raw.toString());
                };
            case DECIMAL:
                return pojo -> {
                    Object raw = field.get(pojo);
                    if (raw == null) {
                        return null;
                    }
                    if (!(raw instanceof BigDecimal)) {
                        LOG.warn(
                                "Field {} is not a BigDecimal. Cannot convert to DecimalData.",
                                field.getName());
                        return null;
                    }
                    DecimalType targetType = (DecimalType) fieldType;
                    int precision = targetType.getPrecision();
                    int scale = targetType.getScale();
                    return Decimal.fromBigDecimal((BigDecimal) raw, precision, scale);
                };
            case DATE:
                return pojo -> {
                    Object raw = field.get(pojo);
                    if (raw == null) {
                        return null;
                    }
                    if (!(raw instanceof LocalDate)) {
                        LOG.warn(
                                "Field {} is not a LocalDate. Cannot convert to int days.",
                                field.getName());
                        return null;
                    }
                    // Days since the epoch, narrowed to int.
                    long epochDay = ((LocalDate) raw).toEpochDay();
                    return (int) epochDay;
                };
            case TIME_WITHOUT_TIME_ZONE:
                return pojo -> {
                    Object raw = field.get(pojo);
                    if (raw == null) {
                        return null;
                    }
                    if (!(raw instanceof LocalTime)) {
                        LOG.warn(
                                "Field {} is not a LocalTime. Cannot convert to int millis.",
                                field.getName());
                        return null;
                    }
                    // Nanoseconds of the day truncated to whole milliseconds.
                    long millisOfDay = ((LocalTime) raw).toNanoOfDay() / 1_000_000;
                    return (int) millisOfDay;
                };
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                return pojo -> {
                    Object raw = field.get(pojo);
                    if (raw == null) {
                        return null;
                    }
                    if (!(raw instanceof LocalDateTime)) {
                        LOG.warn(
                                "Field {} is not a LocalDateTime. Cannot convert to TimestampData.",
                                field.getName());
                        return null;
                    }
                    return TimestampNtz.fromLocalDateTime((LocalDateTime) raw);
                };
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                return pojo -> {
                    Object raw = field.get(pojo);
                    if (raw == null) {
                        return null;
                    }
                    if (raw instanceof Instant) {
                        return TimestampLtz.fromInstant((Instant) raw);
                    }
                    if (raw instanceof OffsetDateTime) {
                        Instant instant = ((OffsetDateTime) raw).toInstant();
                        return TimestampLtz.fromInstant(instant);
                    }
                    LOG.warn(
                            "Field {} is not an Instant or OffsetDateTime. Cannot convert to TimestampData.",
                            field.getName());
                    return null;
                };
            default:
                break;
        }

        // No dedicated branch for this type root: warn once and always produce null.
        LOG.warn(
                "Unsupported type {} for field {}. Will use null for it.",
                fieldType.getTypeRoot(),
                field.getName());
        return pojo -> null;
    }