in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java [202:235]
/**
 * Sends all buffered bulk requests to MongoDB in a single {@code bulkWrite} call, retrying
 * when the driver throws a {@link MongoException}.
 *
 * <p>Returns immediately when no requests are buffered. Otherwise at most
 * {@code maxRetries + 1} attempts are made. After the k-th failed attempt (1-based) the
 * calling thread sleeps for {@code retryIntervalMs * k} milliseconds before trying again.
 * {@code lastSendTime} is refreshed at the start of every attempt; on success
 * {@code ackTime} is updated and the request buffer is cleared.
 *
 * @throws IOException if the last permitted attempt fails (the cause is the
 *     {@link MongoException}), or if the thread is interrupted while waiting between
 *     attempts (the cause is the last {@link MongoException}; the
 *     {@link InterruptedException} is attached as a suppressed exception and the thread's
 *     interrupt flag is restored)
 */
void doBulkWrite() throws IOException {
    if (bulkRequests.isEmpty()) {
        // no records to write
        return;
    }
    int maxRetries = writeOptions.getMaxRetries();
    long retryIntervalMs = writeOptions.getRetryIntervalMs();
    for (int i = 0; i <= maxRetries; i++) {
        try {
            lastSendTime = System.currentTimeMillis();
            mongoClient
                    .getDatabase(connectionOptions.getDatabase())
                    .getCollection(connectionOptions.getCollection(), BsonDocument.class)
                    .bulkWrite(bulkRequests);
            ackTime = System.currentTimeMillis();
            // Only drop the buffered requests once the write has gone through.
            bulkRequests.clear();
            break;
        } catch (MongoException e) {
            LOG.debug("Bulk Write to MongoDB failed, retry times = {}", i, e);
            if (i >= maxRetries) {
                // Retry budget exhausted: surface the failure to the caller.
                LOG.error("Bulk Write to MongoDB failed", e);
                throw new IOException(e);
            }
            try {
                // Wait time grows linearly with the number of failed attempts.
                Thread.sleep(retryIntervalMs * (i + 1));
            } catch (InterruptedException ex) {
                // Restore the interrupt flag so code further up the stack can observe it.
                Thread.currentThread().interrupt();
                IOException interrupted =
                        new IOException(
                                "Unable to flush; interrupted while doing another attempt", e);
                // Keep the write failure as the cause, but do not lose the interrupt itself.
                interrupted.addSuppressed(ex);
                throw interrupted;
            }
        }
    }
}