in adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/trigger/TriggerTaskContext.java [84:111]
public void resetOffset(Map<RecordPartition, RecordOffset> offsets) {
if (MapUtils.isEmpty(offsets)) {
logger.warn("resetOffset, offsets {} is null", offsets);
return;
}
for (Map.Entry<RecordPartition, RecordOffset> entry : offsets.entrySet()) {
if (null == entry || null == entry.getKey() || null == entry.getKey().getPartition() || null == entry.getValue() || null == entry.getValue().getOffset()) {
logger.warn("recordPartition {} info is null or recordOffset {} info is null, entry {}", entry);
continue;
}
RecordPartition recordPartition = entry.getKey();
String brokerName = (String) recordPartition.getPartition().get(BROKER_NAME);
String topic = (String) recordPartition.getPartition().get(TOPIC);
Integer queueId = Integer.valueOf((String) recordPartition.getPartition().get(QUEUE_ID));
if (StringUtils.isEmpty(brokerName) || StringUtils.isEmpty(topic) || null == queueId) {
logger.warn("brokerName is null or queueId is null or queueName is null, brokerName {}, queueId {} queueId {}", brokerName, queueId, topic);
continue;
}
MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueId);
RecordOffset recordOffset = entry.getValue();
Long offset = Long.valueOf((String) recordOffset.getOffset().get(QUEUE_OFFSET));
if (null == offset) {
logger.warn("resetOffset, offset is null");
continue;
}
messageQueuesOffsetMap.put(messageQueue, offset);
}
}