private void flush()

in pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java [129:162]


    private void flush() {
        final List<Document> docsToInsert = new ArrayList<>();
        final List<Record<byte[]>> recordsToInsert;

        synchronized (this) {
            if (incomingList.isEmpty()) {
                return;
            }

            recordsToInsert = incomingList;
            incomingList = Lists.newArrayList();
        }

        final Iterator<Record<byte[]>> iter = recordsToInsert.iterator();

        while (iter.hasNext()) {
            final Record<byte[]> record = iter.next();

            try {
                final byte[] docAsBytes = record.getValue();
                final Document doc = Document.parse(new String(docAsBytes, StandardCharsets.UTF_8));
                docsToInsert.add(doc);
            }
            catch (JsonParseException | BSONException e) {
                log.error("Bad message", e);
                record.fail();
                iter.remove();
            }
        }

        if (docsToInsert.size() > 0) {
            collection.insertMany(docsToInsert).subscribe(new DocsToInsertSubscriber(docsToInsert,recordsToInsert));
        }
    }