in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/sink/OutputFormatSinkFunction.java [84:109]
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
long startSyncing = System.currentTimeMillis();
// Retry until successful
while (true) {
try {
((Syncable) outputFormat).sync();
((Syncable) outputFormat).waitFinish();
break;
} catch (IOException e) {
LOG.error("Sync output format failed", e);
try {
Thread.sleep(RETRY_INTERVAL);
} catch (InterruptedException e1) {
//throw new RuntimeException(e1);
}
}
long retryingTimeCost = System.currentTimeMillis() - startSyncing;
if (retryingTimeCost > retryTimeout) {
throw new IOException(
String.format(
"Retry time exceed timeout Error: %s, %s",
retryingTimeCost, retryTimeout));
}
}
}