public Boolean call()

in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkWriter.java [524:562]


  public Boolean call() throws RuntimeException {
    long time = System.currentTimeMillis() / 1000;
    long start = -1;
    long end = -1;
    processedRecordsEachEcho = 0;
    boolean ok = true;
    try {
      for (SinkRecord record : recordBuffer) {
        write(record, time);
        if (start == -1) {
          start = record.kafkaOffset();
        }
        end = Math.max(end, record.kafkaOffset());
        processedRecordsEachEcho++;
      }
    } catch (IOException e) {
      // tunnel 的波动引起 , 会不断重试
      LOGGER.warn("something error in tunnel write,Please check tunnel environment! {}",
                  e.getMessage());
      ok = false;
    } catch (RuntimeException e) {
      // 数据内部错误,且用户选择不跳过,直接抛给上层框架
      LOGGER.error("something error in MaxComputerSinkWriter : " + e.getMessage());
      throw new RuntimeException(e);
    }
    try {
      flush();
      close();
      if (start != -1) {
        sinkStatusContext.addOffsetInterval(start, end);
        sinkStatusContext.addTotalBytesSentByWriter(getTotalBytes());
      }
    } catch (IOException e) {
      LOGGER.warn("something error in tunnel close,Please check tunnel environment! {}",
                  e.getMessage());
      ok = false;
    }
    return ok;
  }