in pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java [129:162]
/**
 * Drains the buffered records and submits them to MongoDB as one batch.
 *
 * <p>The pending buffer ({@code incomingList}) is swapped for a fresh list while
 * holding this object's monitor; parsing and the insert call happen outside the
 * lock on the drained list.
 *
 * <p>Each record's payload is decoded as UTF-8 and parsed into a {@link Document}.
 * A record is failed via {@code record.fail()} and removed from the batch when its
 * payload is {@code null} or when parsing throws {@link JsonParseException} or
 * {@link BSONException}. One bad record therefore never prevents the remaining
 * records of the batch from being submitted.
 *
 * <p>If at least one document was parsed, the documents are passed to
 * {@code collection.insertMany} and a {@code DocsToInsertSubscriber} built from the
 * document list and the surviving record list is subscribed to the result.
 */
private void flush() {
    final List<Record<byte[]>> recordsToInsert;
    synchronized (this) {
        if (incomingList.isEmpty()) {
            return;
        }
        // Take ownership of the current buffer and leave an empty one behind.
        recordsToInsert = incomingList;
        incomingList = Lists.newArrayList();
    }

    final List<Document> docsToInsert = new ArrayList<>(recordsToInsert.size());
    final Iterator<Record<byte[]>> iter = recordsToInsert.iterator();
    while (iter.hasNext()) {
        final Record<byte[]> record = iter.next();
        try {
            final byte[] docAsBytes = record.getValue();
            if (docAsBytes == null) {
                // A null payload cannot be decoded; without this guard the
                // String constructor throws NullPointerException and the whole
                // drained batch is lost, because the buffer was already swapped.
                log.error("Bad message: record has no payload");
                record.fail();
                iter.remove();
                continue;
            }
            final Document doc = Document.parse(new String(docAsBytes, StandardCharsets.UTF_8));
            docsToInsert.add(doc);
        } catch (JsonParseException | BSONException e) {
            log.error("Bad message", e);
            record.fail();
            // Dropping the record keeps recordsToInsert the same length and order
            // as docsToInsert. NOTE(review): presumably DocsToInsertSubscriber
            // pairs the two lists by position; confirm against that class.
            iter.remove();
        }
    }

    if (!docsToInsert.isEmpty()) {
        collection.insertMany(docsToInsert)
                .subscribe(new DocsToInsertSubscriber(docsToInsert, recordsToInsert));
    }
}