private void flushStreamPackWithRetry()

in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkWriter.java [271:303]


  /**
   * Flushes the current stream pack, retrying on {@link IOException}.
   *
   * <p>Does nothing when {@code streamPack} is {@code null}. Otherwise {@code flush()} is
   * attempted until it succeeds or the number of failed attempts reaches {@code retryLimit},
   * waiting {@code DEFAULT_RETRY_INTERVAL_SECONDS} seconds between attempts. At least one
   * attempt is always made, even when {@code retryLimit} is zero or negative.
   *
   * <p>When the attempts are exhausted, {@code streamPack} is replaced with the result of
   * {@code recreateRecordPack()} and the last flush failure is rethrown. On a successful
   * flush, {@code minOffset} is reset to {@code null}.
   *
   * <p>If the thread is interrupted while waiting, the retry proceeds without finishing the
   * wait and the thread's interrupt status is restored before this method returns or throws.
   *
   * @param retryLimit number of failed flush attempts after which the method gives up
   * @throws IOException the last flush failure once attempts are exhausted; or, if recreating
   *     the stream pack also fails, a new {@code IOException} caused by the
   *     {@code TunnelException} with the last flush failure attached as suppressed
   */
  private void flushStreamPackWithRetry(int retryLimit) throws IOException {
    if (streamPack == null) {
      // init condition: nothing has been written yet, so there is nothing to flush
      return;
    }
    // Remember an interrupt instead of swallowing it; the flag is restored on exit so that
    // callers can still observe the interruption.
    boolean interrupted = false;
    int failedAttempts = 0;
    try {
      while (true) {
        try {
          streamPack.flush();
          break;
        } catch (IOException ex) {
          failedAttempts++;
          if (failedAttempts >= retryLimit) {
            // Out of attempts: give up right away instead of sleeping once more.
            LOGGER.error("Failed to flush streaming pack after specified retries.", ex);
            try {
              streamPack = recreateRecordPack();
            } catch (TunnelException e) {
              IOException recreateFailure =
                  new IOException("Failed to recreate stream pack on failed flushes.", e);
              // Keep the flush failure that triggered the recreation in the exception chain.
              recreateFailure.addSuppressed(ex);
              throw recreateFailure;
            }
            throw ex;
          }
          LOGGER.warn(
              "Failed to flush streaming pack, retrying after " + DEFAULT_RETRY_INTERVAL_SECONDS
              + "s", ex);
          try {
            // 1000L forces long arithmetic for the seconds-to-milliseconds conversion.
            Thread.sleep(DEFAULT_RETRY_INTERVAL_SECONDS * 1000L);
          } catch (InterruptedException e) {
            interrupted = true;
            LOGGER.warn("Retry sleep is interrupted, retry immediately", e);
          }
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
    minOffset = null; // flush good
  }