in broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java [332:463]
/**
 * Creates a broker controller from the supplied configuration objects and instantiates the
 * managers, request processors, thread-pool work queues and helper services it holds.
 *
 * <p>NOTE(review): most collaborators below are handed {@code this} while construction is
 * still in progress. Whether their constructors read fields of this controller is not visible
 * from here, so the statement order should be preserved unless each one has been checked.
 *
 * @param brokerConfig broker-level settings; read for the broker IP, cluster/broker name,
 *        config-manager version, queue capacities and feature flags
 * @param nettyServerConfig server-side Netty settings; stored and passed to {@code Configuration}
 * @param nettyClientConfig client-side Netty settings; when {@code null}, {@code brokerOuterAPI}
 *        is not assigned by this constructor
 * @param messageStoreConfig message store settings; dereferenced unconditionally, so it must
 *        not be {@code null}
 * @param authConfig authentication/authorization settings; passed to the auth metadata
 *        factories and to {@code BrokerOuterAPI}, and null-checked only before the optional
 *        V1 migration at the end
 */
public BrokerController(
final BrokerConfig brokerConfig,
final NettyServerConfig nettyServerConfig,
final NettyClientConfig nettyClientConfig,
final MessageStoreConfig messageStoreConfig,
final AuthConfig authConfig
) {
// Keep references to every configuration object first; later statements read them via this.*.
this.brokerConfig = brokerConfig;
this.nettyServerConfig = nettyServerConfig;
this.nettyClientConfig = nettyClientConfig;
this.messageStoreConfig = messageStoreConfig;
this.authConfig = authConfig;
// Store host = configured brokerIP1 plus the port returned by getListenPort().
this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), getListenPort()));
// The LMQ flag selects the stats manager implementation.
this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new LmqBrokerStatsManager(this.brokerConfig) : new BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat());
this.broadcastOffsetManager = new BroadcastOffsetManager(this);
// Choose the topic / subscription-group / consumer-offset manager family:
//   1. config manager version V2 -> V2 managers sharing one ConfigStorage (LMQ flag not consulted);
//   2. otherwise, RocksDB store enabled -> RocksDB managers (LMQ variants for topic and
//      subscription group only; the consumer offset manager has no LMQ variant in this branch);
//   3. otherwise -> the default managers, or their LMQ variants when LMQ is enabled.
if (ConfigManagerVersion.V2.getVersion().equals(brokerConfig.getConfigManagerVersion())) {
// configStorage is only assigned in this branch.
this.configStorage = new ConfigStorage(messageStoreConfig);
this.topicConfigManager = new TopicConfigManagerV2(this, configStorage);
this.subscriptionGroupManager = new SubscriptionGroupManagerV2(this, configStorage);
this.consumerOffsetManager = new ConsumerOffsetManagerV2(this, configStorage);
} else if (this.messageStoreConfig.isEnableRocksDBStore()) {
this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqTopicConfigManager(this) : new RocksDBTopicConfigManager(this);
this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqSubscriptionGroupManager(this) : new RocksDBSubscriptionGroupManager(this);
this.consumerOffsetManager = new RocksDBConsumerOffsetManager(this);
} else {
this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new LmqTopicConfigManager(this) : new TopicConfigManager(this);
this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new LmqSubscriptionGroupManager(this) : new SubscriptionGroupManager(this);
this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new LmqConsumerOffsetManager(this) : new ConsumerOffsetManager(this);
}
this.topicQueueMappingManager = new TopicQueueMappingManager(this);
// Auth metadata managers come from factories keyed on authConfig (no null check here;
// NOTE(review): confirm the factories tolerate a null authConfig, since L118-style guard exists only for migration).
this.authenticationMetadataManager = AuthenticationFactory.getMetadataManager(this.authConfig);
this.authorizationMetadataManager = AuthorizationFactory.getMetadataManager(this.authConfig);
// Request processors and the pull-request hold service (LMQ variant when LMQ is enabled).
this.pullMessageProcessor = new PullMessageProcessor(this);
this.peekMessageProcessor = new PeekMessageProcessor(this);
this.pullRequestHoldService = messageStoreConfig.isEnableLmq() ? new LmqPullRequestHoldService(this) : new PullRequestHoldService(this);
this.popMessageProcessor = new PopMessageProcessor(this);
this.notificationProcessor = new NotificationProcessor(this);
this.pollingInfoProcessor = new PollingInfoProcessor(this);
this.ackMessageProcessor = new AckMessageProcessor(this);
this.changeInvisibleTimeProcessor = new ChangeInvisibleTimeProcessor(this);
this.sendMessageProcessor = new SendMessageProcessor(this);
this.recallMessageProcessor = new RecallMessageProcessor(this);
this.replyMessageProcessor = new ReplyMessageProcessor(this);
// The arriving listener is built from three collaborators created just above, so it must follow them.
this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService, this.popMessageProcessor, this.notificationProcessor);
// consumerManager depends on consumerIdsChangeListener and brokerStatsManager being set already.
this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener, this.brokerStatsManager, this.brokerConfig);
this.producerManager = new ProducerManager(this.brokerStatsManager);
this.consumerFilterManager = new ConsumerFilterManager(this);
this.consumerOrderInfoManager = new ConsumerOrderInfoManager(this);
this.popInflightMessageCounter = new PopInflightMessageCounter(this);
// popConsumerService is null unless brokerConfig.isPopConsumerKVServiceInit() returns true.
this.popConsumerService = brokerConfig.isPopConsumerKVServiceInit() ? new PopConsumerService(this) : null;
this.clientHousekeepingService = new ClientHousekeepingService(this);
this.broker2Client = new Broker2Client(this);
this.scheduleMessageService = new ScheduleMessageService(this);
this.coldDataPullRequestHoldService = new ColdDataPullRequestHoldService(this);
this.coldDataCgCtrService = new ColdDataCgCtrService(this);
// brokerOuterAPI is created only when a client config was supplied; otherwise it is left unassigned here.
if (nettyClientConfig != null) {
this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig, authConfig);
}
this.queryAssignmentProcessor = new QueryAssignmentProcessor(this);
this.clientManageProcessor = new ClientManageProcessor(this);
this.slaveSynchronize = new SlaveSynchronize(this);
this.endTransactionProcessor = new EndTransactionProcessor(this);
// Work queues, each a LinkedBlockingQueue bounded by the matching capacity from brokerConfig.
this.sendThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getSendThreadPoolQueueCapacity());
this.putThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getPutThreadPoolQueueCapacity());
this.pullThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getPullThreadPoolQueueCapacity());
this.litePullThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getLitePullThreadPoolQueueCapacity());
this.ackThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getAckThreadPoolQueueCapacity());
this.replyThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getReplyThreadPoolQueueCapacity());
this.queryThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
this.heartbeatThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());
this.endTransactionThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getEndTransactionPoolQueueCapacity());
this.adminBrokerThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getAdminBrokerThreadPoolQueueCapacity());
this.loadBalanceThreadPoolQueue = new LinkedBlockingQueue<>(this.brokerConfig.getLoadBalanceThreadPoolQueueCapacity());
// NOTE(review): BrokerFastFailure is created after the queues above; presumably it inspects them — confirm before reordering.
this.brokerFastFailure = new BrokerFastFailure(this);
// Resolve the broker config file path: an explicit non-empty path from brokerConfig wins,
// otherwise fall back to the path reported by BrokerPathConfigHelper.
String brokerConfigPath;
if (brokerConfig.getBrokerConfigPath() != null && !brokerConfig.getBrokerConfigPath().isEmpty()) {
brokerConfigPath = brokerConfig.getBrokerConfigPath();
} else {
brokerConfigPath = BrokerPathConfigHelper.getBrokerConfigPath();
}
// Configuration receives the four config objects; authConfig is not among them.
this.configuration = new Configuration(
LOG,
brokerConfigPath,
this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig
);
// Producer online check for the stats manager: if the namespace-wrapped topic is present in the
// topic config table, look the group up under its namespace-wrapped name; otherwise use the raw group.
this.brokerStatsManager.setProducerStateGetter(new BrokerStatsManager.StateGetter() {
@Override
public boolean online(String instanceId, String group, String topic) {
if (getTopicConfigManager().getTopicConfigTable().containsKey(NamespaceUtil.wrapNamespace(instanceId, topic))) {
return getProducerManager().groupOnline(NamespaceUtil.wrapNamespace(instanceId, group));
} else {
return getProducerManager().groupOnline(group);
}
}
});
// Consumer online check for the stats manager: a consumer counts as online when subscription data
// exists, looked up with namespace-wrapped group/topic when the wrapped topic is known, and with the
// raw group/topic otherwise.
this.brokerStatsManager.setConsumerStateGetter(new BrokerStatsManager.StateGetter() {
@Override
public boolean online(String instanceId, String group, String topic) {
String topicFullName = NamespaceUtil.wrapNamespace(instanceId, topic);
if (getTopicConfigManager().getTopicConfigTable().containsKey(topicFullName)) {
return getConsumerManager().findSubscriptionData(NamespaceUtil.wrapNamespace(instanceId, group), topicFullName) != null;
} else {
return getConsumerManager().findSubscriptionData(group, topic) != null;
}
}
});
// Member group for this broker's cluster/name, seeded with this broker's own id -> address entry.
this.brokerMemberGroup = new BrokerMemberGroup(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName());
this.brokerMemberGroup.getBrokerAddrs().put(this.brokerConfig.getBrokerId(), this.getBrokerAddr());
this.escapeBridge = new EscapeBridge(this);
this.topicRouteInfoManager = new TopicRouteInfoManager(this);
// The pre-online service exists only when slave-acting-master is enabled and pre-online is not skipped.
if (this.brokerConfig.isEnableSlaveActingMaster() && !this.brokerConfig.isSkipPreOnline()) {
this.brokerPreOnlineService = new BrokerPreOnlineService(this);
}
// Last step: run the V1 auth migration when an authConfig is present and enables it.
if (this.authConfig != null && this.authConfig.isMigrateAuthFromV1Enabled()) {
new AuthMigrator(this.authConfig).migrate();
}
}