private void commitOffsets()

in rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java [249:314]


    /**
     * Commits the tracked offsets of the given message queues, giving the sink task a chance
     * to adjust them through {@code preCommit} first.
     *
     * <p>Flow:
     * <ol>
     *   <li>Select the entries of {@code currentOffsets} whose queue is in {@code messageQueues};
     *       if none match, return without touching any commit state.</li>
     *   <li>Mark a commit as in progress ({@code committing}, {@code commitSeqno},
     *       {@code commitStarted}).</li>
     *   <li>Convert the offsets to record partitions/offsets, call {@code sinkTask.preCommit},
     *       and convert the returned offsets back to message-queue offsets.</li>
     *   <li>If anything in step 3 throws: when not closing, seek the consumer back to the last
     *       committed offsets and reset {@code currentOffsets} for those queues; in both cases
     *       report the failure through {@code onCommitCompleted} and return.</li>
     *   <li>If the task returned no offsets, complete the commit with no error and return;
     *       otherwise delegate to {@code compareAndCommit}.</li>
     * </ol>
     *
     * @param now           value recorded in {@code commitStarted} (presumably the current
     *                      timestamp -- confirm against callers)
     * @param closing       whether the task is being closed; when {@code true} a preCommit
     *                      failure is only logged and no rewind is attempted
     * @param messageQueues the queues whose offsets should be committed
     */
    private void commitOffsets(long now, boolean closing, Set<MessageQueue> messageQueues) {
        log.trace("Start commit offsets {}", messageQueues);

        Map<MessageQueue, Long> offsetsToCommit = currentOffsets.entrySet()
                .stream()
                .filter(e -> messageQueues.contains(e.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        if (offsetsToCommit.isEmpty()) {
            return;
        }

        committing = true;
        commitSeqno += 1;
        commitStarted = now;

        // Only the last committed offsets of the queues being committed are relevant for a rewind.
        Map<MessageQueue, Long> lastCommittedQueuesOffsets = this.lastCommittedOffsets.entrySet()
                .stream()
                .filter(e -> offsetsToCommit.containsKey(e.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        Map<MessageQueue, Long> taskProvidedOffsets = new ConcurrentHashMap<>();
        try {
            log.info(" Call task.preCommit reset offset : {}", offsetsToCommit);
            Map<RecordPartition, RecordOffset> recordOffsetsToCommit = new ConcurrentHashMap<>();
            for (Map.Entry<MessageQueue, Long> messageQueueOffset : offsetsToCommit.entrySet()) {
                RecordPartition recordPartition = ConnectUtil.convertToRecordPartition(messageQueueOffset.getKey());
                RecordOffset recordOffset = ConnectUtil.convertToRecordOffset(messageQueueOffset.getValue());
                recordOffsetsToCommit.put(recordPartition, recordOffset);
            }

            // pre commit
            // NOTE: a null return from preCommit fails in the loop below with a
            // NullPointerException and is therefore handled as a commit failure by the catch.
            Map<RecordPartition, RecordOffset> taskProvidedRecordOffsets = sinkTask.preCommit(recordOffsetsToCommit);
            // task provided commit offset
            for (Map.Entry<RecordPartition, RecordOffset> entry : taskProvidedRecordOffsets.entrySet()) {
                taskProvidedOffsets.put(ConnectUtil.convertToMessageQueue(entry.getKey()), ConnectUtil.convertToOffset(entry.getValue()));
            }

        } catch (Throwable t) {
            if (closing) {
                // Pass the throwable as the trailing argument so the cause is not lost.
                log.warn(" {} Offset commit failed", this, t);
            } else {
                log.error("{} Offset commit failed, reset to last committed offsets", this, t);
                for (Map.Entry<MessageQueue, Long> entry : lastCommittedQueuesOffsets.entrySet()) {
                    try {
                        consumer.seek(entry.getKey(), entry.getValue());
                    } catch (MQClientException e) {
                        // Best effort: keep rewinding the remaining queues, but do not hide the failure.
                        log.error("{} Failed to seek {} back to last committed offset {}",
                                this, entry.getKey(), entry.getValue(), e);
                    }
                }
                currentOffsets.putAll(lastCommittedQueuesOffsets);
            }
            onCommitCompleted(t, commitSeqno, null);
            return;
        } finally {
            if (closing) {
                log.trace("{} Closing the task before committing the offsets: {}", this, offsetsToCommit);
            }
        }
        if (taskProvidedOffsets.isEmpty()) {
            log.debug("{} Skipping offset commit, task opted-out by returning no offsets from preCommit", this);
            onCommitCompleted(null, commitSeqno, null);
            return;
        }
        compareAndCommit(offsetsToCommit, lastCommittedQueuesOffsets, taskProvidedOffsets);
    }