public static void main()

in inlong-tubemq/tubemq-example/src/main/java/org/apache/inlong/tubemq/example/ClientBalanceConsumerExample.java [83:274]


    public static void main(String[] args) throws Throwable {
        // 1. get and initial parameters
        final String masterServers = args[0];
        final String topics = args[1];
        final String group = args[2];
        final int msgCount = Integer.parseInt(args[3]);
        final int totalGroupNodeCnt =
                (args.length > 4) ? Integer.parseInt(args[4]) : 1;
        final int nodeIndexId =
                (args.length > 5) ? Integer.parseInt(args[5]) : 0;

        // 2. initial consumer object
        ConsumerConfig consumerConfig = new ConsumerConfig(masterServers, group);
        consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
        messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
        consumer = messageSessionFactory.createBalanceConsumer(consumerConfig);
        final Map<String, TreeSet<String>> topicAndFiltersMap =
                MixedUtils.parseTopicParam(topics);
        final long metaInfoFetchInterval =
                consumer.getConsumerConfig().getPartMetaInfoCheckPeriodMs();
        // 3. start consumer
        ProcessResult procResult = new ProcessResult();
        if (!consumer.start(topicAndFiltersMap, totalGroupNodeCnt, nodeIndexId, procResult)) {
            logger.info("Initial balance consumer failure, errcode is {}, errMsg is {}",
                    procResult.getErrCode(), procResult.getErrMsg());
            return;
        }

        // 4. initial partition assign thread
        Thread metaInfoUpdater = new Thread(new Runnable() {

            @Override
            public void run() {
                QueryMetaResult qryResult = new QueryMetaResult();
                ProcessResult procResult = new ProcessResult();
                do {
                    try {
                        // 4.1 judge consumer is shutdown
                        if (consumer.isShutdown()) {
                            logger.info("Consumer is shutdown!");
                            break;
                        }
                        // 4.2 get partition meta info
                        if (!consumer.getPartitionMetaInfo(qryResult)) {
                            // 4.2.1 judge consumer is shutdown
                            if (qryResult.getErrCode() == TErrCodeConstants.CLIENT_SHUTDOWN) {
                                logger.info("Consumer is shutdown!");
                                break;
                            }
                        } else {
                            // 4.2.2.1 get latest partition meta info
                            Map<String, Boolean> partMetaInfoMap = qryResult.getPartStatusMap();
                            if (partMetaInfoMap != null && !partMetaInfoMap.isEmpty()) {
                                // 4.2.2.2 assign partitions to current node
                                // by totalGroupNodeCnt and nodeIndexId parameters
                                Set<String> configuredTopicPartitions = partMetaInfoMap.keySet();
                                Set<String> assignedPartitions =
                                        configuredTopicPartitions.stream()
                                                .filter(p -> (((((long) p.hashCode()) & 0xffffffffL)
                                                        % consumer.getSourceCount()) == consumer.getNodeId()))
                                                .collect(Collectors.toCollection(TreeSet::new));
                                Set<String> rsvRegisteredPartSet = new TreeSet<>();
                                // 4.2.2.3 get current registered partition set
                                Set<String> curRegisteredPartSet =
                                        consumer.getCurRegisteredPartSet();
                                // 4.2.2.4 remove unassigned or unsubscribable partitions
                                for (String partKey : curRegisteredPartSet) {
                                    if (!assignedPartitions.contains(partKey)
                                            || partMetaInfoMap.get(partKey) == Boolean.FALSE) {
                                        if (!consumer.disconnectFromPartition(partKey, procResult)
                                                && procResult.getErrCode() == TErrCodeConstants.CLIENT_SHUTDOWN) {
                                            logger.info("Consumer is shutdown!");
                                            break;
                                        }
                                        logger.info("Unregister " + partKey
                                                + ", process result is " + procResult.isSuccess()
                                                + ", err info is " + procResult.getErrMsg());
                                    } else {
                                        rsvRegisteredPartSet.add(partKey);
                                    }
                                }
                                // 4.2.2.5 add assigned and subscribable partitions
                                for (String partKey : assignedPartitions) {
                                    if (!rsvRegisteredPartSet.contains(partKey)
                                            && partMetaInfoMap.get(partKey) == Boolean.TRUE) {
                                        // Note: if you do not need to reset the boostrap
                                        // consumption offset value, please set it to
                                        // a negative number
                                        Long boostrapOffset = partitionOffsetMap.get(partKey);
                                        if (!consumer.connect2Partition(partKey,
                                                boostrapOffset == null ? -1L : boostrapOffset,
                                                procResult)) {
                                            // 4.2.2.5.1 if client shutdown, the thread need exit!
                                            if (procResult.getErrCode() == TErrCodeConstants.CLIENT_SHUTDOWN) {
                                                logger.info("Consumer is shutdown!");
                                                break;
                                            }
                                        }
                                        logger.info("Register " + partKey
                                                + ", process result is " + procResult.isSuccess()
                                                + ", err info is " + procResult.getErrMsg());
                                    }
                                }
                            }
                        }
                        // 4.3 wait next assign interval
                        ThreadUtils.sleep(metaInfoFetchInterval);
                    } catch (Throwable e) {
                        logger.error("Consume messages failed!", e);
                    }
                } while (true);
                logger.info("Consumer existed client balance thread!");
            }
        }, "partition_assigner");

        // 5. initial fetch threads
        Thread[] fetchRunners = new Thread[3];
        for (int i = 0; i < fetchRunners.length; i++) {
            fetchRunners[i] = new Thread(new Runnable() {

                @Override
                public void run() {
                    try {
                        int getCount = msgCount;
                        ConsumeResult csmResult = new ConsumeResult();
                        ConfirmResult cfmResult = new ConfirmResult();
                        // wait partition status ready
                        do {
                            if (consumer.isPartitionsReady(5000)
                                    || consumer.isShutdown()) {
                                break;
                            }
                        } while (true);
                        // consume messages
                        do {
                            // 5.1 judge consumer is shutdown
                            if (consumer.isShutdown()) {
                                logger.info("Consumer is shutdown!");
                                break;
                            }
                            // 5.2 get messages
                            if (consumer.getMessage(csmResult)) {
                                // 5.2.1.1 process messages if success
                                List<Message> messageList = csmResult.getMessageList();
                                if (messageList != null && !messageList.isEmpty()) {
                                    msgRcvStats.addMsgCount(
                                            csmResult.getTopicName(), messageList.size());
                                }
                                // 5.2.1.2 store current offset
                                partitionOffsetMap.put(
                                        csmResult.getPartitionKey(), csmResult.getCurrOffset());
                                // 5.2.1.3 confirm messages to server
                                if (consumer.confirmConsume(
                                        csmResult.getConfirmContext(), true, cfmResult)) {
                                    // store confirmed offset
                                    partitionOffsetMap.put(
                                            cfmResult.getPartitionKey(), cfmResult.getCurrOffset());
                                }
                            } else {
                                // 5.2.2.1 print unexpected error
                                if (csmResult.getErrCode() == TErrCodeConstants.CLIENT_SHUTDOWN) {
                                    logger.info("Found that the client has shutdown, exit!");
                                }
                            }
                            // judge reached required message count
                            if (msgCount > 0) {
                                if (--getCount <= 0) {
                                    break;
                                }
                            }
                        } while (true);
                    } catch (Throwable e) {
                        logger.error("Consume messages failed!", e);
                    }
                    msgRcvStats.stopStats();
                    logger.info("Fetch runner exit!");
                }
            }, "_fetch_runner_" + i);
        }

        // 6. start threads
        // 6.1 start partition assign thread
        metaInfoUpdater.start();
        // 6.2 start fetch threads
        for (Thread thread : fetchRunners) {
            thread.start();
        }
        // 6.3 initial statistic thread
        Thread statisticThread =
                new Thread(msgRcvStats, "Receive Statistic Thread");
        statisticThread.start();
    }