void SlotMigrator::runMigrationProcess()

in src/cluster/slot_migrate.cc [175:257]


void SlotMigrator::runMigrationProcess() {
  current_stage_ = SlotMigrationStage::kStart;

  while (true) {
    if (isTerminated()) {
      warn("[migrate] Will stop state machine, because the thread was terminated");
      clean();
      return;
    }

    switch (current_stage_) {
      case SlotMigrationStage::kStart: {
        auto s = startMigration();
        if (s.IsOK()) {
          info("[migrate] Succeed to start migrating slot(s) {}", slot_range_.load().String());
          current_stage_ = SlotMigrationStage::kSnapshot;
        } else {
          error("[migrate] Failed to start migrating slot(s) {}. Error: {}", slot_range_.load().String(), s.Msg());
          current_stage_ = SlotMigrationStage::kFailed;
          resumeSyncCtx(s);
        }
        break;
      }
      case SlotMigrationStage::kSnapshot: {
        auto s = sendSnapshot();
        if (s.IsOK()) {
          current_stage_ = SlotMigrationStage::kWAL;
        } else {
          error("[migrate] Failed to send snapshot of slot(s) {}. Error: {}", slot_range_.load().String(), s.Msg());
          current_stage_ = SlotMigrationStage::kFailed;
          resumeSyncCtx(s);
        }
        break;
      }
      case SlotMigrationStage::kWAL: {
        auto s = syncWAL();
        if (s.IsOK()) {
          info("[migrate] Succeed to sync from WAL for slot(s) {}", slot_range_.load().String());
          current_stage_ = SlotMigrationStage::kSuccess;
        } else {
          error("[migrate] Failed to sync from WAL for slot(s) {}. Error: {}", slot_range_.load().String(), s.Msg());
          current_stage_ = SlotMigrationStage::kFailed;
          resumeSyncCtx(s);
        }
        break;
      }
      case SlotMigrationStage::kSuccess: {
        auto s = finishSuccessfulMigration();
        if (s.IsOK()) {
          info("[migrate] Succeed to migrate slot(s) {}", slot_range_.load().String());
          current_stage_ = SlotMigrationStage::kClean;
          migration_state_ = MigrationState::kSuccess;
          resumeSyncCtx(s);
        } else {
          error("[migrate] Failed to finish a successful migration of slot(s) {}. Error: {}",
                slot_range_.load().String(), s.Msg());
          current_stage_ = SlotMigrationStage::kFailed;
          resumeSyncCtx(s);
        }
        break;
      }
      case SlotMigrationStage::kFailed: {
        auto s = finishFailedMigration();
        if (!s.IsOK()) {
          error("[migrate] Failed to finish a failed migration of slot(s) {}. Error: {}", slot_range_.load().String(),
                s.Msg());
        }
        info("[migrate] Failed to migrate a slot(s) {}", slot_range_.load().String());
        migration_state_ = MigrationState::kFailed;
        current_stage_ = SlotMigrationStage::kClean;
        break;
      }
      case SlotMigrationStage::kClean: {
        clean();
        return;
      }
      default:
        error("[migrate] Unexpected state for the state machine: {}", static_cast<int>(current_stage_));
        clean();
        return;
    }
  }
}