public void init()

in hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/service/PDService.java [156:342]


    public void init() throws PDException {
        log.info("PDService init………… {}", pdConfig);
        configService = new ConfigService(pdConfig);

        RaftEngine.getInstance().addStateListener(this);
        RaftEngine.getInstance().addStateListener(configService);
        RaftEngine.getInstance().init(pdConfig.getRaft());
        //pdConfig = configService.loadConfig(); onLeaderChanged
        storeNodeService = new StoreNodeService(pdConfig);
        partitionService = new PartitionService(pdConfig, storeNodeService);
        taskService = new TaskScheduleService(pdConfig, storeNodeService, partitionService);
        idService = new IdService(pdConfig);
        logService = new LogService(pdConfig);
        storeMonitorDataService = new StoreMonitorDataService(pdConfig);
        //if (licenseVerifierService == null) {
        //    licenseVerifierService = new LicenseVerifierService(pdConfig);
        //}
        RaftEngine.getInstance().addStateListener(partitionService);
        pdConfig.setIdService(idService);

        // Receive a heartbeat message
        PDPulseSubject.listenPartitionHeartbeat(new PulseListener<PartitionHeartbeatRequest>() {
            @Override
            public void onNext(PartitionHeartbeatRequest request) throws Exception {
                partitionService.partitionHeartbeat(request.getStates());
            }

            @Override
            public void onError(Throwable throwable) {
                log.error("Received an error notice from pd-client", throwable);
            }

            @Override
            public void onCompleted() {
                log.info("Received an completed notice from pd-client");
            }
        });

        /**
         // Listen for partition commands and forward them to Store
         */
        partitionService.addInstructionListener(new PartitionInstructionListener() {
            private PartitionHeartbeatResponse.Builder getBuilder(Metapb.Partition partition) throws
                                                                                              PDException {
                return PartitionHeartbeatResponse.newBuilder().setPartition(partition)
                                                 .setId(idService.getId(TASK_ID_KEY, 1));
            }

            @Override
            public void changeShard(Metapb.Partition partition, ChangeShard changeShard) throws
                                                                                         PDException {
                PDPulseSubject.notifyClient(getBuilder(partition).setChangeShard(changeShard));

            }

            @Override
            public void transferLeader(Metapb.Partition partition,
                                       TransferLeader transferLeader) throws
                                                                      PDException {
                PDPulseSubject.notifyClient(
                        getBuilder(partition).setTransferLeader(transferLeader));
            }

            @Override
            public void splitPartition(Metapb.Partition partition,
                                       SplitPartition splitPartition) throws
                                                                      PDException {
                PDPulseSubject.notifyClient(
                        getBuilder(partition).setSplitPartition(splitPartition));

            }

            @Override
            public void dbCompaction(Metapb.Partition partition, DbCompaction dbCompaction) throws
                                                                                            PDException {
                PDPulseSubject.notifyClient(getBuilder(partition).setDbCompaction(dbCompaction));

            }

            @Override
            public void movePartition(Metapb.Partition partition,
                                      MovePartition movePartition) throws PDException {
                PDPulseSubject.notifyClient(getBuilder(partition).setMovePartition(movePartition));
            }

            @Override
            public void cleanPartition(Metapb.Partition partition,
                                       CleanPartition cleanPartition) throws PDException {
                PDPulseSubject.notifyClient(
                        getBuilder(partition).setCleanPartition(cleanPartition));
            }

            @Override
            public void changePartitionKeyRange(Metapb.Partition partition,
                                                PartitionKeyRange partitionKeyRange)
                    throws PDException {
                PDPulseSubject.notifyClient(getBuilder(partition).setKeyRange(partitionKeyRange));
            }
        });

        /**
         // Listen for partition status change messages and forward them to Client
         */
        partitionService.addStatusListener(new PartitionStatusListener() {
            @Override
            public void onPartitionChanged(Metapb.Partition old, Metapb.Partition partition) {
                PDWatchSubject.notifyPartitionChange(PDWatchSubject.ChangeType.ALTER,
                                                     partition.getGraphName(), partition.getId());
            }

            @Override
            public void onPartitionRemoved(Metapb.Partition partition) {
                PDWatchSubject.notifyPartitionChange(PDWatchSubject.ChangeType.DEL,
                                                     partition.getGraphName(),
                                                     partition.getId());

            }
        });

        storeNodeService.addShardGroupStatusListener(new ShardGroupStatusListener() {
            @Override
            public void onShardListChanged(Metapb.ShardGroup shardGroup,
                                           Metapb.ShardGroup newShardGroup) {
                // invoked before change, saved to db and update cache.
                if (newShardGroup == null) {
                    PDWatchSubject.notifyShardGroupChange(PDWatchSubject.ChangeType.DEL,
                                                          shardGroup.getId(),
                                                          shardGroup);
                } else {
                    PDWatchSubject.notifyShardGroupChange(PDWatchSubject.ChangeType.ALTER,
                                                          shardGroup.getId(), newShardGroup);
                }
            }

            @Override
            public void onShardListOp(Metapb.ShardGroup shardGroup) {
                PDWatchSubject.notifyShardGroupChange(PDWatchSubject.ChangeType.USER_DEFINED,
                                                      shardGroup.getId(), shardGroup);
            }
        });

        /**
         // Listen for store status change messages and forward them to Client
         */
        storeNodeService.addStatusListener(new StoreStatusListener() {

            @Override
            public void onStoreStatusChanged(Metapb.Store store,
                                             Metapb.StoreState old,
                                             Metapb.StoreState status) {
                NodeEventType type = NodeEventType.NODE_EVENT_TYPE_UNKNOWN;
                if (status == Metapb.StoreState.Up) {
                    type = NodeEventType.NODE_EVENT_TYPE_NODE_ONLINE;
                } else if (status == Metapb.StoreState.Offline) {
                    type = NodeEventType.NODE_EVENT_TYPE_NODE_OFFLINE;
                }
                PDWatchSubject.notifyNodeChange(type, "", store.getId());
            }

            @Override
            public void onGraphChange(Metapb.Graph graph,
                                      Metapb.GraphState stateOld,
                                      Metapb.GraphState stateNew) {
                WatchGraphResponse wgr = WatchGraphResponse.newBuilder()
                                                           .setGraph(graph)
                                                           .build();
                WatchResponse.Builder wr = WatchResponse.newBuilder()
                                                        .setGraphResponse(wgr);
                PDWatchSubject.notifyChange(WatchType.WATCH_TYPE_GRAPH_CHANGE,
                                            wr);
            }

            @Override
            public void onStoreRaftChanged(Metapb.Store store) {
                PDWatchSubject.notifyNodeChange(NodeEventType.NODE_EVENT_TYPE_NODE_RAFT_CHANGE, "",
                                                store.getId());
            }
        });
        storeNodeService.init(partitionService);
        partitionService.init();
        taskService.init();
        // log.info("init .......");
        // licenseVerifierService.init();

        // UpgradeService upgradeService = new UpgradeService(pdConfig);
        // upgradeService.upgrade();
    }