public Map preCommit()

in src/main/java/org/apache/doris/kafka/connector/DorisSinkTask.java [129:152]


    /**
     * Flushes the sink for the given offsets and reports which offsets may be committed.
     *
     * <p>An empty map is returned (meaning no offset commit is requested) when the sink is
     * missing, when it has no partitions, or when reading the sink's offsets fails.
     *
     * @param offsets the offsets proposed for commit, keyed by topic partition
     * @return the offsets reported by the sink for each partition in {@code offsets}; partitions
     *     whose sink offset is {@code 0} are omitted
     * @throws RetriableException declared so that a failing {@code sink.commit} can be retried
     */
    public Map<TopicPartition, OffsetAndMetadata> preCommit(
            Map<TopicPartition, OffsetAndMetadata> offsets) throws RetriableException {
        // return an empty map means that offset commitment is not desired
        if (sink == null || sink.getPartitionCount() == 0) {
            return new HashMap<>();
        }

        sink.commit(offsets);

        Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
        try {
            for (TopicPartition topicPartition : offsets.keySet()) {
                long offSet = sink.getOffset(topicPartition);
                // An offset of 0 is skipped; presumably nothing has been written for that
                // partition yet -- TODO confirm against the sink implementation.
                if (offSet != 0) {
                    committedOffsets.put(topicPartition, new OffsetAndMetadata(offSet));
                }
            }
        } catch (Exception e) {
            // It's ok to just log the error since commit can retry; skip committing this round.
            LOG.warn("Failed to collect committed offsets, skipping offset commit", e);
            return new HashMap<>();
        }
        LOG.info("Returning committed offsets {}", committedOffsets);
        return committedOffsets;
    }