public void resetOffset()

in adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/trigger/TriggerTaskContext.java [84:111]


    /**
     * Records the given offsets into {@code messageQueuesOffsetMap}, keyed by the
     * {@link MessageQueue} described by each partition.
     *
     * <p>For every entry, the partition map is read for {@code BROKER_NAME}, {@code TOPIC} and
     * {@code QUEUE_ID}, and the offset map is read for {@code QUEUE_OFFSET}. An entry is skipped
     * (with a warning) rather than aborting the whole reset when any of these is missing, empty,
     * or - for the numeric fields - not parseable as a number.
     *
     * @param offsets partition-to-offset mapping to apply; a null or empty map is a no-op
     */
    public void resetOffset(Map<RecordPartition, RecordOffset> offsets) {
        if (MapUtils.isEmpty(offsets)) {
            logger.warn("resetOffset, offsets {} is null", offsets);
            return;
        }
        for (Map.Entry<RecordPartition, RecordOffset> entry : offsets.entrySet()) {
            if (null == entry || null == entry.getKey() || null == entry.getKey().getPartition() || null == entry.getValue() || null == entry.getValue().getOffset()) {
                // Single placeholder: the entry itself may be null here, so its parts cannot be logged separately.
                logger.warn("recordPartition info is null or recordOffset info is null, entry {}", entry);
                continue;
            }
            RecordPartition recordPartition = entry.getKey();
            String brokerName = (String) recordPartition.getPartition().get(BROKER_NAME);
            String topic = (String) recordPartition.getPartition().get(TOPIC);
            // Parse defensively: Integer.valueOf(null) throws NumberFormatException, which would
            // otherwise abort the loop before the null check below could ever run.
            Object rawQueueId = recordPartition.getPartition().get(QUEUE_ID);
            Integer queueId = parseIntegerOrNull(rawQueueId);
            if (StringUtils.isEmpty(brokerName) || StringUtils.isEmpty(topic) || null == queueId) {
                logger.warn("brokerName is null or topic is null or queueId is invalid, brokerName {}, topic {}, queueId {}", brokerName, topic, rawQueueId);
                continue;
            }
            MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueId);
            RecordOffset recordOffset = entry.getValue();
            Object rawOffset = recordOffset.getOffset().get(QUEUE_OFFSET);
            Long offset = parseLongOrNull(rawOffset);
            if (null == offset) {
                logger.warn("resetOffset, offset {} is null or not a number, messageQueue {}", rawOffset, messageQueue);
                continue;
            }
            messageQueuesOffsetMap.put(messageQueue, offset);
        }
    }

    /** Returns the integer value of {@code raw}'s string form, or null if {@code raw} is null or not a valid integer. */
    private static Integer parseIntegerOrNull(Object raw) {
        if (null == raw) {
            return null;
        }
        try {
            return Integer.valueOf(raw.toString());
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /** Returns the long value of {@code raw}'s string form, or null if {@code raw} is null or not a valid long. */
    private static Long parseLongOrNull(Object raw) {
        if (null == raw) {
            return null;
        }
        try {
            return Long.valueOf(raw.toString());
        } catch (NumberFormatException e) {
            return null;
        }
    }