in tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java [102:273]
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
if (commandLine.hasOption('n')) {
defaultMQAdminExt.setNamesrvAddr(commandLine.getOptionValue('n').trim());
}
try {
defaultMQAdminExt.start();
boolean showClientIP = commandLine.hasOption('s')
&& "true".equalsIgnoreCase(commandLine.getOptionValue('s'));
String clusterName = commandLine.hasOption('c') ? commandLine.getOptionValue('c').trim() : null;
if (commandLine.hasOption('g')) {
String consumerGroup = commandLine.getOptionValue('g').trim();
String topicName = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : null;
ConsumeStats consumeStats;
if (topicName == null) {
consumeStats = defaultMQAdminExt.examineConsumeStats(consumerGroup);
} else {
consumeStats = defaultMQAdminExt.examineConsumeStats(clusterName, consumerGroup, topicName);
}
List<MessageQueue> mqList = new LinkedList<>(consumeStats.getOffsetTable().keySet());
Collections.sort(mqList);
Map<MessageQueue, String> messageQueueAllocationResult = null;
if (showClientIP) {
messageQueueAllocationResult = getMessageQueueAllocationResult(defaultMQAdminExt, consumerGroup);
}
if (showClientIP) {
System.out.printf("%-64s %-32s %-4s %-20s %-20s %-20s %-20s %-20s%s%n",
"#Topic",
"#Broker Name",
"#QID",
"#Broker Offset",
"#Consumer Offset",
"#Client IP",
"#Diff",
"#Inflight",
"#LastTime");
} else {
System.out.printf("%-64s %-32s %-4s %-20s %-20s %-20s %-20s%s%n",
"#Topic",
"#Broker Name",
"#QID",
"#Broker Offset",
"#Consumer Offset",
"#Diff",
"#Inflight",
"#LastTime");
}
long diffTotal = 0L;
long inflightTotal = 0L;
for (MessageQueue mq : mqList) {
OffsetWrapper offsetWrapper = consumeStats.getOffsetTable().get(mq);
long diff = offsetWrapper.getBrokerOffset() - offsetWrapper.getConsumerOffset();
long inflight = offsetWrapper.getPullOffset() - offsetWrapper.getConsumerOffset();
diffTotal += diff;
inflightTotal += inflight;
String lastTime = "";
try {
if (offsetWrapper.getLastTimestamp() == 0) {
lastTime = "N/A";
} else {
lastTime = UtilAll.formatDate(new Date(offsetWrapper.getLastTimestamp()), UtilAll.YYYY_MM_DD_HH_MM_SS);
}
} catch (Exception e) {
// ignore
}
String clientIP = null;
if (showClientIP) {
clientIP = messageQueueAllocationResult.get(mq);
}
if (showClientIP) {
System.out.printf("%-64s %-32s %-4d %-20d %-20d %-20s %-20d %-20d %s%n",
UtilAll.frontStringAtLeast(mq.getTopic(), 64),
UtilAll.frontStringAtLeast(mq.getBrokerName(), 32),
mq.getQueueId(),
offsetWrapper.getBrokerOffset(),
offsetWrapper.getConsumerOffset(),
null != clientIP ? clientIP : "N/A",
diff,
inflight,
lastTime
);
} else {
System.out.printf("%-64s %-32s %-4d %-20d %-20d %-20d %-20d %s%n",
UtilAll.frontStringAtLeast(mq.getTopic(), 64),
UtilAll.frontStringAtLeast(mq.getBrokerName(), 32),
mq.getQueueId(),
offsetWrapper.getBrokerOffset(),
offsetWrapper.getConsumerOffset(),
diff,
inflight,
lastTime
);
}
}
System.out.printf("%n");
System.out.printf("Consume TPS: %.2f%n", consumeStats.getConsumeTps());
System.out.printf("Consume Diff Total: %d%n", diffTotal);
System.out.printf("Consume Inflight Total: %d%n", inflightTotal);
} else {
System.out.printf("%-64s %-6s %-24s %-5s %-14s %-7s %s%n",
"#Group",
"#Count",
"#Version",
"#Type",
"#Model",
"#TPS",
"#Diff Total"
);
TopicList topicList = defaultMQAdminExt.fetchAllTopicList();
for (String topic : topicList.getTopicList()) {
if (topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String consumerGroup = KeyBuilder.parseGroup(topic);
try {
ConsumeStats consumeStats = null;
try {
consumeStats = defaultMQAdminExt.examineConsumeStats(consumerGroup);
} catch (Exception e) {
log.warn("examineConsumeStats exception, " + consumerGroup, e);
}
ConsumerConnection cc = null;
try {
cc = defaultMQAdminExt.examineConsumerConnectionInfo(consumerGroup);
} catch (Exception e) {
log.warn("examineConsumerConnectionInfo exception, " + consumerGroup, e);
}
GroupConsumeInfo groupConsumeInfo = new GroupConsumeInfo();
groupConsumeInfo.setGroup(consumerGroup);
if (consumeStats != null) {
groupConsumeInfo.setConsumeTps((int) consumeStats.getConsumeTps());
groupConsumeInfo.setDiffTotal(consumeStats.computeTotalDiff());
}
if (cc != null) {
groupConsumeInfo.setCount(cc.getConnectionSet().size());
groupConsumeInfo.setMessageModel(cc.getMessageModel());
groupConsumeInfo.setConsumeType(cc.getConsumeType());
groupConsumeInfo.setVersion(cc.computeMinVersion());
}
System.out.printf("%-64s %-6d %-24s %-5s %-14s %-7d %d%n",
UtilAll.frontStringAtLeast(groupConsumeInfo.getGroup(), 64),
groupConsumeInfo.getCount(),
groupConsumeInfo.getCount() > 0 ? groupConsumeInfo.versionDesc() : "OFFLINE",
groupConsumeInfo.consumeTypeDesc(),
groupConsumeInfo.messageModelDesc(),
groupConsumeInfo.getConsumeTps(),
groupConsumeInfo.getDiffTotal()
);
} catch (Exception e) {
log.warn("examineConsumeStats or examineConsumerConnectionInfo exception, " + consumerGroup, e);
}
}
}
}
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
}