in flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java [118:130]
/**
 * Derives the changelog mode this sink accepts from the mode requested by the planner.
 *
 * <p>Every row kind contained in {@code requestedMode} is accepted except
 * {@link RowKind#UPDATE_BEFORE}, which is filtered out of the returned mode.
 *
 * @param requestedMode the changelog mode requested for this sink
 * @return a changelog mode containing all requested row kinds other than
 *     {@code UPDATE_BEFORE}
 * @throws ValidationException if the sink uses a dynamic index based on system time and the
 *     requested mode contains anything other than {@code INSERT}
 */
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
    // Validate first: a system-time based dynamic index is only allowed for insert-only input.
    final boolean insertOnly = requestedMode.containsOnly(RowKind.INSERT);
    if (isDynamicIndexWithSystemTime && !insertOnly) {
        throw new ValidationException(
                "Dynamic indexing based on system time only works on append only stream.");
    }

    final ChangelogMode.Builder acceptedKinds = ChangelogMode.newBuilder();
    for (RowKind requestedKind : requestedMode.getContainedKinds()) {
        if (requestedKind == RowKind.UPDATE_BEFORE) {
            // UPDATE_BEFORE is never part of the accepted mode.
            continue;
        }
        acceptedKinds.addContainedKind(requestedKind);
    }
    return acceptedKinds.build();
}