void FasterKv::HandleSpecialPhases()

in cc/src/core/faster.h [2516:2723]


template <class K, class V, class D>
void FasterKv<K, V, D>::HandleSpecialPhases() {
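  // Every active thread must walk through the same sequence of phases as the
  // global state machine, acking each phase via the epoch framework so that
  // GlobalMoveToNextState() can advance once all threads have caught up.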
  SystemState final_state = system_state_.load();
  if(final_state.phase == Phase::REST) {
    // Nothing to do; just reset thread context.
    thread_ctx().phase = Phase::REST;
    thread_ctx().version = final_state.version;
    return;
  }
  SystemState previous_state{ final_state.action, thread_ctx().phase, thread_ctx().version };
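  // previous_state pairs the global action with the phase/version this thread
  // last observed; GetNextState() then enumerates, in order, every phase the
  // thread has missed since it last refreshed, so no per-phase ack is skipped.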
  do {
    // Identify the transition (currentState -> nextState)
    SystemState current_state = (previous_state == final_state) ? final_state :
                                previous_state.GetNextState();
    switch(current_state.action) {
    case Action::CheckpointFull:
    case Action::CheckpointIndex:
    case Action::CheckpointHybridLog:
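      // Full-checkpoint phase order: PREP_INDEX_CHKPT -> INDEX_CHKPT -> PREPARE ->
      // IN_PROGRESS -> WAIT_PENDING -> WAIT_FLUSH -> PERSISTENCE_CALLBACK -> REST.
      // An index-only checkpoint returns to REST after INDEX_CHKPT; a hybrid-log-only
      // checkpoint skips the two index phases (see the asserts below).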
      switch(current_state.phase) {
      case Phase::PREP_INDEX_CHKPT:
        assert(current_state.action != Action::CheckpointHybridLog);
        // Both from REST -> PREP_INDEX_CHKPT and PREP_INDEX_CHKPT -> PREP_INDEX_CHKPT
        if(previous_state.phase == Phase::REST) {
          // Thread ack that we're performing a checkpoint.
          if(epoch_.FinishThreadPhase(Phase::PREP_INDEX_CHKPT)) {
            GlobalMoveToNextState(current_state);
          }
        }
        break;
      case Phase::INDEX_CHKPT: {
        assert(current_state.action != Action::CheckpointHybridLog);
        // Both from PREP_INDEX_CHKPT -> INDEX_CHKPT and INDEX_CHKPT -> INDEX_CHKPT
        Status result = CheckpointFuzzyIndexComplete();
        if(result != Status::Pending && result != Status::Ok) {
          checkpoint_.failed = true;
        }
        if(result != Status::Pending) {
          if(current_state.action == Action::CheckpointIndex) {
            // This thread is done now.
            thread_ctx().phase = Phase::REST;
            // Thread ack that it is done.
            if(epoch_.FinishThreadPhase(Phase::INDEX_CHKPT)) {
              GlobalMoveToNextState(current_state);
            }
          } else {
            // Index checkpoint is done; move on to PREPARE phase.
            GlobalMoveToNextState(current_state);
          }
        }
        break;
      }
      case Phase::PREPARE:
        assert(current_state.action != Action::CheckpointIndex);
        // Handle (INDEX_CHKPT -> PREPARE or REST -> PREPARE) and PREPARE -> PREPARE
        if(previous_state.phase != Phase::PREPARE) {
          // mark pending requests
          MarkAllPendingRequests();
          // count this thread among the checkpoint participants
          ++checkpoint_.log_metadata.num_threads;
          // record this thread's session guid under its thread index
          checkpoint_.log_metadata.guids[Thread::id()] = thread_ctx().guid;
          // Thread ack that it has finished marking its pending requests.
          if(epoch_.FinishThreadPhase(Phase::PREPARE)) {
            GlobalMoveToNextState(current_state);
          }
        }
        break;
      case Phase::IN_PROGRESS:
        assert(current_state.action != Action::CheckpointIndex);
        // Handle PREPARE -> IN_PROGRESS and IN_PROGRESS -> IN_PROGRESS
        if(previous_state.phase == Phase::PREPARE) {
          assert(prev_thread_ctx().retry_requests.empty());
          assert(prev_thread_ctx().pending_ios.empty());
          assert(prev_thread_ctx().io_responses.empty());

          // Get a new thread context; keep track of the old one as "previous."
          thread_contexts_[Thread::id()].swap();
          // initialize a new local context
          thread_ctx().Initialize(Phase::IN_PROGRESS, current_state.version,
                                  prev_thread_ctx().guid, prev_thread_ctx().serial_num);
          // Thread ack that it has swapped contexts.
          if(epoch_.FinishThreadPhase(Phase::IN_PROGRESS)) {
            GlobalMoveToNextState(current_state);
          }
        }
        break;
      case Phase::WAIT_PENDING:
        assert(current_state.action != Action::CheckpointIndex);
        // Handle IN_PROGRESS -> WAIT_PENDING and WAIT_PENDING -> WAIT_PENDING
        if(!epoch_.HasThreadFinishedPhase(Phase::WAIT_PENDING)) {
          if(prev_thread_ctx().pending_ios.empty() &&
              prev_thread_ctx().retry_requests.empty()) {
            // Thread ack that it has completed its pending I/Os.
            if(epoch_.FinishThreadPhase(Phase::WAIT_PENDING)) {
              GlobalMoveToNextState(current_state);
            }
          }
        }
        break;
      case Phase::WAIT_FLUSH:
        assert(current_state.action != Action::CheckpointIndex);
        // Handle WAIT_PENDING -> WAIT_FLUSH and WAIT_FLUSH -> WAIT_FLUSH
        if(!epoch_.HasThreadFinishedPhase(Phase::WAIT_FLUSH)) {
          bool flushed;
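          // A fold-over checkpoint is durable once the hybrid log has flushed
          // past the checkpoint's final address; otherwise wait until the
          // checkpoint's own outstanding flush operations have drained to zero.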
          if(fold_over_snapshot) {
            flushed = hlog.flushed_until_address.load() >= checkpoint_.log_metadata.final_address;
          } else {
            flushed = checkpoint_.flush_pending.load() == 0;
          }
          if(flushed) {
            // write context info
            WriteCprContext();
            // Thread ack that it has written its CPR context.
            if(epoch_.FinishThreadPhase(Phase::WAIT_FLUSH)) {
              GlobalMoveToNextState(current_state);
            }
          }
        }
        break;
      case Phase::PERSISTENCE_CALLBACK:
        assert(current_state.action != Action::CheckpointIndex);
        // Handle WAIT_FLUSH -> PERSISTENCE_CALLBACK and PERSISTENCE_CALLBACK -> PERSISTENCE_CALLBACK
        if(previous_state.phase == Phase::WAIT_FLUSH) {
          // Persistence callback
          if(checkpoint_.hybrid_log_persistence_callback) {
            checkpoint_.hybrid_log_persistence_callback(Status::Ok, prev_thread_ctx().serial_num);
          }
          // Thread has finished checkpointing.
          thread_ctx().phase = Phase::REST;
          // Thread ack that it has finished checkpointing.
          if(epoch_.FinishThreadPhase(Phase::PERSISTENCE_CALLBACK)) {
            GlobalMoveToNextState(current_state);
          }
        }
        break;
      default:
        // nothing to do.
        break;
      }
      break;
    case Action::GC:
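      // GC phase order: GC_IO_PENDING -> GC_IN_PROGRESS -> REST. Each thread first
      // drains the pending I/Os of its old context, then helps scan the hash table
      // via CleanHashTableBuckets() until it has no buckets left to clean.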
      switch(current_state.phase) {
      case Phase::GC_IO_PENDING:
        // Handle REST -> GC_IO_PENDING and GC_IO_PENDING -> GC_IO_PENDING.
        if(previous_state.phase == Phase::REST) {
          assert(prev_thread_ctx().retry_requests.empty());
          assert(prev_thread_ctx().pending_ios.empty());
          assert(prev_thread_ctx().io_responses.empty());
          // Get a new thread context; keep track of the old one as "previous."
          thread_contexts_[Thread::id()].swap();
          // initialize a new local context
          thread_ctx().Initialize(Phase::GC_IO_PENDING, current_state.version,
                                  prev_thread_ctx().guid, prev_thread_ctx().serial_num);
        }

        // See if the old thread context has completed its pending I/Os.
        if(!epoch_.HasThreadFinishedPhase(Phase::GC_IO_PENDING)) {
          if(prev_thread_ctx().pending_ios.empty() &&
              prev_thread_ctx().retry_requests.empty()) {
            // Thread ack that it has completed its pending I/Os.
            if(epoch_.FinishThreadPhase(Phase::GC_IO_PENDING)) {
              GlobalMoveToNextState(current_state);
            }
          }
        }
        break;
      case Phase::GC_IN_PROGRESS:
        // Handle GC_IO_PENDING -> GC_IN_PROGRESS and GC_IN_PROGRESS -> GC_IN_PROGRESS.
        if(!epoch_.HasThreadFinishedPhase(Phase::GC_IN_PROGRESS)) {
          if(!CleanHashTableBuckets()) {
            // No more buckets for this thread to clean; thread has finished GC.
            thread_ctx().phase = Phase::REST;
            // Thread ack that it has finished GC.
            if(epoch_.FinishThreadPhase(Phase::GC_IN_PROGRESS)) {
              GlobalMoveToNextState(current_state);
            }
          }
        }
        break;
      default:
        assert(false); // not reached
        break;
      }
      break;
    case Action::GrowIndex:
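      // Index-growth phase order: GROW_PREPARE -> GROW_IN_PROGRESS. Threads first
      // quiesce their in-flight hash-table operations; each thread then calls
      // SplitHashTableBuckets() to help redistribute buckets into the resized table.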
      switch(current_state.phase) {
      case Phase::GROW_PREPARE:
        if(previous_state.phase == Phase::REST) {
          // Thread ack that we're going to grow the hash table.
          if(epoch_.FinishThreadPhase(Phase::GROW_PREPARE)) {
            GlobalMoveToNextState(current_state);
          }
        } else {
          // Wait for all other threads to finish their outstanding (synchronous) hash table
          // operations.
          std::this_thread::yield();
        }
        break;
      case Phase::GROW_IN_PROGRESS:
        SplitHashTableBuckets();
        break;
      default:
        // nothing to do.
        break;
      }
      break;
    }
    thread_ctx().phase = current_state.phase;
    thread_ctx().version = current_state.version;
    previous_state = current_state;
  } while(previous_state != final_state);
}
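
The outer do-while is the key mechanism here: a thread that has fallen behind the global state does not jump straight to the current phase, but replays every transition it missed and acks each one; when FinishThreadPhase() reports that it was the last thread to ack, that thread triggers the global move to the next phase. Below is a minimal, self-contained sketch of just that catch-up loop; ToyPhase, ToyState, CatchUp and the hard-coded phase order are illustrative stand-ins, not part of the FASTER API.

#include <cstdint>
#include <cstdio>

enum class ToyPhase : uint8_t {
  REST, PREPARE, IN_PROGRESS, WAIT_PENDING, WAIT_FLUSH, PERSISTENCE_CALLBACK
};

struct ToyState {
  ToyPhase phase;
  uint32_t version;

  bool operator==(const ToyState& other) const {
    return phase == other.phase && version == other.version;
  }
  bool operator!=(const ToyState& other) const {
    return !(*this == other);
  }
  // Next state in a simplified hybrid-log checkpoint sequence; the version is
  // bumped on the PREPARE -> IN_PROGRESS transition here, loosely mirroring how
  // the checkpoint version advances.
  ToyState GetNextState() const {
    switch(phase) {
    case ToyPhase::REST: return ToyState{ ToyPhase::PREPARE, version };
    case ToyPhase::PREPARE: return ToyState{ ToyPhase::IN_PROGRESS, version + 1 };
    case ToyPhase::IN_PROGRESS: return ToyState{ ToyPhase::WAIT_PENDING, version };
    case ToyPhase::WAIT_PENDING: return ToyState{ ToyPhase::WAIT_FLUSH, version };
    case ToyPhase::WAIT_FLUSH: return ToyState{ ToyPhase::PERSISTENCE_CALLBACK, version };
    default: return ToyState{ ToyPhase::REST, version };
    }
  }
};

// Walks one thread's local view from its last-seen state up to final_state,
// visiting every intermediate phase exactly once; this is the role that
// HandleSpecialPhases() plays per thread, minus the epoch acks and the global
// state transitions.
void CatchUp(ToyState& local, const ToyState final_state) {
  ToyState previous = local;
  do {
    ToyState current = (previous == final_state) ? final_state
                                                 : previous.GetNextState();
    std::printf("handling phase %u at version %u\n",
                static_cast<unsigned>(current.phase),
                static_cast<unsigned>(current.version));
    local = current;
    previous = current;
  } while(previous != final_state);
}

int main() {
  ToyState local{ ToyPhase::REST, 4 };
  // The global state has already advanced to WAIT_FLUSH of version 5; the thread
  // replays PREPARE, IN_PROGRESS, WAIT_PENDING and WAIT_FLUSH in that order.
  CatchUp(local, ToyState{ ToyPhase::WAIT_FLUSH, 5 });
  return 0;
}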