in flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java [138:166]
/**
 * Prepares this output format for writing.
 *
 * <p>Obtains a connection from {@code connectionProvider}, creates and opens the statement
 * executor, and, when both the batch interval is non-zero and the batch size is not 1,
 * starts a single-threaded scheduler that calls {@link #flush()} with a fixed delay equal to
 * the configured batch interval.
 *
 * @param taskNumber not read by this method
 * @param numTasks not read by this method
 * @throws IOException if obtaining the connection fails (the original exception is kept as
 *     the cause)
 */
public void open(int taskNumber, int numTasks) throws IOException {
    try {
        connectionProvider.getOrEstablishConnection();
    } catch (Exception connectFailure) {
        throw new IOException("unable to open JDBC writer", connectFailure);
    }

    jdbcStatementExecutor = createAndOpenStatementExecutor(statementExecutorFactory);

    // No background flushing when the interval is 0 or the batch size is 1.
    if (executionOptions.getBatchIntervalMs() == 0 || executionOptions.getBatchSize() == 1) {
        return;
    }

    this.scheduler =
            Executors.newScheduledThreadPool(
                    1, new ExecutorThreadFactory("jdbc-upsert-output-format"));

    // The task takes this instance's monitor before touching `closed` or flushing.
    final Runnable periodicFlush =
            () -> {
                synchronized (JdbcOutputFormat.this) {
                    if (closed) {
                        return;
                    }
                    try {
                        flush();
                    } catch (Exception flushFailure) {
                        // Recorded instead of thrown so the failure does not escape the
                        // scheduled task; each new failure replaces the previous one.
                        flushException = flushFailure;
                    }
                }
            };

    this.scheduledFuture =
            this.scheduler.scheduleWithFixedDelay(
                    periodicFlush,
                    executionOptions.getBatchIntervalMs(),
                    executionOptions.getBatchIntervalMs(),
                    TimeUnit.MILLISECONDS);
}