public Map preCommit()

in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkTask.java [302:355]


  /**
   * Determines which consumer offsets are safe to commit for this task.
   *
   * <p>In multi-write mode this first flushes the buffered records, then blocks until every
   * queued writer task has completed. A writer task that reports {@code false} requests a
   * synchronous commit via {@code needSyncCommit}. Each offset supplied by the framework is then
   * replaced with the position after the last record consumed locally (tracked in
   * {@code sinkStatus}). Partitions without a usable local offset are removed from the map so
   * they are not committed.
   *
   * <p>In both modes {@code flush(currentOffsets)} is invoked before the offsets are returned.
   *
   * @param currentOffsets offsets proposed by the framework; modified in place and returned
   * @return the offsets that may be committed
   * @throws RuntimeException if waiting for a writer task is interrupted or the task failed
   */
  public Map<TopicPartition, OffsetAndMetadata> preCommit(
      Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
    if (multiWriteMode) {
      flushRecordBuffer();
      if (writerTasks.isEmpty()) {
        LOGGER.info("no data written to tunnel!");
      }
      // Tracked locally so the per-partition bookkeeping below cannot clear the
      // sync-commit request raised by a failed writer.
      boolean writerFailed = false;
      while (!writerTasks.isEmpty()) {
        try {
          boolean result = writerTasks.poll().get();
          if (!result) {
            writerFailed = true;
            needSyncCommit = true;
          }
        } catch (InterruptedException | ExecutionException e) {
          if (e instanceof InterruptedException) {
            // Restore the interrupt status so the calling thread can still observe it.
            Thread.currentThread().interrupt();
          }
          LOGGER.error("unrecoverable error happens: {} ,the task will exit! ", e.getMessage(), e);
          throw new RuntimeException(e);
        }
      }
      if (LOGGER.isDebugEnabled()) {
        totalBytesSentByClosedWriters = 0;
        sinkStatus.forEach(
            (pt, cxt) -> totalBytesSentByClosedWriters += cxt.getTotalBytesSentByWriter());
        LOGGER.debug("Total bytes written by multi-writer :{}", totalBytesSentByClosedWriters);
      }
      // Compare the locally tracked offsets with currentOffsets to tell the framework which
      // partitions, and up to which position, have been consumed and can be committed.
      Set<TopicPartition> errorPartitions = new HashSet<>();
      boolean sawPartitionStatus = false;
      boolean pendingIntervalOffsets = false;
      for (Entry<TopicPartition, OffsetAndMetadata> entry : currentOffsets.entrySet()) {
        TopicPartition partition = entry.getKey();
        SinkStatusContext curStatus = sinkStatus.get(partition);
        if (curStatus == null) {
          errorPartitions.add(partition);
          LOGGER.warn("no partition exist in innerOffsets");
          continue;
        }
        curStatus.mergeOffset();
        sawPartitionStatus = true;
        // Offsets still pending after the merge must be committed right after the next put.
        // Accumulate across partitions instead of letting the last partition decide.
        pendingIntervalOffsets |= !curStatus.intervalOffsetEmpty();
        long curOffset = curStatus.getConsumedOffsets();
        if (curOffset != -1) {
          // Commit the position after the last consumed record, keeping the original metadata.
          entry.setValue(new OffsetAndMetadata(curOffset + 1, entry.getValue().metadata()));
          LOGGER.info("partiton:{} consumed offset: {}", partition, curOffset);
        } else {
          // An empty partition may be assigned to this task without ever receiving data; the
          // local offset tracking is only created once data arrives, so skip it for now.
          errorPartitions.add(partition);
          LOGGER.warn("something error in consumedOffset partition: {}", partition);
        }
      }
      errorPartitions.forEach(currentOffsets::remove);
      // A failed writer has already forced needSyncCommit to true; otherwise derive it from
      // every partition's pending state. With no known partitions, keep the previous value.
      if (!writerFailed && sawPartitionStatus) {
        needSyncCommit = pendingIntervalOffsets;
      }
    }
    flush(currentOffsets);
    return currentOffsets;
  }