private void closeCurrentNormalSessionWithRetry()

in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkWriter.java [318:350]


  private void closeCurrentNormalSessionWithRetry(int retryLimit) throws IOException {
    String threadName = String.valueOf(Thread.currentThread().getId());
    LOGGER.debug("Thread({}) Enter closeCurrentSessionWithRetry!", threadName);
    if (session == null) {
      return;
    }

    totalBytesByClosedSessions += ((TunnelBufferedWriter) writer).getTotalBytes();
    writer.close();
    LOGGER.debug("Thread({}) writer.close() successfully!", threadName);

    while (true) {
      try {
        session.commit();
        LOGGER.debug("Thread({}) session.commit() successfully!", threadName);
        break;
      } catch (TunnelException e) {
        // TODO: random backoff
        retryLimit -= 1;
        LOGGER.debug(String.format("retryLimit: %d", retryLimit));
        if (retryLimit >= 0) {
          try {
            Thread.sleep(DEFAULT_RETRY_INTERVAL_SECONDS * 1000);
          } catch (InterruptedException ex) {
            LOGGER.warn("Retry sleep is interrupted, retry immediately", ex);
          }
          LOGGER.warn("Failed to commit upload session, retrying", e);
        } else {
          throw new IOException(e);
        }
      }
    }
  }