protected void startServices()

in fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java [154:237]


    /**
     * Initializes and starts the coordinator's services one after another while holding
     * {@code lock}.
     *
     * <p>The steps run in this order: metrics, ZooKeeper client, metadata cache, authorizer, lake
     * table tiering manager, coordinator RPC service and server, coordinator leader registration,
     * RPC client and channel manager, auto partition manager, IO executor, coordinator event
     * processor and finally the default database. Leader registration deliberately precedes the
     * event processor start-up (see the inline note).
     *
     * @throws Exception if any of the initialization steps throws
     */
    protected void startServices() throws Exception {
        synchronized (lock) {
            LOG.info("Initializing Coordinator services.");
            final List<Endpoint> bindEndpoints =
                    Endpoint.loadBindEndpoints(conf, ServerType.COORDINATOR);
            serverId = UUID.randomUUID().toString();

            // Metrics: the coordinator metric group is built from the cluster id, the host of the
            // first bind endpoint and the server id generated above.
            metricRegistry = MetricRegistry.create(conf, pluginManager);
            serverMetricGroup =
                    ServerMetricUtils.createCoordinatorGroup(
                            metricRegistry,
                            ServerMetricUtils.validateAndGetClusterId(conf),
                            bindEndpoints.get(0).getHost(),
                            serverId);

            zkClient = ZooKeeperUtils.startZookeeperClient(conf, this);

            metadataCache = new ServerMetadataCacheImpl();

            // The loader may hand back null; only a non-null authorizer is started.
            authorizer = AuthorizerLoader.createAuthorizer(conf, zkClient, pluginManager);
            if (authorizer != null) {
                authorizer.startup();
            }

            lakeTableTieringManager = new LakeTableTieringManager();

            // The same MetadataManager instance is shared by the RPC service and the auto
            // partition manager created further down.
            final MetadataManager metaManager = new MetadataManager(zkClient, conf);
            coordinatorService =
                    new CoordinatorService(
                            conf,
                            remoteFileSystem,
                            zkClient,
                            this::getCoordinatorEventManager,
                            metadataCache,
                            metaManager,
                            authorizer,
                            createLakeCatalog());

            rpcServer =
                    RpcServer.create(
                            conf,
                            bindEndpoints,
                            coordinatorService,
                            serverMetricGroup,
                            RequestsMetrics.createCoordinatorServerRequestMetrics(
                                    serverMetricGroup));
            rpcServer.start();

            registerCoordinatorLeader();

            clientMetricGroup = new ClientMetricGroup(metricRegistry, SERVER_NAME);
            rpcClient = RpcClient.create(conf, clientMetricGroup);

            coordinatorChannelManager = new CoordinatorChannelManager(rpcClient);

            final AutoPartitionManager partitionManager =
                    new AutoPartitionManager(metadataCache, metaManager, conf);
            autoPartitionManager = partitionManager;
            partitionManager.start();

            // Fixed-size pool for coordinator IO work; its size comes from configuration.
            final int ioThreads = conf.get(ConfigOptions.COORDINATOR_IO_POOL_SIZE);
            final ExecutorThreadFactory ioThreadFactory =
                    new ExecutorThreadFactory("coordinator-io");
            ioExecutor = Executors.newFixedThreadPool(ioThreads, ioThreadFactory);

            // The event processor is started only after the coordinator leader has been
            // registered in zk, so that it can read the coordinator leader node from zk during
            // start-up. With HA for the coordinator server the processor likewise needs to know
            // the leader node while starting.
            final CoordinatorEventProcessor eventProcessor =
                    new CoordinatorEventProcessor(
                            zkClient,
                            metadataCache,
                            coordinatorChannelManager,
                            autoPartitionManager,
                            lakeTableTieringManager,
                            serverMetricGroup,
                            conf,
                            ioExecutor);
            // Publish to the field before starting, exactly as the field assignment did before.
            coordinatorEventProcessor = eventProcessor;
            eventProcessor.startup();

            createDefaultDatabase();
        }
    }