in flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java [93:107]
/**
 * Serializes {@code row} and queues a write request for it on {@code indexer}.
 *
 * <p>If {@code createKey} yields a non-null key, an {@link UpdateRequest} is queued for
 * that key; otherwise an {@link IndexRequest} is queued with the (null) key passed through.
 *
 * @param row the row to serialize and write
 * @param indexer the indexer that receives the resulting request
 */
private void processUpsert(RowData row, RequestIndexer indexer) {
    final byte[] payload = serializationSchema.serialize(row);
    final String docId = createKey.apply(row);
    // Both request kinds target the same generated index, so resolve it once up front.
    final String targetIndex = indexGenerator.generate(row);

    if (docId == null) {
        // No key could be derived for this row: fall back to a plain index request.
        // NOTE(review): presumably the null id leaves id assignment to Elasticsearch — confirm.
        final IndexRequest insert =
                requestFactory.createIndexRequest(
                        targetIndex, docType, docId, contentType, payload);
        indexer.add(insert);
        return;
    }

    final UpdateRequest upsert =
            requestFactory.createUpdateRequest(
                    targetIndex, docType, docId, contentType, payload);
    indexer.add(upsert);
}