in fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/PojoToRowConverter.java [194:304]
/**
 * Builds the converter that reads {@code field} from a POJO instance and maps the value to the
 * representation chosen for {@code fieldType}'s type root.
 *
 * <p>Every converter returns {@code null} when the field value is {@code null}. For the
 * decimal, date, time and timestamp type roots, a value whose runtime class is not the expected
 * Java type is reported with a WARN log and converted to {@code null} instead of failing. A type
 * root without a dedicated case is reported with a WARN log when the converter is created, and
 * the resulting converter always returns {@code null}.
 *
 * @param fieldType the data type the field value has to be converted for
 * @param field the reflective handle of the POJO field; it is made accessible as a side effect
 * @return a converter that extracts and converts the field value of a given POJO instance
 */
private FieldConverter createConverterForField(DataType fieldType, Field field) {
    field.setAccessible(true);
    // Resolved once so the converters below can report mismatches by name.
    final String fieldName = field.getName();

    switch (fieldType.getTypeRoot()) {
        case BOOLEAN:
        case TINYINT:
        case SMALLINT:
        case INTEGER:
        case BIGINT:
        case FLOAT:
        case DOUBLE:
        case BINARY:
        case BYTES:
            // The field value is handed over unchanged for these type roots.
            return field::get;

        case CHAR:
        case STRING:
            // Any non-null value is rendered through toString().
            return obj -> {
                final Object raw = field.get(obj);
                if (raw == null) {
                    return null;
                }
                return BinaryString.fromString(raw.toString());
            };

        case DECIMAL:
            return obj -> {
                final Object raw = field.get(obj);
                if (raw == null) {
                    return null;
                }
                if (!(raw instanceof BigDecimal)) {
                    LOG.warn(
                            "Field {} is not a BigDecimal. Cannot convert to DecimalData.",
                            fieldName);
                    return null;
                }
                // Precision and scale are taken from the declared decimal type.
                final DecimalType decimalType = (DecimalType) fieldType;
                return Decimal.fromBigDecimal(
                        (BigDecimal) raw, decimalType.getPrecision(), decimalType.getScale());
            };

        case DATE:
            return obj -> {
                final Object raw = field.get(obj);
                if (raw == null) {
                    return null;
                }
                if (!(raw instanceof LocalDate)) {
                    LOG.warn(
                            "Field {} is not a LocalDate. Cannot convert to int days.",
                            fieldName);
                    return null;
                }
                // Days since epoch, narrowed to int.
                final long epochDay = ((LocalDate) raw).toEpochDay();
                return (int) epochDay;
            };

        case TIME_WITHOUT_TIME_ZONE:
            return obj -> {
                final Object raw = field.get(obj);
                if (raw == null) {
                    return null;
                }
                if (!(raw instanceof LocalTime)) {
                    LOG.warn(
                            "Field {} is not a LocalTime. Cannot convert to int millis.",
                            fieldName);
                    return null;
                }
                // Milliseconds of the day; sub-millisecond nanos are truncated.
                final long nanoOfDay = ((LocalTime) raw).toNanoOfDay();
                return (int) (nanoOfDay / 1_000_000);
            };

        case TIMESTAMP_WITHOUT_TIME_ZONE:
            return obj -> {
                final Object raw = field.get(obj);
                if (raw == null) {
                    return null;
                }
                if (!(raw instanceof LocalDateTime)) {
                    LOG.warn(
                            "Field {} is not a LocalDateTime. Cannot convert to TimestampData.",
                            fieldName);
                    return null;
                }
                return TimestampNtz.fromLocalDateTime((LocalDateTime) raw);
            };

        case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
            return obj -> {
                final Object raw = field.get(obj);
                if (raw == null) {
                    return null;
                }
                // Both accepted Java types are reduced to an Instant first.
                final Instant instant;
                if (raw instanceof Instant) {
                    instant = (Instant) raw;
                } else if (raw instanceof OffsetDateTime) {
                    instant = ((OffsetDateTime) raw).toInstant();
                } else {
                    LOG.warn(
                            "Field {} is not an Instant or OffsetDateTime. Cannot convert to TimestampData.",
                            fieldName);
                    return null;
                }
                return TimestampLtz.fromInstant(instant);
            };

        default:
            // Reported once here, at converter creation, not on every conversion.
            LOG.warn(
                    "Unsupported type {} for field {}. Will use null for it.",
                    fieldType.getTypeRoot(),
                    fieldName);
            return obj -> null;
    }
}