in flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSink.java [105:117]
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
    // Returns a changelog mode holding every row kind of the requested mode
    // except UPDATE_BEFORE. Throws a ValidationException when the sink is
    // flagged as using a system-time based dynamic index and the requested
    // mode is anything other than insert-only.

    // Validate first: there is no point assembling a mode we will reject.
    if (isDynamicIndexWithSystemTime) {
        final boolean appendOnly = requestedMode.containsOnly(RowKind.INSERT);
        if (!appendOnly) {
            throw new ValidationException(
                    "Dynamic indexing based on system time only works on append only stream.");
        }
    }

    final ChangelogMode.Builder acceptedKinds = ChangelogMode.newBuilder();
    for (RowKind requestedKind : requestedMode.getContainedKinds()) {
        // NOTE(review): UPDATE_BEFORE is dropped, presumably because the sink
        // does not need the retraction half of an update - confirm.
        if (requestedKind == RowKind.UPDATE_BEFORE) {
            continue;
        }
        acceptedKinds.addContainedKind(requestedKind);
    }
    return acceptedKinds.build();
}