protected void startServices()

in fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletServer.java [151:238]


    /**
     * Creates and starts the services of this tablet server while holding {@code lock}, and
     * finally calls {@code registerTabletServer()}.
     *
     * <p>The steps run strictly in this order, because later components are constructed from the
     * earlier ones:
     *
     * <ol>
     *   <li>load the bind endpoints and create the metric registry / tablet server metric group;
     *   <li>start the ZooKeeper client and create the server metadata cache;
     *   <li>start the background scheduler, the log manager and the kv manager;
     *   <li>create the authorizer and start it when one was created;
     *   <li>create the RPC client and the coordinator gateway proxy;
     *   <li>create and start the replica manager;
     *   <li>create the tablet service, then create and start the RPC server;
     *   <li>call {@code registerTabletServer()}.
     * </ol>
     *
     * @throws Exception propagated from any of the initialization steps above
     */
    protected void startServices() throws Exception {
        synchronized (lock) {
            LOG.info("Initializing Tablet services.");

            final List<Endpoint> bindEndpoints =
                    Endpoint.loadBindEndpoints(conf, ServerType.TABLET_SERVER);

            // Metrics come first; the group is created with the host of the first bind endpoint.
            this.metricRegistry = MetricRegistry.create(conf, pluginManager);
            this.tabletServerMetricGroup =
                    ServerMetricUtils.createTabletServerGroup(
                            this.metricRegistry,
                            ServerMetricUtils.validateAndGetClusterId(conf),
                            bindEndpoints.get(0).getHost(),
                            serverId);

            // Coordination and cluster metadata.
            this.zkClient = ZooKeeperUtils.startZookeeperClient(conf, this);
            this.metadataCache = new ServerMetadataCacheImpl();

            // Background scheduler, which the log manager is created with.
            this.scheduler = new FlussScheduler(conf.get(BACKGROUND_THREADS));
            this.scheduler.startup();

            // Storage managers: the kv manager is created on top of the log manager.
            final SystemClock clock = SystemClock.getInstance();
            this.logManager = LogManager.create(conf, this.zkClient, this.scheduler, clock);
            this.logManager.startup();

            this.kvManager = KvManager.create(conf, this.zkClient, this.logManager);
            this.kvManager.startup();

            // The loader may hand back null, in which case there is no authorizer to start.
            this.authorizer =
                    AuthorizerLoader.createAuthorizer(conf, this.zkClient, pluginManager);
            if (this.authorizer != null) {
                this.authorizer.startup();
            }

            // RPC client used to send requests to the tablet server hosting the leader replica
            // in order to fetch log.
            this.clientMetricGroup =
                    new ClientMetricGroup(this.metricRegistry, SERVER_NAME + "-" + serverId);
            this.rpcClient = RpcClient.create(conf, this.clientMetricGroup);

            // The supplier is evaluated by the proxy, not here, so the coordinator node is
            // looked up in the metadata cache at that time.
            final CoordinatorGateway coordinatorProxy =
                    GatewayClientProxy.createGatewayProxy(
                            () -> this.metadataCache.getCoordinatorServer(interListenerName).get(),
                            this.rpcClient,
                            CoordinatorGateway.class);

            this.replicaManager =
                    new ReplicaManager(
                            conf,
                            this.scheduler,
                            this.logManager,
                            this.kvManager,
                            this.zkClient,
                            serverId,
                            this.metadataCache,
                            this.rpcClient,
                            coordinatorProxy,
                            DefaultCompletedKvSnapshotCommitter.create(
                                    this.rpcClient, this.metadataCache, interListenerName),
                            this,
                            this.tabletServerMetricGroup,
                            clock);
            this.replicaManager.startup();

            // Request handler served by the RPC server below.
            this.tabletService =
                    new TabletService(
                            serverId,
                            remoteFileSystem,
                            this.zkClient,
                            this.replicaManager,
                            this.metadataCache,
                            new MetadataManager(this.zkClient, conf),
                            this.authorizer);

            this.rpcServer =
                    RpcServer.create(
                            conf,
                            bindEndpoints,
                            this.tabletService,
                            this.tabletServerMetricGroup,
                            RequestsMetrics.createTabletServerRequestMetrics(
                                    this.tabletServerMetricGroup));
            this.rpcServer.start();

            // Registration happens only after the RPC server has been started.
            registerTabletServer();
        }
    }