in flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/sink/MapElasticsearchEmitter.java [65:80]
/**
 * Turns a document into an Elasticsearch request and passes it to the given indexer.
 *
 * <p>If {@code idFieldName} is set, the value stored under that key in {@code doc} is used
 * (via {@code toString()}) as the document id, and an {@code UpdateRequest} carrying the
 * document both as the partial update and as the upsert body is added. Otherwise an
 * {@code IndexRequest} with the document as its source is added and no explicit id is
 * supplied.
 *
 * @param doc the document to write; also passed to {@code indexProvider} to resolve the index
 * @param context sink writer context; not used by this method
 * @param indexer receives the request built for {@code doc}
 * @throws NullPointerException if {@code idFieldName} is set but {@code doc} has no non-null
 *     value for that key
 */
public void emit(Map<String, Object> doc, SinkWriter.Context context, RequestIndexer indexer) {
    if (idFieldName != null) {
        // Resolve the index first so the evaluation order matches the original constructor call.
        final String index = indexProvider.apply(doc);

        // Fail with a descriptive message instead of a bare NPE from calling toString() on null.
        final Object idValue = doc.get(idFieldName);
        if (idValue == null) {
            throw new NullPointerException(
                    "Document is missing a non-null value for the id field '"
                            + idFieldName
                            + "'.");
        }

        final UpdateRequest updateRequest =
                new UpdateRequest(index, documentType, idValue.toString())
                        .doc(doc)
                        .upsert(doc);
        indexer.add(updateRequest);
    } else {
        final IndexRequest indexRequest =
                new IndexRequest(indexProvider.apply(doc), documentType).source(doc);
        indexer.add(indexRequest);
    }
}