public void doChangeShard()

in hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/PartitionEngine.java [740:838]


    public void doChangeShard(final MetaTask.Task task, Closure done) {
        if (!isLeader()) {
            return;
        }

        log.info("Raft {} doChangeShard task is {}", getGroupId(), task);
        // If the same partition has the same task executing, ignore task execution.
        if (taskManager.partitionTaskRepeat(task.getPartition().getId(),
                                            task.getPartition().getGraphName(),
                                            task.getType().name())) {
            log.error("Raft {} doChangeShard task repeat, type:{}", getGroupId(), task.getType());
            return;
        }
        // Task not completed, repeat execution.
        if (task.getState().getNumber() < MetaTask.TaskState.Task_Stop_VALUE && isLeader()) {
            Utils.runInThread(() -> {
                try {
                    // cannot changePeers in the state machine
                    List<String> peers =
                            partitionManager.shards2Peers(task.getChangeShard().getShardList());
                    HashSet<String> hashSet = new HashSet<>(peers);
                    // Task has the same peers, indicating there is an error in the task itself, task ignored
                    if (peers.size() != hashSet.size()) {
                        log.info("Raft {} doChangeShard peer is repeat, peers: {}", getGroupId(),
                                 peers);
                    }
                    Status result;
                    if (changingPeer.compareAndSet(false, true)) {
                        result = this.changePeers(peers, done);
                    } else {
                        result = HgRaftError.TASK_ERROR.toStatus();
                    }

                    if (result.getCode() != HgRaftError.TASK_CONTINUE.getNumber()) {
                        log.info("Raft {} doChangeShard is finished, status is {}", getGroupId(),
                                 result);
                        // Task completed, synchronize task status
                        MetaTask.Task newTask;
                        if (result.isOk()) {
                            newTask = task.toBuilder().setState(MetaTask.TaskState.Task_Success)
                                          .build();
                        } else {
                            log.warn(
                                    "Raft {} doChangeShard is failure, need to retry, status is {}",
                                    getGroupId(), result);
                            try {
                                // Reduce send times
                                Thread.sleep(1000);
                            } catch (Exception e) {
                                log.error("wait 1s to resend retry task. got error:{}",
                                          e.getMessage());
                            }
                            newTask = task.toBuilder().setState(MetaTask.TaskState.Task_Ready)
                                          .build();
                        }
                        try {
                            // During the waiting process, it may have already shut down.
                            if (isLeader()) {
                                storeEngine.addRaftTask(newTask.getPartition().getGraphName(),
                                                        newTask.getPartition().getId(),
                                                        RaftOperation.create(
                                                                RaftOperation.SYNC_PARTITION_TASK,
                                                                newTask),
                                                        status -> {
                                                            if (!status.isOk()) {
                                                                log.error(
                                                                        "Raft {} addRaftTask " +
                                                                        "error, status is {}",
                                                                        newTask.getPartition()
                                                                               .getId(), status);
                                                            }
                                                        }
                                );
                            }
                        } catch (Exception e) {
                            log.error("Partition {}-{} update task state exception {}",
                                      task.getPartition().getGraphName(),
                                      task.getPartition().getId(), e);
                        }
                        // db might have been destroyed, do not update anymore
                        if (this.started) {
                            taskManager.updateTask(newTask);
                        }
                    } else {
                        log.info("Raft {} doChangeShard not finished", getGroupId());
                    }
                } catch (Exception e) {
                    log.error("Raft {} doChangeShard exception {}", getGroupId(), e);
                } finally {
                    changingPeer.set(false);
                }
            });
        } else {
            // Whether the message has been processed
            if (done != null) {
                done.run(Status.OK());
            }
        }
    }