in data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/OpenSearchSink.java [114:143]
public void doOutput(final Collection<Record<Object>> records) {
if (records.isEmpty()) {
return;
}
BulkRequest bulkRequest = bulkRequestSupplier.get();
for (final Record<Object> record : records) {
final String document = getDocument(record.getData());
final IndexRequest indexRequest = new IndexRequest().source(document, XContentType.JSON);
try {
final Map<String, Object> source = getMapFromJson(document);
final String docId = (String) source.get(documentIdField);
if (docId != null) {
indexRequest.id(docId);
}
final long estimatedBytesBeforeAdd = bulkRequest.estimatedSizeInBytes() + calcEstimatedSizeInBytes(indexRequest);
if (bulkSize >= 0 && estimatedBytesBeforeAdd >= bulkSize && bulkRequest.numberOfActions() > 0) {
flushBatch(bulkRequest);
bulkRequest = bulkRequestSupplier.get();
}
bulkRequest.add(indexRequest);
} catch (final IOException e) {
throw new RuntimeException(e.getMessage(), e);
}
}
// Flush the remaining requests
if (bulkRequest.numberOfActions() > 0) {
flushBatch(bulkRequest);
}
}