in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/sink/writer/MongoWriter.java [88:146]
public MongoWriter(
        MongoConnectionOptions connectionOptions,
        MongoWriteOptions writeOptions,
        boolean flushOnCheckpoint,
        Sink.InitContext initContext,
        MongoSerializationSchema<IN> serializationSchema) {
    this.connectionOptions = checkNotNull(connectionOptions);
    this.writeOptions = checkNotNull(writeOptions);
    this.serializationSchema = checkNotNull(serializationSchema);
    this.flushOnCheckpoint = flushOnCheckpoint;
    this.batchIntervalMs = writeOptions.getBatchIntervalMs();
    this.batchSize = writeOptions.getBatchSize();

    checkNotNull(initContext);
    this.mailboxExecutor = checkNotNull(initContext.getMailboxExecutor());

    SinkWriterMetricGroup metricGroup = checkNotNull(initContext.metricGroup());
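    // Expose the latency of the most recent bulk write as the "current send time"
    // metric: the gap between sending the request (lastSendTime) and its
    // acknowledgement (ackTime).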
    metricGroup.setCurrentSendTimeGauge(() -> ackTime - lastSendTime);
    this.numRecordsOut = metricGroup.getNumRecordsSendCounter();
    this.collector = new ListCollector<>(this.bulkRequests);

    // Initialize the serialization schema.
    this.sinkContext = new DefaultMongoSinkContext(initContext, writeOptions);
    try {
        SerializationSchema.InitializationContext initializationContext =
                initContext.asSerializationSchemaInitializationContext();
        serializationSchema.open(initializationContext, sinkContext, writeOptions);
    } catch (Exception e) {
        throw new FlinkRuntimeException("Failed to open the MongoEmitter", e);
    }

    // Initialize the mongo client.
    this.mongoClient = MongoClients.create(connectionOptions.getUri());

    boolean flushOnlyOnCheckpoint = batchIntervalMs == -1 && batchSize == -1;
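    // Start a background flush timer only when a positive batch interval is configured;
    // with both batchIntervalMs and batchSize set to -1, buffered writes are flushed
    // only on checkpoint.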
    if (!flushOnlyOnCheckpoint && batchIntervalMs > 0) {
        this.scheduler =
                Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("mongo-writer"));
        this.scheduledFuture =
                this.scheduler.scheduleWithFixedDelay(
                        () -> {
                            synchronized (MongoWriter.this) {
                                if (!closed && isOverMaxBatchIntervalLimit()) {
                                    try {
                                        doBulkWrite();
                                    } catch (Exception e) {
                                        flushException = e;
                                    }
                                }
                            }
                        },
                        batchIntervalMs,
                        batchIntervalMs,
                        TimeUnit.MILLISECONDS);
    }
}
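
The timer is armed only for a positive batch interval. A minimal, self-contained sketch of that decision (illustrative only, not part of the connector; the class and method names are hypothetical):

// Mirrors the constructor's scheduler condition: batchIntervalMs == -1 together with
// batchSize == -1 means "flush only on checkpoint", and a flush timer is created only
// when a positive interval is configured.
final class FlushPolicySketch {
    static boolean schedulesPeriodicFlush(long batchIntervalMs, int batchSize) {
        boolean flushOnlyOnCheckpoint = batchIntervalMs == -1 && batchSize == -1;
        return !flushOnlyOnCheckpoint && batchIntervalMs > 0;
    }

    public static void main(String[] args) {
        System.out.println(schedulesPeriodicFlush(-1, -1));     // false: flush only on checkpoint
        System.out.println(schedulesPeriodicFlush(1000, 1000)); // true: timer flushes every second
        System.out.println(schedulesPeriodicFlush(-1, 1000));   // false: size-based flushing only
    }
}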