in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkWriter.java [271:303]
/**
 * Flushes the current stream pack, retrying when the flush fails with an
 * {@link IOException}.
 *
 * <p>Returns immediately if no stream pack exists yet. Otherwise the flush is attempted until
 * it succeeds or the number of failed attempts reaches {@code retryLimit}; between attempts
 * the thread sleeps for {@code DEFAULT_RETRY_INTERVAL_SECONDS} seconds. At least one attempt
 * is always made, even when {@code retryLimit} is zero or negative.
 *
 * <p>When the attempts are exhausted, {@code streamPack} is replaced with the result of
 * {@code recreateRecordPack()} and the last flush failure is rethrown. On a successful flush
 * {@code minOffset} is reset to {@code null}.
 *
 * <p>An interrupt received while sleeping does not abort the retries (the next attempt simply
 * starts right away), but the thread's interrupt status is restored before this method exits
 * so callers can still observe it.
 *
 * @param retryLimit maximum number of failed flush attempts tolerated before giving up
 * @throws IOException if the flush still fails after {@code retryLimit} attempts, or if the
 *     stream pack cannot be recreated afterwards (in that case the last flush failure is
 *     attached as a suppressed exception)
 */
private void flushStreamPackWithRetry(int retryLimit) throws IOException {
  if (streamPack == null) {
    // init condition: there is no pack to flush yet
    return;
  }
  int retried = 0;
  // Remember an interrupt instead of losing it; it is restored in the finally block below.
  boolean interrupted = false;
  try {
    while (true) {
      try {
        streamPack.flush();
        break;
      } catch (IOException ex) {
        retried++;
        if (retried >= retryLimit) {
          // Give up right away: sleeping before failing would only delay the error.
          LOGGER.error("Failed to flush streaming pack after specified retries.", ex);
          try {
            streamPack = recreateRecordPack();
          } catch (TunnelException e) {
            IOException failure =
                new IOException("Failed to recreate stream pack on failed flushes.", e);
            // Keep the original flush failure visible alongside the recreate failure.
            failure.addSuppressed(ex);
            throw failure;
          }
          throw ex;
        }
        LOGGER.warn(
            "Failed to flush streaming pack, retrying after " + DEFAULT_RETRY_INTERVAL_SECONDS
            + "s", ex);
        try {
          // 1000L forces long arithmetic so the millisecond value cannot overflow an int.
          Thread.sleep(DEFAULT_RETRY_INTERVAL_SECONDS * 1000L);
        } catch (InterruptedException e) {
          // Thread.sleep cleared the interrupt status; note it and retry immediately.
          interrupted = true;
          LOGGER.warn("Retry sleep is interrupted, retry immediately", e);
        }
      }
    }
  } finally {
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
  minOffset = null; // flush good
}