in fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java [154:237]
// Initializes and starts the coordinator's services. The whole sequence runs while holding
// `lock`, and the order of the steps matters: later components are constructed from the
// ones created before them.
//
// Declared to throw Exception; nothing is caught here, so a failure in any step propagates
// to the caller.
// NOTE(review): there is no cleanup in this method if a later step throws after earlier
// components were started - presumably the caller handles shutdown; confirm.
protected void startServices() throws Exception {
synchronized (lock) {
LOG.info("Initializing Coordinator services.");
// endpoints this coordinator binds to, loaded from the configuration
List<Endpoint> endpoints = Endpoint.loadBindEndpoints(conf, ServerType.COORDINATOR);
// a fresh random id is generated on every start
this.serverId = UUID.randomUUID().toString();
// for metrics
this.metricRegistry = MetricRegistry.create(conf, pluginManager);
// the metric group is created with the host of the first bind endpoint
this.serverMetricGroup =
ServerMetricUtils.createCoordinatorGroup(
metricRegistry,
ServerMetricUtils.validateAndGetClusterId(conf),
endpoints.get(0).getHost(),
serverId);
this.zkClient = ZooKeeperUtils.startZookeeperClient(conf, this);
this.metadataCache = new ServerMetadataCacheImpl();
// the loader may yield no authorizer (presumably when authorization is not configured -
// confirm), so it is only started when present
this.authorizer = AuthorizerLoader.createAuthorizer(conf, zkClient, pluginManager);
if (authorizer != null) {
authorizer.startup();
}
this.lakeTableTieringManager = new LakeTableTieringManager();
// shared by the coordinator service and the auto partition manager below
MetadataManager metadataManager = new MetadataManager(zkClient, conf);
// the event manager is handed over as a method reference, so getCoordinatorEventManager()
// is only invoked when the service calls it, not at this point; the event processor
// itself is created further down in this method
this.coordinatorService =
new CoordinatorService(
conf,
remoteFileSystem,
zkClient,
this::getCoordinatorEventManager,
metadataCache,
metadataManager,
authorizer,
createLakeCatalog());
this.rpcServer =
RpcServer.create(
conf,
endpoints,
coordinatorService,
serverMetricGroup,
RequestsMetrics.createCoordinatorServerRequestMetrics(
serverMetricGroup));
// the RPC server is started before the coordinator leader is registered
rpcServer.start();
registerCoordinatorLeader();
// client side: metric group, RPC client and the channel manager built on top of it
this.clientMetricGroup = new ClientMetricGroup(metricRegistry, SERVER_NAME);
this.rpcClient = RpcClient.create(conf, clientMetricGroup);
this.coordinatorChannelManager = new CoordinatorChannelManager(rpcClient);
this.autoPartitionManager =
new AutoPartitionManager(metadataCache, metadataManager, conf);
autoPartitionManager.start();
// fixed-size pool whose size comes from COORDINATOR_IO_POOL_SIZE
int ioExecutorPoolSize = conf.get(ConfigOptions.COORDINATOR_IO_POOL_SIZE);
this.ioExecutor =
Executors.newFixedThreadPool(
ioExecutorPoolSize, new ExecutorThreadFactory("coordinator-io"));
// Start the coordinator event processor only after the coordinator leader has been
// registered in ZooKeeper, so that the event processor can read the coordinator leader
// node from ZooKeeper during its startup.
// With HA for the coordinator server, the processor also needs to know the leader node
// during startup.
this.coordinatorEventProcessor =
new CoordinatorEventProcessor(
zkClient,
metadataCache,
coordinatorChannelManager,
autoPartitionManager,
lakeTableTieringManager,
serverMetricGroup,
conf,
ioExecutor);
coordinatorEventProcessor.startup();
// last step, performed once the event processor has been started
createDefaultDatabase();
}
}