in flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java [175:238]
/**
 * Creates a converter that maps a value of the given logical type to the object this class
 * uses for it internally (for example {@code DecimalData}, {@code TimestampData},
 * {@code StringData}, or an epoch-day {@code int} for dates).
 *
 * <p>Apart from the {@code NULL} type, the returned converters cast or dereference the
 * incoming value without a null check.
 * NOTE(review): presumably callers wrap the result in a null-tolerant converter - confirm.
 *
 * @param type logical type of the field to convert
 * @return a converter for values of {@code type}
 * @throws UnsupportedOperationException if the type root of {@code type} is not supported
 */
public DeserializationConverter createInternalConverter(LogicalType type) {
    switch (type.getTypeRoot()) {
        case NULL:
            return val -> null;
        case BOOLEAN:
        case FLOAT:
        case DOUBLE:
        case INTERVAL_YEAR_MONTH:
        case INTERVAL_DAY_TIME:
        case TINYINT:
        case SMALLINT:
        case INTEGER:
        case BIGINT:
            // These values are passed through unchanged.
            return val -> val;
        case DECIMAL:
            final int precision = ((DecimalType) type).getPrecision();
            final int scale = ((DecimalType) type).getScale();
            return val -> DecimalData.fromBigDecimal((BigDecimal) val, precision, scale);
        case TIMESTAMP_WITH_TIME_ZONE:
        case TIMESTAMP_WITHOUT_TIME_ZONE:
        case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
            return val -> {
                if (val instanceof LocalDateTime) {
                    return TimestampData.fromLocalDateTime((LocalDateTime) val);
                } else if (val instanceof Timestamp) {
                    return TimestampData.fromTimestamp((Timestamp) val);
                } else {
                    throw new UnsupportedOperationException(
                            "timestamp type must be java.time.LocalDateTime or java.sql.Timestamp, the actual type is: "
                                    + val.getClass().getName());
                }
            };
        case DATE:
            // Dates are converted to the number of days since the epoch, as an int.
            return val -> {
                if (val instanceof LocalDate) {
                    // doris source
                    return (int) ((LocalDate) val).toEpochDay();
                } else if (val instanceof Date) {
                    // doris lookup
                    return (int) ((Date) val).toLocalDate().toEpochDay();
                } else {
                    throw new UnsupportedOperationException(
                            "date type must be java.time.LocalDate or java.sql.Date, the actual type is: "
                                    + val.getClass().getName());
                }
            };
        case CHAR:
        case VARCHAR:
            return val -> StringData.fromString(val.toString());
        case ARRAY:
            return val -> convertArrayData(((List<?>) val).toArray(), type);
        case ROW:
            return val -> convertRowData((Map<String, ?>) val, type);
        case MAP:
            return val -> convertMapData((Map) val, type);
        // TIME/BINARY/VARBINARY must not fall through into ARRAY: their values are not
        // Lists, so the array converter would fail with a ClassCastException on the first
        // non-null value. Reject them up front like the other unsupported types.
        case TIME_WITHOUT_TIME_ZONE:
        case BINARY:
        case VARBINARY:
        case MULTISET:
        case RAW:
        default:
            throw new UnsupportedOperationException("Unsupported type:" + type);
    }
}