private void initCoordinatorContext()

in fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java [300:378]


    private void initCoordinatorContext() throws Exception {
        long start = System.currentTimeMillis();
        // get all tablet server's
        int[] currentServers = zooKeeperClient.getSortedTabletServerList();
        List<ServerInfo> tabletServerInfos = new ArrayList<>();
        List<ServerNode> internalServerNodes = new ArrayList<>();
        for (int server : currentServers) {
            TabletServerRegistration registration = zooKeeperClient.getTabletServer(server).get();
            ServerInfo serverInfo =
                    new ServerInfo(server, registration.getEndpoints(), ServerType.TABLET_SERVER);
            // Get internal listener endpoint to send request to tablet server.
            Endpoint internalEndpoint = serverInfo.endpoint(internalListenerName);
            if (internalEndpoint == null) {
                LOG.error(
                        "Can not find endpoint for listener name {} for tablet server {}",
                        internalListenerName,
                        serverInfo);
                continue;
            }
            tabletServerInfos.add(serverInfo);
            internalServerNodes.add(
                    new ServerNode(
                            server,
                            internalEndpoint.getHost(),
                            internalEndpoint.getPort(),
                            ServerType.TABLET_SERVER));
        }

        coordinatorContext.setLiveTabletServers(tabletServerInfos);
        // init tablet server channels
        coordinatorChannelManager.startup(internalServerNodes);

        // load all tables
        List<TableInfo> autoPartitionTables = new ArrayList<>();
        List<Tuple2<TableInfo, Long>> lakeTables = new ArrayList<>();
        for (String database : metadataManager.listDatabases()) {
            for (String tableName : metadataManager.listTables(database)) {
                TablePath tablePath = TablePath.of(database, tableName);
                TableInfo tableInfo = metadataManager.getTable(tablePath);
                coordinatorContext.putTablePath(tableInfo.getTableId(), tablePath);
                coordinatorContext.putTableInfo(tableInfo);
                if (tableInfo.getTableConfig().isDataLakeEnabled()) {
                    // always set to current time,
                    // todo: should get from the last lake snapshot
                    lakeTables.add(Tuple2.of(tableInfo, System.currentTimeMillis()));
                }
                if (tableInfo.isPartitioned()) {
                    Map<String, Long> partitions =
                            zooKeeperClient.getPartitionNameAndIds(tablePath);
                    for (Map.Entry<String, Long> partition : partitions.entrySet()) {
                        // put partition info to coordinator context
                        coordinatorContext.putPartition(partition.getValue(), partition.getKey());
                    }
                    // if the table is auto partition, put the partitions info
                    if (tableInfo
                            .getTableConfig()
                            .getAutoPartitionStrategy()
                            .isAutoPartitionEnabled()) {
                        autoPartitionTables.add(tableInfo);
                    }
                }
            }
        }
        autoPartitionManager.initAutoPartitionTables(autoPartitionTables);
        lakeTableTieringManager.initWithLakeTables(lakeTables);

        // load all assignment
        loadTableAssignment();
        loadPartitionAssignment();
        long end = System.currentTimeMillis();
        LOG.info("Current total {} tables in the cluster.", coordinatorContext.allTables().size());
        LOG.info(
                "Detect tables {} to be deleted after initializing coordinator context. ",
                coordinatorContext.getTablesToBeDeleted());
        LOG.info(
                "Detect partition {} to be deleted after initializing coordinator context. ",
                coordinatorContext.getPartitionsToBeDeleted());
        LOG.info("End initializing coordinator context, cost {}ms", end - start);
    }