in fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletServer.java [151:238]
protected void startServices() throws Exception {
synchronized (lock) {
LOG.info("Initializing Tablet services.");
List<Endpoint> endpoints = Endpoint.loadBindEndpoints(conf, ServerType.TABLET_SERVER);
// for metrics
this.metricRegistry = MetricRegistry.create(conf, pluginManager);
this.tabletServerMetricGroup =
ServerMetricUtils.createTabletServerGroup(
metricRegistry,
ServerMetricUtils.validateAndGetClusterId(conf),
endpoints.get(0).getHost(),
serverId);
this.zkClient = ZooKeeperUtils.startZookeeperClient(conf, this);
this.metadataCache = new ServerMetadataCacheImpl();
this.scheduler = new FlussScheduler(conf.get(BACKGROUND_THREADS));
scheduler.startup();
SystemClock systemClock = SystemClock.getInstance();
this.logManager = LogManager.create(conf, zkClient, scheduler, systemClock);
logManager.startup();
this.kvManager = KvManager.create(conf, zkClient, logManager);
kvManager.startup();
this.authorizer = AuthorizerLoader.createAuthorizer(conf, zkClient, pluginManager);
if (authorizer != null) {
authorizer.startup();
}
// rpc client to sent request to the tablet server where the leader replica is located
// to fetch log.
this.clientMetricGroup =
new ClientMetricGroup(metricRegistry, SERVER_NAME + "-" + serverId);
this.rpcClient = RpcClient.create(conf, clientMetricGroup);
CoordinatorGateway coordinatorGateway =
GatewayClientProxy.createGatewayProxy(
() -> metadataCache.getCoordinatorServer(interListenerName).get(),
rpcClient,
CoordinatorGateway.class);
this.replicaManager =
new ReplicaManager(
conf,
scheduler,
logManager,
kvManager,
zkClient,
serverId,
metadataCache,
rpcClient,
coordinatorGateway,
DefaultCompletedKvSnapshotCommitter.create(
rpcClient, metadataCache, interListenerName),
this,
tabletServerMetricGroup,
systemClock);
replicaManager.startup();
MetadataManager metadataManager = new MetadataManager(zkClient, conf);
this.tabletService =
new TabletService(
serverId,
remoteFileSystem,
zkClient,
replicaManager,
metadataCache,
metadataManager,
authorizer);
RequestsMetrics requestsMetrics =
RequestsMetrics.createTabletServerRequestMetrics(tabletServerMetricGroup);
this.rpcServer =
RpcServer.create(
conf,
endpoints,
tabletService,
tabletServerMetricGroup,
requestsMetrics);
rpcServer.start();
registerTabletServer();
}
}