in flink-connector-opensearch/src/main/java/org/apache/flink/connector/opensearch/table/IndexGeneratorFactory.java [165:198]
/**
 * Builds the formatter that renders a time-related index field value as a string using a
 * caller-supplied {@code DateTimeFormatter}.
 *
 * <p>The value handed to the returned formatter is cast according to the type root:
 *
 * <ul>
 *   <li>{@code DATE}: an {@code Integer} interpreted as an epoch-day count.
 *   <li>{@code TIME_WITHOUT_TIME_ZONE}: an {@code Integer} scaled by 1,000,000 to obtain the
 *       nano-of-day (i.e. the integer is treated as milliseconds of the day).
 *   <li>{@code TIMESTAMP_WITHOUT_TIME_ZONE}: a {@code TimestampData} converted to a local
 *       date-time.
 *   <li>{@code TIMESTAMP_WITH_LOCAL_TIME_ZONE}: a {@code TimestampData} converted to an instant
 *       and rendered at the UTC offset.
 * </ul>
 *
 * @param indexFieldType logical type of the index field; only used in the error message
 * @param indexFieldLogicalTypeRoot type root that selects the formatter
 * @return the formatter matching the given type root
 * @throws UnsupportedOperationException if the type root is {@code TIMESTAMP_WITH_TIME_ZONE}
 * @throws TableException if the type root is not one of the time-related roots listed above
 */
private static DynamicFormatter createFormatFunction(
        LogicalType indexFieldType, LogicalTypeRoot indexFieldLogicalTypeRoot) {
    switch (indexFieldLogicalTypeRoot) {
        case DATE:
            // The Integer unboxes to int and widens to the long epoch-day argument.
            return (fieldValue, formatter) ->
                    LocalDate.ofEpochDay((Integer) fieldValue).format(formatter);
        case TIME_WITHOUT_TIME_ZONE:
            return (fieldValue, formatter) -> {
                // The long literal forces 64-bit arithmetic for the millis -> nanos scaling.
                long nanoOfDay = ((Integer) fieldValue) * 1_000_000L;
                return LocalTime.ofNanoOfDay(nanoOfDay).format(formatter);
            };
        case TIMESTAMP_WITHOUT_TIME_ZONE:
            return (fieldValue, formatter) ->
                    ((TimestampData) fieldValue).toLocalDateTime().format(formatter);
        case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
            return (fieldValue, formatter) -> {
                TimestampData timestamp = (TimestampData) fieldValue;
                // Rendered at the UTC offset rather than any session or system zone.
                return timestamp.toInstant().atZone(ZoneOffset.UTC).format(formatter);
            };
        case TIMESTAMP_WITH_TIME_ZONE:
            throw new UnsupportedOperationException(
                    "TIMESTAMP_WITH_TIME_ZONE is not supported yet");
        default:
            final String message =
                    String.format(
                            "Unsupported type '%s' found in Opensearch dynamic index field, "
                                    + "time-related pattern only support types are: DATE,TIME,TIMESTAMP.",
                            indexFieldType);
            throw new TableException(message);
    }
}