in hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/PartitionEngine.java [740:838]
/**
 * Executes a change-shard task for this raft group by switching the group's peers to the
 * shard list carried in the task.
 *
 * <p>The call is a no-op when this node is not the leader, or when {@code taskManager} reports
 * that a task of the same type is already registered for the partition. When the task state is
 * below {@code Task_Stop_VALUE} the peer change is handed to {@code Utils.runInThread}; otherwise
 * {@code done} (if non-null) is completed with {@code Status.OK()}.
 *
 * <p>Once {@code changePeers} returns anything other than {@code TASK_CONTINUE}, the task is
 * rebuilt as {@code Task_Success} (status OK) or {@code Task_Ready} (failure, after a 1s pause),
 * submitted as a {@code SYNC_PARTITION_TASK} raft operation if this node is still leader, and
 * written to {@code taskManager} if the engine is still started.
 *
 * <p>NOTE(review): {@code done} is not invoked by this method on the two early returns, nor when
 * another peer change already holds {@code changingPeer}; confirm callers tolerate that.
 *
 * @param task the change-shard task; its partition, type, state and shard list are read
 * @param done closure passed through to {@code changePeers}; may be {@code null} on the
 *             already-processed path, where it is null-checked before use
 */
public void doChangeShard(final MetaTask.Task task, Closure done) {
    if (!isLeader()) {
        return;
    }
    log.info("Raft {} doChangeShard task is {}", getGroupId(), task);
    // If the same partition already has a task of this type, ignore this one.
    if (taskManager.partitionTaskRepeat(task.getPartition().getId(),
                                        task.getPartition().getGraphName(),
                                        task.getType().name())) {
        log.error("Raft {} doChangeShard task repeat, type:{}", getGroupId(), task.getType());
        return;
    }
    // Task not completed yet: (re-)execute it.
    if (task.getState().getNumber() < MetaTask.TaskState.Task_Stop_VALUE && isLeader()) {
        Utils.runInThread(() -> {
            // Tracks whether THIS invocation owns the changingPeer flag, so that a worker
            // which lost the race never clears a flag held by a concurrent peer change.
            boolean acquired = false;
            try {
                // cannot changePeers in the state machine
                List<String> peers =
                        partitionManager.shards2Peers(task.getChangeShard().getShardList());
                HashSet<String> hashSet = new HashSet<>(peers);
                // Duplicate peers indicate a malformed task; this is only logged here and
                // the peer change still proceeds with the list as given.
                if (peers.size() != hashSet.size()) {
                    log.info("Raft {} doChangeShard peer is repeat, peers: {}", getGroupId(),
                             peers);
                }
                Status result;
                acquired = changingPeer.compareAndSet(false, true);
                if (acquired) {
                    result = this.changePeers(peers, done);
                } else {
                    // Another peer change is in flight; report an error so that the task is
                    // put back to Task_Ready below and retried later.
                    result = HgRaftError.TASK_ERROR.toStatus();
                }
                if (result.getCode() != HgRaftError.TASK_CONTINUE.getNumber()) {
                    log.info("Raft {} doChangeShard is finished, status is {}", getGroupId(),
                             result);
                    // Task completed, synchronize task status
                    MetaTask.Task newTask;
                    if (result.isOk()) {
                        newTask = task.toBuilder().setState(MetaTask.TaskState.Task_Success)
                                      .build();
                    } else {
                        log.warn(
                                "Raft {} doChangeShard is failure, need to retry, status is {}",
                                getGroupId(), result);
                        try {
                            // Throttle the retry so the task is not resent in a tight loop.
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            log.error("wait 1s to resend retry task. got error:{}",
                                      e.getMessage());
                            // Preserve the interrupt status for the code running this task.
                            Thread.currentThread().interrupt();
                        }
                        newTask = task.toBuilder().setState(MetaTask.TaskState.Task_Ready)
                                      .build();
                    }
                    try {
                        // During the waiting process, it may have already shut down.
                        if (isLeader()) {
                            storeEngine.addRaftTask(newTask.getPartition().getGraphName(),
                                                    newTask.getPartition().getId(),
                                                    RaftOperation.create(
                                                            RaftOperation.SYNC_PARTITION_TASK,
                                                            newTask),
                                                    status -> {
                                                        if (!status.isOk()) {
                                                            log.error(
                                                                    "Raft {} addRaftTask " +
                                                                    "error, status is {}",
                                                                    newTask.getPartition()
                                                                           .getId(), status);
                                                        }
                                                    }
                            );
                        }
                    } catch (Exception e) {
                        log.error("Partition {}-{} update task state exception {}",
                                  task.getPartition().getGraphName(),
                                  task.getPartition().getId(), e);
                    }
                    // db might have been destroyed, do not update anymore
                    if (this.started) {
                        taskManager.updateTask(newTask);
                    }
                } else {
                    log.info("Raft {} doChangeShard not finished", getGroupId());
                }
            } catch (Exception e) {
                log.error("Raft {} doChangeShard exception {}", getGroupId(), e);
            } finally {
                // Release the flag only if this invocation set it.
                if (acquired) {
                    changingPeer.set(false);
                }
            }
        });
    } else {
        // The task has already been processed; just complete the closure.
        if (done != null) {
            done.run(Status.OK());
        }
    }
}