void init()

in mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttRaftServer.java [98:140]


    /**
     * Initializes this Raft server: creates the two worker thread pools, registers the
     * retained-message and will-message state processors, creates and initializes the local
     * RPC server, creates one Raft node (backed by its own RocksDB engine) per Raft group,
     * schedules the periodic leader refresh and finally creates the CLI service.
     *
     * @throws IOException declared by the method; e.g. creating a group's {@code rdb} directory may fail
     * @throws RocksDBException declared by the method; NOTE(review): presumably raised while
     *         initializing a RocksDB engine or creating a Raft node - confirm
     * @throws IllegalArgumentException if the configured self address cannot be parsed into a peer id
     * @throws RuntimeException if the RPC server fails to initialize
     */
    void init() throws IOException, RocksDBException {
        // Both pools: 8 core / 16 max threads, 1 minute keep-alive, queue bounded at 10000 tasks.
        raftExecutor = new ThreadPoolExecutor(
                8,
                16,
                1,
                TimeUnit.MINUTES,
                new LinkedBlockingQueue<>(10000),
                new ThreadFactoryImpl("RaftExecutor_"));
        requestExecutor = new ThreadPoolExecutor(
                8,
                16,
                1,
                TimeUnit.MINUTES,
                new LinkedBlockingQueue<>(10000),
                new ThreadFactoryImpl("requestExecutor_"));

        // State processors for retained messages and will messages.
        registerStateProcessor(new RetainedMsgStateProcessor(this, metaConf.getMaxRetainedTopicNum()));
        registerStateProcessor(new WillMsgStateProcessor(this));

        rt = RouteTable.getInstance();

        // PeerId.parsePeer returns null instead of throwing when the address is empty or
        // malformed; fail fast with a message naming the bad value rather than letting a
        // NullPointerException surface further down.
        String selfAddress = metaConf.getSelfAddress();
        PeerId parsedPeerId = PeerId.parsePeer(selfAddress);
        if (parsedPeerId == null) {
            String message = "Fail to parse self address [" + selfAddress + "] into a peer id.";
            LOGGER.error(message);
            throw new IllegalArgumentException(message);
        }
        localPeerId = parsedPeerId;

        rpcServer = createRpcServer(this, localPeerId);
        NodeManager.getInstance().addAddress(localPeerId.getEndpoint());
        if (!rpcServer.init(null)) {
            LOGGER.error("Fail to init [BaseRpcServer].");
            throw new RuntimeException("Fail to init [BaseRpcServer].");
        }

        // One state machine, with its own RocksDB engine under <raft base dir>/rdb, per Raft group.
        raftGroups = RaftUtil.LIST_RAFT_GROUPS();
        for (String group : raftGroups) {
            String rdbPath = RaftUtil.RAFT_BASE_DIR(group) + File.separator + "rdb";
            FileUtils.forceMkdir(new File(rdbPath));
            RocksDBEngine rocksDBEngine = new RocksDBEngine(rdbPath);
            rocksDBEngine.init();
            MqttStateMachine sm = new MqttStateMachine(this);
            sm.setRocksDBEngine(rocksDBEngine);
            createRaftNode(group, sm);
        }

        // Refresh the leader every 3 seconds, starting 3 seconds from now.
        scheduler.scheduleAtFixedRate(() -> refreshLeader(), 3, 3, TimeUnit.SECONDS);

        CliOptions cliOptions = new CliOptions();
        this.cliService = RaftServiceFactory.createAndInitCliService(cliOptions);
        // The casts rely on the factory returning a CliServiceImpl whose client service is a
        // CliClientServiceImpl.
        this.cliClientService = (CliClientServiceImpl) ((CliServiceImpl) this.cliService).getCliClientService();
    }