public void execute()

in tools/src/main/java/org/apache/rocketmq/tools/command/consumer/ConsumerProgressSubCommand.java [102:273]


    /**
     * Prints consumer progress to standard output.
     *
     * <p>Command-line options read by this method:
     * <ul>
     *   <li>{@code n} - name server address, applied to the admin client when present;</li>
     *   <li>{@code g} - consumer group; when present a per-queue table for that group is printed,
     *       otherwise a one-line summary is printed for every group that has a retry topic;</li>
     *   <li>{@code t} - topic; when present together with {@code g}, stats are restricted to it;</li>
     *   <li>{@code c} - cluster name; only forwarded when {@code t} is also given;</li>
     *   <li>{@code s} - when its value equals {@code "true"} (case-insensitive), a client IP column
     *       is added to the per-queue table.</li>
     * </ul>
     *
     * @param commandLine parsed command line holding the options above
     * @param options     not used by this method
     * @param rpcHook     hook handed to the {@code DefaultMQAdminExt} created for this invocation
     * @throws SubCommandException wrapping any exception raised while starting the admin client or
     *                             while collecting and printing the progress information
     */
    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
        DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
        defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));

        if (commandLine.hasOption('n')) {
            defaultMQAdminExt.setNamesrvAddr(commandLine.getOptionValue('n').trim());
        }

        try {
            defaultMQAdminExt.start();

            boolean showClientIP = commandLine.hasOption('s')
                && "true".equalsIgnoreCase(commandLine.getOptionValue('s'));

            String clusterName = commandLine.hasOption('c') ? commandLine.getOptionValue('c').trim() : null;

            if (commandLine.hasOption('g')) {
                String consumerGroup = commandLine.getOptionValue('g').trim();
                String topicName = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : null;
                printGroupProgress(defaultMQAdminExt, clusterName, consumerGroup, topicName, showClientIP);
            } else {
                printAllGroupsSummary(defaultMQAdminExt);
            }
        } catch (Exception e) {
            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
        } finally {
            // Always release the admin client, whether or not the command succeeded.
            defaultMQAdminExt.shutdown();
        }
    }

    /**
     * Prints one row per message queue for a single consumer group, followed by the TPS and the
     * accumulated diff / inflight totals.
     *
     * @param defaultMQAdminExt started admin client used for all lookups
     * @param clusterName       cluster name, may be {@code null}; only used when {@code topicName} is set
     * @param consumerGroup     consumer group to inspect
     * @param topicName         topic to restrict the stats to, or {@code null} for all topics
     * @param showClientIP      whether to add the client IP column
     * @throws Exception if fetching the consume stats fails
     */
    private void printGroupProgress(DefaultMQAdminExt defaultMQAdminExt, String clusterName, String consumerGroup,
        String topicName, boolean showClientIP) throws Exception {
        ConsumeStats consumeStats;
        if (topicName == null) {
            consumeStats = defaultMQAdminExt.examineConsumeStats(consumerGroup);
        } else {
            consumeStats = defaultMQAdminExt.examineConsumeStats(clusterName, consumerGroup, topicName);
        }
        List<MessageQueue> mqList = new LinkedList<>(consumeStats.getOffsetTable().keySet());
        Collections.sort(mqList);

        // Header formats mirror the row formats below (including the single space before the
        // last column) so that "#LastTime" lines up with its values.
        Map<MessageQueue, String> messageQueueAllocationResult = null;
        if (showClientIP) {
            messageQueueAllocationResult = getMessageQueueAllocationResult(defaultMQAdminExt, consumerGroup);
            System.out.printf("%-64s  %-32s  %-4s  %-20s  %-20s  %-20s %-20s %-20s %s%n",
                "#Topic",
                "#Broker Name",
                "#QID",
                "#Broker Offset",
                "#Consumer Offset",
                "#Client IP",
                "#Diff",
                "#Inflight",
                "#LastTime");
        } else {
            System.out.printf("%-64s  %-32s  %-4s  %-20s  %-20s  %-20s %-20s %s%n",
                "#Topic",
                "#Broker Name",
                "#QID",
                "#Broker Offset",
                "#Consumer Offset",
                "#Diff",
                "#Inflight",
                "#LastTime");
        }

        long diffTotal = 0L;
        long inflightTotal = 0L;
        for (MessageQueue mq : mqList) {
            OffsetWrapper offsetWrapper = consumeStats.getOffsetTable().get(mq);
            // diff: messages on the broker not yet committed by the consumer;
            // inflight: messages already pulled but not yet committed.
            long diff = offsetWrapper.getBrokerOffset() - offsetWrapper.getConsumerOffset();
            long inflight = offsetWrapper.getPullOffset() - offsetWrapper.getConsumerOffset();
            diffTotal += diff;
            inflightTotal += inflight;

            String lastTime = "";
            try {
                if (offsetWrapper.getLastTimestamp() == 0) {
                    lastTime = "N/A";
                } else {
                    lastTime = UtilAll.formatDate(new Date(offsetWrapper.getLastTimestamp()), UtilAll.YYYY_MM_DD_HH_MM_SS);
                }
            } catch (Exception ignored) {
                // Best effort: a timestamp that cannot be formatted leaves the column empty
                // instead of aborting the whole report.
            }

            if (showClientIP) {
                String clientIP = messageQueueAllocationResult.get(mq);
                System.out.printf("%-64s  %-32s  %-4d  %-20d  %-20d  %-20s %-20d %-20d %s%n",
                    UtilAll.frontStringAtLeast(mq.getTopic(), 64),
                    UtilAll.frontStringAtLeast(mq.getBrokerName(), 32),
                    mq.getQueueId(),
                    offsetWrapper.getBrokerOffset(),
                    offsetWrapper.getConsumerOffset(),
                    null != clientIP ? clientIP : "N/A",
                    diff,
                    inflight,
                    lastTime
                );
            } else {
                System.out.printf("%-64s  %-32s  %-4d  %-20d  %-20d  %-20d %-20d %s%n",
                    UtilAll.frontStringAtLeast(mq.getTopic(), 64),
                    UtilAll.frontStringAtLeast(mq.getBrokerName(), 32),
                    mq.getQueueId(),
                    offsetWrapper.getBrokerOffset(),
                    offsetWrapper.getConsumerOffset(),
                    diff,
                    inflight,
                    lastTime
                );
            }
        }

        System.out.printf("%n");
        System.out.printf("Consume TPS: %.2f%n", consumeStats.getConsumeTps());
        System.out.printf("Consume Diff Total: %d%n", diffTotal);
        System.out.printf("Consume Inflight Total: %d%n", inflightTotal);
    }

    /**
     * Prints one summary row for every consumer group derived from a retry topic in the full
     * topic list. A failure while querying one group is logged and does not stop the listing.
     *
     * @param defaultMQAdminExt started admin client used for all lookups
     * @throws Exception if the topic list itself cannot be fetched
     */
    private void printAllGroupsSummary(DefaultMQAdminExt defaultMQAdminExt) throws Exception {
        System.out.printf("%-64s  %-6s  %-24s %-5s  %-14s  %-7s  %s%n",
            "#Group",
            "#Count",
            "#Version",
            "#Type",
            "#Model",
            "#TPS",
            "#Diff Total"
        );
        TopicList topicList = defaultMQAdminExt.fetchAllTopicList();
        for (String topic : topicList.getTopicList()) {
            // Consumer groups are discovered through their retry topics; skip everything else.
            if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                continue;
            }
            String consumerGroup = KeyBuilder.parseGroup(topic);
            try {
                // Stats and connection info are fetched independently so that a failure of one
                // still lets the row be printed with the data from the other.
                ConsumeStats consumeStats = null;
                try {
                    consumeStats = defaultMQAdminExt.examineConsumeStats(consumerGroup);
                } catch (Exception e) {
                    log.warn("examineConsumeStats exception, " + consumerGroup, e);
                }

                ConsumerConnection cc = null;
                try {
                    cc = defaultMQAdminExt.examineConsumerConnectionInfo(consumerGroup);
                } catch (Exception e) {
                    log.warn("examineConsumerConnectionInfo exception, " + consumerGroup, e);
                }

                GroupConsumeInfo groupConsumeInfo = new GroupConsumeInfo();
                groupConsumeInfo.setGroup(consumerGroup);

                if (consumeStats != null) {
                    groupConsumeInfo.setConsumeTps((int) consumeStats.getConsumeTps());
                    groupConsumeInfo.setDiffTotal(consumeStats.computeTotalDiff());
                }

                if (cc != null) {
                    groupConsumeInfo.setCount(cc.getConnectionSet().size());
                    groupConsumeInfo.setMessageModel(cc.getMessageModel());
                    groupConsumeInfo.setConsumeType(cc.getConsumeType());
                    groupConsumeInfo.setVersion(cc.computeMinVersion());
                }

                System.out.printf("%-64s  %-6d  %-24s %-5s  %-14s  %-7d  %d%n",
                    UtilAll.frontStringAtLeast(groupConsumeInfo.getGroup(), 64),
                    groupConsumeInfo.getCount(),
                    // A group with no connections is reported as offline instead of a version.
                    groupConsumeInfo.getCount() > 0 ? groupConsumeInfo.versionDesc() : "OFFLINE",
                    groupConsumeInfo.consumeTypeDesc(),
                    groupConsumeInfo.messageModelDesc(),
                    groupConsumeInfo.getConsumeTps(),
                    groupConsumeInfo.getDiffTotal()
                );
            } catch (Exception e) {
                log.warn("examineConsumeStats or examineConsumerConnectionInfo exception, " + consumerGroup, e);
            }
        }
    }