in flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/KeyExtractor.java [112:129]
/**
 * Builds a formatter that renders the field at position {@code index} of a row as a string.
 *
 * <p>Temporal and interval types are converted through their {@code java.time} counterparts
 * so that the text form is the ISO-8601 style {@code toString()} of those classes. Distinct
 * types are unwrapped to their source type. Every other type is read through a
 * {@link RowData.FieldGetter} and rendered with {@code toString()}.
 *
 * @param index position of the field inside the row
 * @param type logical type of the field at {@code index}
 * @return a formatter producing the string form of that field
 */
private static FieldFormatter toFormatter(int index, LogicalType type) {
    switch (type.getTypeRoot()) {
        case DATE:
            // The int is interpreted as the number of days since the epoch.
            return (row) -> LocalDate.ofEpochDay(row.getInt(index)).toString();
        case TIME_WITHOUT_TIME_ZONE:
            // The int is scaled from milliseconds to nanoseconds of the day; the cast to long
            // happens before the multiplication so the product cannot overflow an int.
            return (row) ->
                    LocalTime.ofNanoOfDay((long) row.getInt(index) * 1_000_000L).toString();
        case INTERVAL_YEAR_MONTH:
            // A year-month interval is carried as an int number of months, so it has to be
            // turned into a Period of months. Period.ofDays would render e.g. 14 months as
            // "P14D" instead of "P14M".
            return (row) -> Period.ofMonths(row.getInt(index)).toString();
        case INTERVAL_DAY_TIME:
            // The long is interpreted as a number of milliseconds.
            return (row) -> Duration.ofMillis(row.getLong(index)).toString();
        case DISTINCT_TYPE:
            // Format a distinct type exactly like the type it is defined on.
            return toFormatter(index, ((DistinctType) type).getSourceType());
        default:
            // Create the getter once, outside the lambda, so it is reused for every row.
            RowData.FieldGetter fieldGetter = RowData.createFieldGetter(type, index);
            // A null field value raises a NullPointerException on toString() here.
            return (row) -> fieldGetter.getFieldOrNull(row).toString();
    }
}