in fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java [300:378]
private void initCoordinatorContext() throws Exception {
long start = System.currentTimeMillis();
// get all tablet server's
int[] currentServers = zooKeeperClient.getSortedTabletServerList();
List<ServerInfo> tabletServerInfos = new ArrayList<>();
List<ServerNode> internalServerNodes = new ArrayList<>();
for (int server : currentServers) {
TabletServerRegistration registration = zooKeeperClient.getTabletServer(server).get();
ServerInfo serverInfo =
new ServerInfo(server, registration.getEndpoints(), ServerType.TABLET_SERVER);
// Get internal listener endpoint to send request to tablet server.
Endpoint internalEndpoint = serverInfo.endpoint(internalListenerName);
if (internalEndpoint == null) {
LOG.error(
"Can not find endpoint for listener name {} for tablet server {}",
internalListenerName,
serverInfo);
continue;
}
tabletServerInfos.add(serverInfo);
internalServerNodes.add(
new ServerNode(
server,
internalEndpoint.getHost(),
internalEndpoint.getPort(),
ServerType.TABLET_SERVER));
}
coordinatorContext.setLiveTabletServers(tabletServerInfos);
// init tablet server channels
coordinatorChannelManager.startup(internalServerNodes);
// load all tables
List<TableInfo> autoPartitionTables = new ArrayList<>();
List<Tuple2<TableInfo, Long>> lakeTables = new ArrayList<>();
for (String database : metadataManager.listDatabases()) {
for (String tableName : metadataManager.listTables(database)) {
TablePath tablePath = TablePath.of(database, tableName);
TableInfo tableInfo = metadataManager.getTable(tablePath);
coordinatorContext.putTablePath(tableInfo.getTableId(), tablePath);
coordinatorContext.putTableInfo(tableInfo);
if (tableInfo.getTableConfig().isDataLakeEnabled()) {
// always set to current time,
// todo: should get from the last lake snapshot
lakeTables.add(Tuple2.of(tableInfo, System.currentTimeMillis()));
}
if (tableInfo.isPartitioned()) {
Map<String, Long> partitions =
zooKeeperClient.getPartitionNameAndIds(tablePath);
for (Map.Entry<String, Long> partition : partitions.entrySet()) {
// put partition info to coordinator context
coordinatorContext.putPartition(partition.getValue(), partition.getKey());
}
// if the table is auto partition, put the partitions info
if (tableInfo
.getTableConfig()
.getAutoPartitionStrategy()
.isAutoPartitionEnabled()) {
autoPartitionTables.add(tableInfo);
}
}
}
}
autoPartitionManager.initAutoPartitionTables(autoPartitionTables);
lakeTableTieringManager.initWithLakeTables(lakeTables);
// load all assignment
loadTableAssignment();
loadPartitionAssignment();
long end = System.currentTimeMillis();
LOG.info("Current total {} tables in the cluster.", coordinatorContext.allTables().size());
LOG.info(
"Detect tables {} to be deleted after initializing coordinator context. ",
coordinatorContext.getTablesToBeDeleted());
LOG.info(
"Detect partition {} to be deleted after initializing coordinator context. ",
coordinatorContext.getPartitionsToBeDeleted());
LOG.info("End initializing coordinator context, cost {}ms", end - start);
}