in src/cluster/slot_migrate.cc [175:257]
void SlotMigrator::runMigrationProcess() {
current_stage_ = SlotMigrationStage::kStart;
while (true) {
if (isTerminated()) {
warn("[migrate] Will stop state machine, because the thread was terminated");
clean();
return;
}
switch (current_stage_) {
case SlotMigrationStage::kStart: {
auto s = startMigration();
if (s.IsOK()) {
info("[migrate] Succeed to start migrating slot(s) {}", slot_range_.load().String());
current_stage_ = SlotMigrationStage::kSnapshot;
} else {
error("[migrate] Failed to start migrating slot(s) {}. Error: {}", slot_range_.load().String(), s.Msg());
current_stage_ = SlotMigrationStage::kFailed;
resumeSyncCtx(s);
}
break;
}
case SlotMigrationStage::kSnapshot: {
auto s = sendSnapshot();
if (s.IsOK()) {
current_stage_ = SlotMigrationStage::kWAL;
} else {
error("[migrate] Failed to send snapshot of slot(s) {}. Error: {}", slot_range_.load().String(), s.Msg());
current_stage_ = SlotMigrationStage::kFailed;
resumeSyncCtx(s);
}
break;
}
case SlotMigrationStage::kWAL: {
auto s = syncWAL();
if (s.IsOK()) {
info("[migrate] Succeed to sync from WAL for slot(s) {}", slot_range_.load().String());
current_stage_ = SlotMigrationStage::kSuccess;
} else {
error("[migrate] Failed to sync from WAL for slot(s) {}. Error: {}", slot_range_.load().String(), s.Msg());
current_stage_ = SlotMigrationStage::kFailed;
resumeSyncCtx(s);
}
break;
}
case SlotMigrationStage::kSuccess: {
auto s = finishSuccessfulMigration();
if (s.IsOK()) {
info("[migrate] Succeed to migrate slot(s) {}", slot_range_.load().String());
current_stage_ = SlotMigrationStage::kClean;
migration_state_ = MigrationState::kSuccess;
resumeSyncCtx(s);
} else {
error("[migrate] Failed to finish a successful migration of slot(s) {}. Error: {}",
slot_range_.load().String(), s.Msg());
current_stage_ = SlotMigrationStage::kFailed;
resumeSyncCtx(s);
}
break;
}
case SlotMigrationStage::kFailed: {
auto s = finishFailedMigration();
if (!s.IsOK()) {
error("[migrate] Failed to finish a failed migration of slot(s) {}. Error: {}", slot_range_.load().String(),
s.Msg());
}
info("[migrate] Failed to migrate a slot(s) {}", slot_range_.load().String());
migration_state_ = MigrationState::kFailed;
current_stage_ = SlotMigrationStage::kClean;
break;
}
case SlotMigrationStage::kClean: {
clean();
return;
}
default:
error("[migrate] Unexpected state for the state machine: {}", static_cast<int>(current_stage_));
clean();
return;
}
}
}