in hugegraph-pd/hg-pd-service/src/main/java/org/apache/hugegraph/pd/service/PDService.java [401:521]
public void setStore(Pdpb.SetStoreRequest request,
                     StreamObserver<Pdpb.SetStoreResponse> observer) {
    // Applies the store state carried by request.getStore() and answers through observer.
    //
    // Flow, as implemented below:
    //   * a non-leader node forwards the call via redirectToLeader and returns;
    //   * the store id must already be known to storeNodeService (STORE_ID_NOT_EXIST);
    //   * Up is only accepted when the stored state is Pending (UPDATE_STORE_STATE_ERROR);
    //   * Offline is refused with an error header (code -1) unless the cluster state is
    //     Cluster_OK;
    //   * Tombstone runs the capacity checks below and, when the store is still online,
    //     stores the state Exiting and starts partition migration instead;
    //   * any PDException raised on the way is turned into an error header of the response.
    if (!isLeader()) {
        redirectToLeader(PDGrpc.getSetStoreMethod(), request, observer);
        return;
    }
    Pdpb.SetStoreResponse response;
    try {
        Metapb.Store requestStore = request.getStore();
        Metapb.StoreState state = requestStore.getState();
        long storeId = requestStore.getId();

        Metapb.Store lastStore = storeNodeService.getStore(storeId);
        if (lastStore == null) {
            // Unknown store id: nothing to update
            throw new PDException(Pdpb.ErrorType.STORE_ID_NOT_EXIST_VALUE,
                                  String.format("Store id %d does not exist!", storeId));
        }

        // A store may only go online (Up) from the Pending state
        if (Metapb.StoreState.Up.equals(state)
            && !Metapb.StoreState.Pending.equals(lastStore.getState())) {
            throw new PDException(Pdpb.ErrorType.UPDATE_STORE_STATE_ERROR_VALUE,
                                  "only stores in Pending state can be set to Up!");
        }

        if (Metapb.StoreState.Offline.equals(state)) {
            Metapb.ClusterStats stats = storeNodeService.getClusterStats();
            if (stats.getState() != Metapb.ClusterState.Cluster_OK) {
                // This path answers directly with a hand-built header (code -1) rather
                // than going through the PDException handler at the bottom.
                Pdpb.ResponseHeader errorHeader = newErrorHeader(
                        -1, "can not offline node " + "when cluster state is not normal ");
                observer.onNext(
                        Pdpb.SetStoreResponse.newBuilder().setHeader(errorHeader).build());
                observer.onCompleted();
                return;
            }
        }

        // Record the request through logService (NODE_CHANGE / GRPC) before applying it
        logService.insertLog(LogService.NODE_CHANGE, LogService.GRPC, requestStore);

        // NOTE: license verification for stores going Up is disabled (the verify call had
        // been commented out). Its unreachable error handler and the store scan that only
        // computed its inputs (cores of this store, number of Up stores) were removed.

        Metapb.Store store = requestStore;
        if (Metapb.StoreState.Tombstone.equals(state)) {
            // Removing an Up store must not drop the active count below the configured minimum
            List<Metapb.Store> activeStores = storeNodeService.getActiveStores();
            if (lastStore.getState() == Metapb.StoreState.Up
                && activeStores.size() - 1 < pdConfig.getMinStoreCount()) {
                throw new PDException(Pdpb.ErrorType.LESS_ACTIVE_STORE_VALUE,
                                      "The number of active stores is less then " +
                                      pdConfig.getMinStoreCount());
            }
            if (!storeNodeService.checkStoreCanOffline(requestStore)) {
                throw new PDException(Pdpb.ErrorType.LESS_ACTIVE_STORE_VALUE,
                                      "check activeStores or online shardsList size");
            }
            if (lastStore.getState() == Metapb.StoreState.Exiting) {
                // The store is already Exiting, so a repeated request is rejected
                throw new PDException(Pdpb.ErrorType.Store_Tombstone_Doing_VALUE,
                                      "Downline is in progress, do not resubmit");
            }

            Map<String, Object> resultMap = taskService.canAllPartitionsMovedOut(lastStore);
            // Boolean.TRUE.equals treats a missing entry as false instead of throwing a
            // NullPointerException on unboxing, matching the null check the original code
            // already applied to "current_store_is_online".
            if (!Boolean.TRUE.equals(resultMap.get("flag"))) {
                throw new PDException(Pdpb.ErrorType.UPDATE_STORE_STATE_ERROR_VALUE,
                                      "the resources on other stores may be not enough to " +
                                      "store the partitions of current store!");
            }
            if (Boolean.TRUE.equals(resultMap.get("current_store_is_online"))) {
                log.info("updateStore removeActiveStores store {}", store.getId());
                // The store is still online: persist it as Exiting (based on the stored
                // record, not the request) and start moving its partitions away
                store = Metapb.Store.newBuilder(lastStore)
                                    .setState(Metapb.StoreState.Exiting).build();
                // The map is untyped, so this cast cannot be checked at runtime; it relies
                // on the value canAllPartitionsMovedOut stores under "movedPartitions".
                @SuppressWarnings("unchecked")
                Map<Integer, KVPair<Long, Long>> movedPartitions =
                        (Map<Integer, KVPair<Long, Long>>) resultMap.get("movedPartitions");
                taskService.movePartitions(movedPartitions);
            }
            // Otherwise no migration is started and the requested Tombstone state is
            // stored as-is.
        }

        store = storeNodeService.updateStore(store);
        response =
                Pdpb.SetStoreResponse.newBuilder().setHeader(okHeader).setStore(store).build();
    } catch (PDException e) {
        response = Pdpb.SetStoreResponse.newBuilder().setHeader(newErrorHeader(e)).build();
        log.error("setStore exception: ", e);
    }
    observer.onNext(response);
    observer.onCompleted();
}