in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkTask.java [302:355]
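// Called by the Kafka Connect framework before offsets are committed; the map
// returned here is what the framework actually commits back to Kafka, so this
// method trims it down to the offsets the sink has durably written.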
public Map<TopicPartition, OffsetAndMetadata> preCommit(
    Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
  if (multiWriteMode) {
    flushRecordBuffer();
    if (writerTasks.isEmpty()) {
      LOGGER.info("no data written to tunnel!");
    }
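    // Drain every asynchronous writer future. A false result means that flush
    // could not be confirmed, so needSyncCommit forces another commit attempt
    // right after the next put().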
    while (!writerTasks.isEmpty()) {
      try {
        boolean result = writerTasks.poll().get();
        if (!result) {
          needSyncCommit = true;
        }
      } catch (InterruptedException | ExecutionException e) {
        LOGGER.error("unrecoverable error happened: {}, the task will exit!", e.getMessage());
        throw new RuntimeException(e);
      }
    }
    if (LOGGER.isDebugEnabled()) {
      totalBytesSentByClosedWriters = 0;
      sinkStatus.forEach(
          (pt, cxt) -> totalBytesSentByClosedWriters += cxt.getTotalBytesSentByWriter());
      LOGGER.debug("Total bytes written by multi-writer: {}", totalBytesSentByClosedWriters);
    }
    // Compare the locally tracked offsets against currentOffsets to report,
    // per partition, how far consumption has progressed and can be committed.
    Set<TopicPartition> errorPartitions = new HashSet<>();
    for (Entry<TopicPartition, OffsetAndMetadata> entry : currentOffsets.entrySet()) {
      TopicPartition partition = entry.getKey();
      OffsetAndMetadata offsetAndMetadata = entry.getValue();
      SinkStatusContext curStatus = sinkStatus.get(partition);
      if (curStatus != null) {
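        // mergeOffset() presumably coalesces the acknowledged offset ranges so
        // that getConsumedOffsets() below reports the highest contiguous offset
        // that is safe to commit.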
        curStatus.mergeOffset();
        // If uncommitted offset ranges remain, commit again right after the next put().
        needSyncCommit = !curStatus.intervalOffsetEmpty();
        long curOffset = curStatus.getConsumedOffsets();
        if (curOffset != -1) {
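          // Kafka commits the offset of the next record to consume, hence the
          // + 1 on the last consumed offset.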
          currentOffsets.put(partition,
              new OffsetAndMetadata(curOffset + 1, offsetAndMetadata.metadata()));
          LOGGER.info("partition: {} consumed offset: {}", partition, curOffset);
        } else {
          // An empty partition may be assigned to this task and receive no
          // data; innerOffsets only creates an entry for a partition once data
          // arrives, so a missing consumed offset can simply mean an empty partition.
          errorPartitions.add(partition);
          LOGGER.warn("no consumed offset available for partition: {}", partition);
        }
      } else {
        errorPartitions.add(partition);
        LOGGER.warn("partition {} does not exist in innerOffsets", partition);
      }
    }
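    // Remove partitions with no trackable progress so no offset is committed
    // for them this round.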
    errorPartitions.forEach(currentOffsets::remove);
  }
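  // Runs in both modes; flush() presumably pushes any remaining buffered data
  // before the adjusted offsets are handed back for commit.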
  flush(currentOffsets);
  return currentOffsets;
}