public void setStore()

in hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/service/PDService.java [401:521]


    /**
     * Handles a request to change the state of an already-registered store.
     *
     * <p>When this node is not the leader the request is forwarded to the leader. Otherwise the
     * requested state is validated against the store's last known state:
     * <ul>
     *   <li>{@code Up} is only accepted when the store is currently {@code Pending};</li>
     *   <li>{@code Offline} is rejected unless the cluster state is {@code Cluster_OK};</li>
     *   <li>{@code Tombstone} is rejected when removing an {@code Up} store would drop the number
     *       of active stores below the configured minimum, when the store cannot go offline,
     *       when it is already {@code Exiting}, or when its partitions cannot be moved out.
     *       A store that is still online is switched to {@code Exiting} and its partitions are
     *       handed to the task service for migration instead of being tombstoned directly.</li>
     * </ul>
     * A {@code PDException} raised during processing is logged and converted into an error
     * header on the response; it is not propagated to the caller.
     *
     * @param request  request carrying the target store (id and desired state)
     * @param observer stream that receives the resulting response and is then completed
     */
    public void setStore(Pdpb.SetStoreRequest request,
                         StreamObserver<Pdpb.SetStoreResponse> observer) {
        if (!isLeader()) {
            redirectToLeader(PDGrpc.getSetStoreMethod(), request, observer);
            return;
        }
        Pdpb.SetStoreResponse response;
        try {
            Metapb.Store store = request.getStore();
            Metapb.StoreState state = store.getState();
            long storeId = store.getId();

            Metapb.Store lastStore = storeNodeService.getStore(storeId);
            if (lastStore == null) {
                // storeId does not exist, an exception is thrown
                throw new PDException(Pdpb.ErrorType.STORE_ID_NOT_EXIST_VALUE,
                                      String.format("Store id %d does not exist!", storeId));
            }

            // Only a store that is currently Pending may be brought online
            if (state == Metapb.StoreState.Up
                && lastStore.getState() != Metapb.StoreState.Pending) {
                throw new PDException(Pdpb.ErrorType.UPDATE_STORE_STATE_ERROR_VALUE,
                                      "only stores in Pending state can be set to Up!");
            }

            if (state == Metapb.StoreState.Offline) {
                Metapb.ClusterStats stats = storeNodeService.getClusterStats();
                if (stats.getState() != Metapb.ClusterState.Cluster_OK) {
                    // Answered directly (not via PDException) so the header keeps code -1
                    Pdpb.ResponseHeader errorHeader = newErrorHeader(
                            -1, "can not offline node when cluster state is not normal ");
                    observer.onNext(
                            Pdpb.SetStoreResponse.newBuilder().setHeader(errorHeader).build());
                    observer.onCompleted();
                    return;
                }
            }

            logService.insertLog(LogService.NODE_CHANGE, LogService.GRPC, store);

            // NOTE: license verification for stores going Up is currently disabled. If it is
            // re-enabled, verify here and, on failure, set the store back to Pending and report
            // Pdpb.ErrorType.LICENSE_ERROR_VALUE.

            // Before going offline, check whether the number of active machines is greater than
            // the minimum threshold
            if (state == Metapb.StoreState.Tombstone) {
                List<Metapb.Store> activeStores = storeNodeService.getActiveStores();
                if (lastStore.getState() == Metapb.StoreState.Up
                    && activeStores.size() - 1 < pdConfig.getMinStoreCount()) {
                    throw new PDException(Pdpb.ErrorType.LESS_ACTIVE_STORE_VALUE,
                                          "The number of active stores is less then " +
                                          pdConfig.getMinStoreCount());
                }
                if (!storeNodeService.checkStoreCanOffline(store)) {
                    throw new PDException(Pdpb.ErrorType.LESS_ACTIVE_STORE_VALUE,
                                          "check activeStores or online shardsList size");
                }
                if (lastStore.getState() == Metapb.StoreState.Exiting) {
                    // The store is already being taken down, no further processing is made
                    throw new PDException(Pdpb.ErrorType.Store_Tombstone_Doing_VALUE,
                                          "Downline is in progress, do not resubmit");
                }

                Map<String, Object> resultMap = taskService.canAllPartitionsMovedOut(lastStore);
                // Boolean.TRUE.equals treats a missing flag as "cannot move" instead of failing
                // with a NullPointerException while unboxing
                if (!Boolean.TRUE.equals(resultMap.get("flag"))) {
                    throw new PDException(Pdpb.ErrorType.UPDATE_STORE_STATE_ERROR_VALUE,
                                          "the resources on other stores may be not enough to " +
                                          "store the partitions of current store!");
                }
                if (Boolean.TRUE.equals(resultMap.get("current_store_is_online"))) {
                    log.info("updateStore removeActiveStores store {}", storeId);
                    // The store is still online: mark it Exiting and wait for the replicas
                    // to be migrated
                    store = Metapb.Store.newBuilder(lastStore)
                                        .setState(Metapb.StoreState.Exiting).build();
                    // Perform partition migration operations
                    @SuppressWarnings("unchecked")
                    Map<Integer, KVPair<Long, Long>> movedPartitions =
                            (Map<Integer, KVPair<Long, Long>>) resultMap.get("movedPartitions");
                    taskService.movePartitions(movedPartitions);
                }
                // Otherwise the store is not online: no replica is migrated and the requested
                // Tombstone state is applied as-is
            }

            store = storeNodeService.updateStore(store);
            response = Pdpb.SetStoreResponse.newBuilder()
                                            .setHeader(okHeader)
                                            .setStore(store)
                                            .build();
        } catch (PDException e) {
            response = Pdpb.SetStoreResponse.newBuilder().setHeader(newErrorHeader(e)).build();
            log.error("setStore exception: ", e);
        }

        observer.onNext(response);
        observer.onCompleted();
    }