in rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java [249:314]
private void commitOffsets(long now, boolean closing, Set<MessageQueue> messageQueues) {
log.trace("Start commit offsets {}", messageQueues);
Map<MessageQueue, Long> offsetsToCommit = currentOffsets.entrySet()
.stream()
.filter(e -> messageQueues.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
if (offsetsToCommit.isEmpty()) {
return;
}
committing = true;
commitSeqno += 1;
commitStarted = now;
Map<MessageQueue, Long> lastCommittedQueuesOffsets = this.lastCommittedOffsets.entrySet()
.stream()
.filter(e -> offsetsToCommit.containsKey(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Map<MessageQueue, Long> taskProvidedOffsets = new ConcurrentHashMap<>();
Map<RecordPartition, RecordOffset> taskProvidedRecordOffsets = new ConcurrentHashMap<>();
try {
log.info(" Call task.preCommit reset offset : {}", offsetsToCommit);
Map<RecordPartition, RecordOffset> recordOffsetsToCommit = new ConcurrentHashMap<>();
for (Map.Entry<MessageQueue, Long> messageQueueOffset : offsetsToCommit.entrySet()) {
RecordPartition recordPartition = ConnectUtil.convertToRecordPartition(messageQueueOffset.getKey());
RecordOffset recordOffset = ConnectUtil.convertToRecordOffset(messageQueueOffset.getValue());
recordOffsetsToCommit.put(recordPartition, recordOffset);
}
// pre commit
taskProvidedRecordOffsets = sinkTask.preCommit(recordOffsetsToCommit);
// task provided commit offset
for (Map.Entry<RecordPartition, RecordOffset> entry : taskProvidedRecordOffsets.entrySet()) {
taskProvidedOffsets.put(ConnectUtil.convertToMessageQueue(entry.getKey()), ConnectUtil.convertToOffset(entry.getValue()));
}
} catch (Throwable t) {
if (closing) {
log.warn(" {} Offset commit failed {}", this);
} else {
log.error("{} Offset commit failed, reset to last committed offsets", this, t);
for (Map.Entry<MessageQueue, Long> entry : lastCommittedQueuesOffsets.entrySet()) {
try {
consumer.seek(entry.getKey(), entry.getValue());
} catch (MQClientException e) {
}
}
currentOffsets.putAll(lastCommittedQueuesOffsets);
}
onCommitCompleted(t, commitSeqno, null);
return;
} finally {
if (closing) {
log.trace("{} Closing the task before committing the offsets: {}", this, offsetsToCommit);
}
}
if (taskProvidedOffsets.isEmpty()) {
log.debug("{} Skipping offset commit, task opted-out by returning no offsets from preCommit", this);
onCommitCompleted(null, commitSeqno, null);
return;
}
compareAndCommit(offsetsToCommit, lastCommittedQueuesOffsets, taskProvidedOffsets);
}