in flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java [211:243]
public synchronized void flush() throws IOException {
    // Writes the pending batch via attemptFlush(), retrying on SQLException.
    //
    // Up to executionOptions.getMaxRetries() + 1 attempts are made. After a failed
    // attempt that is not the last one, the connection is checked and, if it is no
    // longer valid, updateExecutor(true) is called before retrying. The thread then
    // sleeps 1000 ms * attempt index, so the first retry happens without delay.
    //
    // Throws IOException when:
    //   - the last attempt fails (cause: the SQLException),
    //   - re-validating / re-establishing the connection fails,
    //   - the thread is interrupted while waiting for the next attempt
    //     (interrupt flag is restored; cause: the SQLException of the failed attempt,
    //     with the InterruptedException attached as suppressed).
    //
    // NOTE(review): checkFlushException() presumably rethrows a failure recorded by
    // an earlier flush — confirm against its definition.
    checkFlushException();

    // Read once so the loop bound and the "last attempt" check always agree.
    final int maxRetries = executionOptions.getMaxRetries();

    for (int attempt = 0; attempt <= maxRetries; attempt++) {
        try {
            attemptFlush();
            // Only reset the counter once the batch has actually been written.
            batchCount = 0;
            return;
        } catch (SQLException e) {
            LOG.error("JDBC executeBatch error, retry times = {}", attempt, e);
            if (attempt >= maxRetries) {
                // Retries exhausted: surface the last SQL failure to the caller.
                throw new IOException(e);
            }
            try {
                if (!connectionProvider.isConnectionValid()) {
                    updateExecutor(true);
                }
            } catch (Exception exception) {
                LOG.error(
                        "JDBC connection is not valid, and reestablish connection failed.",
                        exception);
                throw new IOException("Reestablish JDBC connection failed", exception);
            }
            try {
                // Linear back-off. Computed in long so a large retry index cannot
                // overflow int and hand Thread.sleep a negative duration.
                Thread.sleep(1000L * attempt);
            } catch (InterruptedException ex) {
                // Restore the interrupt flag for callers further up the stack.
                Thread.currentThread().interrupt();
                IOException interrupted =
                        new IOException(
                                "unable to flush; interrupted while doing another attempt", e);
                // Keep the SQL failure as the cause, but do not lose the interrupt itself.
                interrupted.addSuppressed(ex);
                throw interrupted;
            }
        }
    }
}