bool FishStore::GlobalMoveToNextState(SystemState current_state)

in src/core/fishstore.h [2907:3110]
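
The thread whose compare-and-swap on system_state_ succeeds carries out the one-time global work for the new phase: issuing the fuzzy index checkpoint, writing checkpoint metadata, flushing or truncating the hybrid log, swapping hash-table versions during an index grow, or finalizing a parser shift. Every other thread sees the CAS fail and returns false immediately.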


template <class D, class A>
bool FishStore<D, A>::GlobalMoveToNextState(SystemState current_state) {
  SystemState next_state = current_state.GetNextState();
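  // Attempt to atomically advance the global state machine; only the thread whose
  // CAS succeeds performs the new phase's one-time global work below.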
  if(!system_state_.compare_exchange_strong(current_state, next_state)) {
    return false;
  }

  switch(next_state.action) {
  case Action::CheckpointFull:
  case Action::CheckpointIndex:
  case Action::CheckpointHybridLog:
    switch(next_state.phase) {
    case Phase::PREP_INDEX_CHKPT:
      // This case is handled directly inside Checkpoint[Index]().
      assert(false);
      break;
    case Phase::INDEX_CHKPT:
      assert(next_state.action != Action::CheckpointHybridLog);
      // Issue async request for fuzzy checkpoint
      assert(!checkpoint_.failed);
      if(CheckpointFuzzyIndex() != Status::Ok) {
        checkpoint_.failed = true;
      }
      break;
    case Phase::PREPARE:
      // An index checkpoint never reaches this state, and CheckpointHybridLog() handles
      // this case directly.
      assert(next_state.action == Action::CheckpointFull);
      // INDEX_CHKPT -> PREPARE
      // Get an overestimate for the ofb's tail, after we've finished fuzzy-checkpointing the ofb.
      // (Ensures that recovery won't accidentally reallocate from the ofb.)
      checkpoint_.index_metadata.ofb_count =
        overflow_buckets_allocator_[resize_info_.version].count();
      // Write index meta data on disk
      if(WriteIndexMetadata() != Status::Ok) {
        checkpoint_.failed = true;
      }
      if(checkpoint_.index_persistence_callback) {
        // Notify the host that the index checkpoint has completed.
        checkpoint_.index_persistence_callback(Status::Ok);
      }
      break;
    case Phase::IN_PROGRESS:
      assert(next_state.action != Action::CheckpointIndex);
      // PREPARE -> IN_PROGRESS
      // Do nothing
      break;
    case Phase::WAIT_PENDING:
      assert(next_state.action != Action::CheckpointIndex);
      // IN_PROGRESS -> WAIT_PENDING
      // Do nothing
      break;
    case Phase::WAIT_FLUSH:
      assert(next_state.action != Action::CheckpointIndex);
      // WAIT_PENDING -> WAIT_FLUSH
      if(fold_over_snapshot) {
        // Move read-only to tail
        Address tail_address = hlog.ShiftReadOnlyToTail();
        // Get final address for CPR
        checkpoint_.log_metadata.final_address = tail_address;
      } else {
        Address tail_address = hlog.GetTailAddress();
        // Get final address for CPR
        checkpoint_.log_metadata.final_address = tail_address;
        checkpoint_.snapshot_file = disk.NewFile(disk.relative_cpr_checkpoint_path(
                                      checkpoint_.hybrid_log_token) + "snapshot.dat");
        if(checkpoint_.snapshot_file.Open(&disk.handler()) != Status::Ok) {
          checkpoint_.failed = true;
        }
        // Flush the log to a snapshot.
        hlog.AsyncFlushPagesToFile(checkpoint_.log_metadata.flushed_address.page(),
                                   checkpoint_.log_metadata.final_address, checkpoint_.snapshot_file,
                                   checkpoint_.flush_pending);
      }
      // Write CPR meta data file
      if(WriteCprMetadata() != Status::Ok) {
        checkpoint_.failed = true;
      }
      break;
    case Phase::PERSISTENCE_CALLBACK:
      assert(next_state.action != Action::CheckpointIndex);
      // WAIT_FLUSH -> PERSISTENCE_CALLBACK
      break;
    case Phase::REST:
      // PERSISTENCE_CALLBACK -> REST or INDEX_CHKPT -> REST
      if(next_state.action != Action::CheckpointIndex) {
        // The checkpoint is done; we can reset the contexts now. (Have to reset contexts before
        // another checkpoint can be started.)
        checkpoint_.CheckpointDone();
        // Free checkpoint locks!
        checkpoint_locks_.Free();
        // Checkpoint is done--no more work for threads to do.
        system_state_.store(SystemState{ Action::None, Phase::REST, next_state.version });
      } else {
        // Get an overestimate for the ofb's tail, after we've finished fuzzy-checkpointing the
        // ofb. (Ensures that recovery won't accidentally reallocate from the ofb.)
        checkpoint_.index_metadata.ofb_count =
          overflow_buckets_allocator_[resize_info_.version].count();
        // Write index meta data on disk
        if(WriteIndexMetadata() != Status::Ok) {
          checkpoint_.failed = true;
        }
        auto index_persistence_callback = checkpoint_.index_persistence_callback;
        // The checkpoint is done; we can reset the contexts now. (Have to reset contexts before
        // another checkpoint can be started.)
        checkpoint_.CheckpointDone();
        // Checkpoint is done--no more work for threads to do.
        system_state_.store(SystemState{ Action::None, Phase::REST, next_state.version });
        if(index_persistence_callback) {
          // Notify the host that the index checkpoint has completed.
          index_persistence_callback(Status::Ok);
        }
      }
#ifdef _TIMER
      checkpoint_end = std::chrono::high_resolution_clock::now();
      printf("Checkpoint done in %.6f seconds...\n",
             std::chrono::duration<double>(checkpoint_end - checkpoint_start).count());
#endif
      break;
    default:
      // not reached
      assert(false);
      break;
    }
    break;
  case Action::GC:
    switch(next_state.phase) {
    case Phase::GC_IO_PENDING:
      // This case is handled directly inside ShiftBeginAddress().
      assert(false);
      break;
    case Phase::GC_IN_PROGRESS:
      // GC_IO_PENDING -> GC_IN_PROGRESS
      // Tell the disk to truncate the log.
      hlog.Truncate(gc_.truncate_callback);
      break;
    case Phase::REST:
      // GC_IN_PROGRESS -> REST
      // GC is done--no more work for threads to do.
      if(gc_.complete_callback) {
        gc_.complete_callback();
      }
      system_state_.store(SystemState{ Action::None, Phase::REST, next_state.version });
      break;
    default:
      // not reached
      assert(false);
      break;
    }
    break;
  case Action::GrowIndex:
    switch(next_state.phase) {
    case Phase::GROW_PREPARE:
      // This case is handled directly inside GrowIndex().
      assert(false);
      break;
    case Phase::GROW_IN_PROGRESS:
      // Swap hash table versions so that all threads will use the new version after populating it.
      resize_info_.version = grow_.new_version;
      break;
    case Phase::REST:
      if(grow_.callback) {
        grow_.callback(state_[grow_.new_version].size());
      }
      system_state_.store(SystemState{ Action::None, Phase::REST, next_state.version });
      break;
    default:
      // not reached
      assert(false);
      break;
    }
    break;
  case Action::ParserShift:
    switch(next_state.phase) {
    case Phase::PS_PENDING:
      // This case is handled directly inside ApplyParserShift().
      assert(false);
      break;
    case Phase::REST:
      // When the parser shift finishes, apply the same changes to the backup meta,
      // keeping the invariant that the two metas stay identical.
      ParserStateApplyActions(parser_states[1 - system_parser_no_],
                              ps_.actions);
      // The parser shift is complete: invoke the callback with the safe
      // registration boundary.
      ps_.callback(hlog.GetTailAddress().control());
      // Clear the parser shift utility.
      ps_.actions.clear();
      ps_.callback = nullptr;
      // Set system state back to normal.
      system_state_.store(SystemState{ Action::None, Phase::REST, next_state.version });
      break;
    default:
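      // not reached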
      assert(false);
      break;
    }
    break;
  default:
    // not reached
    assert(false);
    break;
  }
  return true;
}
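
For orientation, here is a minimal, self-contained sketch of the compare-and-swap state-advance pattern used at the top of GlobalMoveToNextState(). The SimpleState type, its phases, and its GetNextState() transition are simplified stand-ins, not FishStore's actual SystemState; the point is only that a stale snapshot of the state makes the CAS fail, so each transition's global work runs exactly once.

#include <atomic>
#include <cstdint>
#include <cstdio>

// Illustrative phases only; FishStore's real SystemState also carries an Action.
enum class Phase : uint32_t { REST, PREPARE, IN_PROGRESS };

struct SimpleState {
  Phase phase;
  uint32_t version;

  SimpleState GetNextState() const {
    // PREPARE -> IN_PROGRESS -> REST, bumping the version on the final step.
    switch(phase) {
    case Phase::PREPARE:
      return SimpleState{ Phase::IN_PROGRESS, version };
    case Phase::IN_PROGRESS:
      return SimpleState{ Phase::REST, version + 1 };
    default:
      return *this;
    }
  }
};

static std::atomic<SimpleState> system_state{ SimpleState{ Phase::PREPARE, 1 } };

// Returns true only for the thread whose CAS wins; that thread performs the
// new phase's global work exactly once.
bool GlobalMoveToNextState(SimpleState current_state) {
  SimpleState next_state = current_state.GetNextState();
  if(!system_state.compare_exchange_strong(current_state, next_state)) {
    return false;  // Another thread already advanced the state machine.
  }
  std::printf("advanced to phase %u, version %u\n",
              static_cast<unsigned>(next_state.phase),
              static_cast<unsigned>(next_state.version));
  return true;
}

int main() {
  SimpleState snapshot = system_state.load();
  bool advanced = GlobalMoveToNextState(snapshot);  // First caller wins the CAS.
  bool again = GlobalMoveToNextState(snapshot);     // Stale snapshot: CAS fails, no duplicate work.
  std::printf("first attempt: %s, second attempt: %s\n",
              advanced ? "advanced" : "lost race",
              again ? "advanced" : "lost race");
  return 0;
}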