in adapter/runtime/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/boot/trigger/TriggerTaskContext.java [114:138]
/**
 * Marks the message queue described by each given record partition as
 * {@code QueueState.PAUSE} in {@code messageQueuesStateMap}.
 *
 * <p>Each partition is expected to carry a broker name, a topic and a queue id under the
 * {@code BROKER_NAME}, {@code TOPIC} and {@code QUEUE_ID} keys. A partition is skipped with a
 * warning (and the remaining partitions are still processed) when:
 * <ul>
 *   <li>the partition or its attribute map is {@code null};</li>
 *   <li>the broker name or topic is empty, or the queue id is absent;</li>
 *   <li>the queue id cannot be parsed as an integer;</li>
 *   <li>the resulting message queue is not a key of {@code messageQueuesOffsetMap}.</li>
 * </ul>
 *
 * @param recordPartitions partitions whose queues should be marked as paused; a {@code null}
 *                         or empty list only produces a warning
 */
public void pause(List<RecordPartition> recordPartitions) {
    if (recordPartitions == null || recordPartitions.isEmpty()) {
        logger.warn("recordPartitions is null or recordPartitions.size() is zero. recordPartitions {}", JSON.toJSONString(recordPartitions));
        return;
    }
    for (RecordPartition recordPartition : recordPartitions) {
        if (null == recordPartition || null == recordPartition.getPartition()) {
            logger.warn("recordPartition {} info is null", recordPartition);
            continue;
        }
        String brokerName = (String) recordPartition.getPartition().get(BROKER_NAME);
        String topic = (String) recordPartition.getPartition().get(TOPIC);
        // Read the raw value first: parsing a missing queue id directly would throw and
        // abort the whole batch instead of skipping just this partition.
        Object rawQueueId = recordPartition.getPartition().get(QUEUE_ID);
        if (StringUtils.isEmpty(brokerName) || StringUtils.isEmpty(topic) || null == rawQueueId) {
            logger.warn("brokerName is empty or topic is empty or queueId is null, brokerName {}, topic {}, queueId {}", brokerName, topic, rawQueueId);
            continue;
        }
        int queueId;
        try {
            // String.valueOf tolerates the id being stored either as text or as a number.
            queueId = Integer.parseInt(String.valueOf(rawQueueId));
        } catch (NumberFormatException e) {
            logger.warn("queueId {} is not a valid integer, brokerName {}, topic {}", rawQueueId, brokerName, topic);
            continue;
        }
        MessageQueue messageQueue = new MessageQueue(topic, brokerName, queueId);
        // Only queues already tracked in the offset map may change state.
        if (!messageQueuesOffsetMap.containsKey(messageQueue)) {
            logger.warn("sink task current messageQueuesOffsetMap {} not contain messageQueue {}", messageQueuesOffsetMap, messageQueue);
            continue;
        }
        messageQueuesStateMap.put(messageQueue, QueueState.PAUSE);
    }
}