in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkWriter.java [318:350]
private void closeCurrentNormalSessionWithRetry(int retryLimit) throws IOException {
String threadName = String.valueOf(Thread.currentThread().getId());
LOGGER.debug("Thread({}) Enter closeCurrentSessionWithRetry!", threadName);
if (session == null) {
return;
}
totalBytesByClosedSessions += ((TunnelBufferedWriter) writer).getTotalBytes();
writer.close();
LOGGER.debug("Thread({}) writer.close() successfully!", threadName);
while (true) {
try {
session.commit();
LOGGER.debug("Thread({}) session.commit() successfully!", threadName);
break;
} catch (TunnelException e) {
// TODO: random backoff
retryLimit -= 1;
LOGGER.debug(String.format("retryLimit: %d", retryLimit));
if (retryLimit >= 0) {
try {
Thread.sleep(DEFAULT_RETRY_INTERVAL_SECONDS * 1000);
} catch (InterruptedException ex) {
LOGGER.warn("Retry sleep is interrupted, retry immediately", ex);
}
LOGGER.warn("Failed to commit upload session, retrying", e);
} else {
throw new IOException(e);
}
}
}
}