in flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/RowElasticsearchEmitter.java [105:121]
private void processUpsert(RowData row, RequestIndexer indexer) {
final byte[] document = serializationSchema.serialize(row);
final String key = createKey.apply(row);
if (key != null) {
final UpdateRequest updateRequest =
new UpdateRequest(indexGenerator.generate(row), documentType, key)
.doc(document, contentType)
.upsert(document, contentType);
indexer.add(updateRequest);
} else {
final IndexRequest indexRequest =
new IndexRequest(indexGenerator.generate(row), documentType)
.id(key)
.source(document, contentType);
indexer.add(indexRequest);
}
}