in mqtt-meta/src/main/java/org/apache/rocketmq/mqtt/meta/raft/MqttRaftServer.java [98:140]
/**
 * Bootstraps this raft server.
 *
 * <p>In order, this method: builds the two worker pools, registers the retained-message and
 * will-message state processors, starts the RPC server on the configured self address, creates
 * one RocksDB-backed state machine and raft node per raft group, schedules the periodic leader
 * refresh, and finally creates the CLI service and its client service.
 *
 * @throws IOException      declared checked exception; presumably raised when a raft group's
 *                          {@code rdb} directory cannot be created — confirm against
 *                          {@code FileUtils.forceMkdir}
 * @throws RocksDBException declared checked exception; presumably raised while starting a
 *                          group's RocksDB engine — confirm against {@code RocksDBEngine.init}
 * @throws RuntimeException if the RPC server fails to initialize
 */
void init() throws IOException, RocksDBException {
    // Both pools share the same sizing: 8 core / 16 max threads, 1 minute keep-alive,
    // and a bounded queue of 10000 pending tasks.
    raftExecutor = new ThreadPoolExecutor(
        8,
        16,
        1,
        TimeUnit.MINUTES,
        new LinkedBlockingQueue<>(10000),
        new ThreadFactoryImpl("RaftExecutor_"));
    requestExecutor = new ThreadPoolExecutor(
        8,
        16,
        1,
        TimeUnit.MINUTES,
        new LinkedBlockingQueue<>(10000),
        new ThreadFactoryImpl("requestExecutor_"));

    // State processors are registered before any raft node is created below.
    registerStateProcessor(new RetainedMsgStateProcessor(this, metaConf.getMaxRetainedTopicNum()));
    registerStateProcessor(new WillMsgStateProcessor(this));

    rt = RouteTable.getInstance();
    localPeerId = PeerId.parsePeer(metaConf.getSelfAddress());
    rpcServer = createRpcServer(this, localPeerId);
    NodeManager.getInstance().addAddress(localPeerId.getEndpoint());
    if (!rpcServer.init(null)) {
        LOGGER.error("Fail to init [BaseRpcServer].");
        throw new RuntimeException("Fail to init [BaseRpcServer].");
    }

    // Each raft group gets its own RocksDB instance under <group base dir>/rdb.
    raftGroups = RaftUtil.LIST_RAFT_GROUPS();
    for (String group : raftGroups) {
        String rdbPath = RaftUtil.RAFT_BASE_DIR(group) + File.separator + "rdb";
        FileUtils.forceMkdir(new File(rdbPath));
        RocksDBEngine rocksDBEngine = new RocksDBEngine(rdbPath);
        rocksDBEngine.init();
        MqttStateMachine sm = new MqttStateMachine(this);
        sm.setRocksDBEngine(rocksDBEngine);
        createRaftNode(group, sm);
    }

    // scheduleAtFixedRate cancels every subsequent run once a single run throws, so the
    // refresh is guarded: a transient failure is logged and the next tick still fires.
    // NOTE(review): the first tick may fire before the CLI services below are assigned if
    // their initialization is slow; the guard keeps the schedule alive in that case too.
    scheduler.scheduleAtFixedRate(() -> {
        try {
            refreshLeader();
        } catch (Exception e) {
            LOGGER.error("Fail to refresh raft leader.", e);
        }
    }, 3, 3, TimeUnit.SECONDS);

    CliOptions cliOptions = new CliOptions();
    this.cliService = RaftServiceFactory.createAndInitCliService(cliOptions);
    this.cliClientService = (CliClientServiceImpl) ((CliServiceImpl) this.cliService).getCliClientService();
}