in cc/src/core/faster.h [2320:2493]
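// Attempts to advance the global system state from `current_state` to its
// successor. The compare-and-swap below guarantees that exactly one thread
// wins the transition; that thread alone performs the one-time global side
// effects for the new phase (the switch below), while every other caller
// returns false because the state has already been advanced.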
bool FasterKv<K, V, D>::GlobalMoveToNextState(SystemState current_state) {
  SystemState next_state = current_state.GetNextState();
  if(!system_state_.compare_exchange_strong(current_state, next_state)) {
    return false;
  }
  switch(next_state.action) {
  case Action::CheckpointFull:
  case Action::CheckpointIndex:
  case Action::CheckpointHybridLog:
    switch(next_state.phase) {
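    // A full checkpoint moves through the CPR phases in order:
    //   PREP_INDEX_CHKPT -> INDEX_CHKPT -> PREPARE -> IN_PROGRESS ->
    //   WAIT_PENDING -> WAIT_FLUSH -> PERSISTENCE_CALLBACK -> REST.
    // Index-only and hybrid-log-only checkpoints take subsets of this path;
    // the asserts in the cases below enforce which actions may reach which
    // phases.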
    case Phase::PREP_INDEX_CHKPT:
      // This case is handled directly inside Checkpoint[Index]().
      assert(false);
      break;
    case Phase::INDEX_CHKPT:
      assert(next_state.action != Action::CheckpointHybridLog);
      // Issue async request for fuzzy checkpoint
      assert(!checkpoint_.failed);
      if(CheckpointFuzzyIndex() != Status::Ok) {
        checkpoint_.failed = true;
      }
      break;
    case Phase::PREPARE:
      // An index-only checkpoint never reaches this state, and CheckpointHybridLog() handles
      // this case directly.
      assert(next_state.action == Action::CheckpointFull);
      // INDEX_CHKPT -> PREPARE
      // Get an overestimate for the tail of the ofb (overflow-bucket allocator), now that we've
      // finished fuzzy-checkpointing the ofb. (Ensures that recovery won't accidentally
      // reallocate from the ofb.)
      checkpoint_.index_metadata.ofb_count =
        overflow_buckets_allocator_[resize_info_.version].count();
      // Write index metadata to disk
      if(WriteIndexMetadata() != Status::Ok) {
        checkpoint_.failed = true;
      }
      if(checkpoint_.index_persistence_callback) {
        // Notify the host that the index checkpoint has completed.
        checkpoint_.index_persistence_callback(Status::Ok);
      }
      break;
    case Phase::IN_PROGRESS: {
      assert(next_state.action != Action::CheckpointIndex);
      // PREPARE -> IN_PROGRESS
      // Nothing to do globally; each thread moves itself to the new version when it observes
      // this phase.
      break;
    }
    case Phase::WAIT_PENDING:
      assert(next_state.action != Action::CheckpointIndex);
      // IN_PROGRESS -> WAIT_PENDING
      // Nothing to do globally; we are waiting for all threads' pending operations from the
      // old version to complete.
      break;
    case Phase::WAIT_FLUSH:
      assert(next_state.action != Action::CheckpointIndex);
      // WAIT_PENDING -> WAIT_FLUSH
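      // Two flush strategies: with fold_over_snapshot, the hybrid log itself serves as the
      // checkpoint, so it suffices to shift the read-only marker to the tail and let the
      // normal flush path persist everything below it. Otherwise, the relevant log pages are
      // copied out to a separate snapshot.dat file.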
      if(fold_over_snapshot) {
        // Move read-only marker to tail
        Address tail_address = hlog.ShiftReadOnlyToTail();
        // Get final address for CPR
        checkpoint_.log_metadata.final_address = tail_address;
      } else {
        Address tail_address = hlog.GetTailAddress();
        // Get final address for CPR
        checkpoint_.log_metadata.final_address = tail_address;
        checkpoint_.snapshot_file = disk.NewFile(disk.relative_cpr_checkpoint_path(
                                      checkpoint_.hybrid_log_token) + "snapshot.dat");
        if(checkpoint_.snapshot_file.Open(&disk.handler()) != Status::Ok) {
          checkpoint_.failed = true;
        }
        // Flush the log to the snapshot file.
        hlog.AsyncFlushPagesToFile(checkpoint_.log_metadata.flushed_address.page(),
                                   checkpoint_.log_metadata.final_address, checkpoint_.snapshot_file,
                                   checkpoint_.flush_pending);
      }
      // Write the CPR metadata file
      if(WriteCprMetadata() != Status::Ok) {
        checkpoint_.failed = true;
      }
      break;
    case Phase::PERSISTENCE_CALLBACK:
      assert(next_state.action != Action::CheckpointIndex);
      // WAIT_FLUSH -> PERSISTENCE_CALLBACK
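      // Nothing to do globally; each thread invokes its own persistence callback when it
      // observes this phase.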
      break;
    case Phase::REST:
      // PERSISTENCE_CALLBACK -> REST or INDEX_CHKPT -> REST
      if(next_state.action != Action::CheckpointIndex) {
        // The checkpoint is done; we can reset the contexts now. (Have to reset contexts before
        // another checkpoint can be started.)
        checkpoint_.CheckpointDone();
        // Free checkpoint locks!
        checkpoint_locks_.Free();
        // Checkpoint is done--no more work for threads to do.
        system_state_.store(SystemState{ Action::None, Phase::REST, next_state.version });
      } else {
        // Get an overestimate for the ofb's tail, now that we've finished fuzzy-checkpointing
        // the ofb. (Ensures that recovery won't accidentally reallocate from the ofb.)
        checkpoint_.index_metadata.ofb_count =
          overflow_buckets_allocator_[resize_info_.version].count();
        // Write index metadata to disk
        if(WriteIndexMetadata() != Status::Ok) {
          checkpoint_.failed = true;
        }
        // Save the callback before CheckpointDone() resets the checkpoint context.
        auto index_persistence_callback = checkpoint_.index_persistence_callback;
        // The checkpoint is done; we can reset the contexts now. (Have to reset contexts before
        // another checkpoint can be started.)
        checkpoint_.CheckpointDone();
        // Checkpoint is done--no more work for threads to do.
        system_state_.store(SystemState{ Action::None, Phase::REST, next_state.version });
        if(index_persistence_callback) {
          // Notify the host that the index checkpoint has completed.
          index_persistence_callback(Status::Ok);
        }
      }
      break;
    default:
      // not reached
      assert(false);
      break;
    }
    break;
  case Action::GC:
    switch(next_state.phase) {
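    // Log garbage collection shifts the log's begin address forward:
    //   GC_IO_PENDING -> GC_IN_PROGRESS -> REST.
    // The one-time global side effect, on entering GC_IN_PROGRESS, is truncating the log file
    // on disk; threads perform their share of hash-table cleanup as they observe the GC phases.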
    case Phase::GC_IO_PENDING:
      // This case is handled directly inside ShiftBeginAddress().
      assert(false);
      break;
    case Phase::GC_IN_PROGRESS:
      // GC_IO_PENDING -> GC_IN_PROGRESS
      // Truncate the log on disk, up to the new begin address.
      hlog.Truncate(gc_.truncate_callback);
      break;
    case Phase::REST:
      // GC_IN_PROGRESS -> REST
      // GC is done--no more work for threads to do.
      if(gc_.complete_callback) {
        gc_.complete_callback();
      }
      system_state_.store(SystemState{ Action::None, Phase::REST, next_state.version });
      break;
    default:
      // not reached
      assert(false);
      break;
    }
    break;
  case Action::GrowIndex:
    switch(next_state.phase) {
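    // Index growth replaces the hash table with a larger one:
    //   GROW_PREPARE -> GROW_IN_PROGRESS -> REST.
    // Threads migrate buckets from the old table version to the new one while the system is
    // in GROW_IN_PROGRESS.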
    case Phase::GROW_PREPARE:
      // This case is handled directly inside GrowIndex().
      assert(false);
      break;
    case Phase::GROW_IN_PROGRESS:
      // Swap hash-table versions so that all threads will use the new version after
      // populating it.
      resize_info_.version = grow_.new_version;
      break;
    case Phase::REST:
      // GROW_IN_PROGRESS -> REST
      // Growth is done; report the new table's size to the host.
      if(grow_.callback) {
        grow_.callback(state_[grow_.new_version].size());
      }
      system_state_.store(SystemState{ Action::None, Phase::REST, next_state.version });
      break;
    default:
      // not reached
      assert(false);
      break;
    }
    break;
  default:
    // not reached
    assert(false);
    break;
  }
  return true;
}
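
// Illustrative sketch (not part of this file): how a client thread might drive these
// transitions through the public FasterKv API. The names `index_persistence_cb`,
// `hybrid_log_persistence_cb`, and the `checkpoint_completed` flag are assumptions for the
// example; in practice threads interleave Refresh() with normal operations, and the thread
// that finishes a phase's work last ends up calling GlobalMoveToNextState().
//
//   Guid token;
//   store.StartSession();
//   if(store.Checkpoint(index_persistence_cb, hybrid_log_persistence_cb, token)) {
//     while(!checkpoint_completed) {   // set by hybrid_log_persistence_cb
//       store.Refresh();               // observe phase changes; do per-thread work
//       store.CompletePending();       // drain pending operations
//     }
//   }
//   store.StopSession();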