in src/main/java/org/apache/rocketmq/dashboard/service/impl/MessageServiceImpl.java [208:234]
public ConsumeMessageDirectlyResult consumeMessageDirectly(String topic, String msgId, String consumerGroup,
String clientId) {
if (StringUtils.isNotBlank(clientId)) {
try {
return mqAdminExt.consumeMessageDirectly(consumerGroup, clientId, topic, msgId);
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
}
try {
ConsumerConnection consumerConnection = mqAdminExt.examineConsumerConnectionInfo(consumerGroup);
for (Connection connection : consumerConnection.getConnectionSet()) {
if (StringUtils.isBlank(connection.getClientId())) {
continue;
}
logger.info("clientId={}", connection.getClientId());
return mqAdminExt.consumeMessageDirectly(consumerGroup, connection.getClientId(), topic, msgId);
}
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new RuntimeException(e);
}
throw new IllegalStateException("NO CONSUMER");
}