in cc/src/core/faster.h [2516:2723]
// Walks this thread through the non-REST phases of the global system state
// machine. A thread lands here when the global state (action, phase, version)
// has advanced past its cached thread-local state; the do/while loop below
// replays every intermediate transition in order, so the thread acks each
// phase exactly once even if it observed the global state change late.
//
// Three actions are handled:
//  - CheckpointFull / CheckpointIndex / CheckpointHybridLog: the checkpoint
//    protocol (PREP_INDEX_CHKPT -> INDEX_CHKPT -> PREPARE -> IN_PROGRESS ->
//    WAIT_PENDING -> WAIT_FLUSH -> PERSISTENCE_CALLBACK -> REST), with the
//    index-only and log-only variants skipping phases as asserted below.
//  - GC: garbage collection (GC_IO_PENDING -> GC_IN_PROGRESS -> REST).
//  - GrowIndex: hash-table resize (GROW_PREPARE -> GROW_IN_PROGRESS).
//
// Throughout, epoch_.FinishThreadPhase(p) records this thread's ack for phase
// p; when it returns true (presumably: this thread was the last one to ack —
// TODO confirm against the epoch framework), the thread also advances the
// global state via GlobalMoveToNextState().
//
// NOTE(review): the `template <class K, class V, class D>` header is outside
// this chunk; assumed from the enclosing class template.
void FasterKv<K, V, D>::HandleSpecialPhases() {
  SystemState final_state = system_state_.load();
  if(final_state.phase == Phase::REST) {
    // Nothing to do; just reset thread context.
    thread_ctx().phase = Phase::REST;
    thread_ctx().version = final_state.version;
    return;
  }
  // Seed the replay with the thread's last-seen (phase, version), paired with
  // the current global action.
  SystemState previous_state{ final_state.action, thread_ctx().phase, thread_ctx().version };
  do {
    // Identify the transition (currentState -> nextState)
    SystemState current_state = (previous_state == final_state) ? final_state :
                                previous_state.GetNextState();
    switch(current_state.action) {
    case Action::CheckpointFull:
    case Action::CheckpointIndex:
    case Action::CheckpointHybridLog:
      switch(current_state.phase) {
      case Phase::PREP_INDEX_CHKPT:
        // Log-only checkpoints never enter the index phases.
        assert(current_state.action != Action::CheckpointHybridLog);
        // Both from REST -> PREP_INDEX_CHKPT and PREP_INDEX_CHKPT -> PREP_INDEX_CHKPT
        if(previous_state.phase == Phase::REST) {
          // Thread ack that we're performing a checkpoint.
          if(epoch_.FinishThreadPhase(Phase::PREP_INDEX_CHKPT)) {
            GlobalMoveToNextState(current_state);
          }
        }
        break;
      case Phase::INDEX_CHKPT: {
        assert(current_state.action != Action::CheckpointHybridLog);
        // Both from PREP_INDEX_CHKPT -> INDEX_CHKPT and INDEX_CHKPT -> INDEX_CHKPT
        Status result = CheckpointFuzzyIndexComplete();
        if(result != Status::Pending && result != Status::Ok) {
          // Index write-out failed; record the failure but keep driving the
          // state machine so every thread unwinds back to REST cleanly.
          checkpoint_.failed = true;
        }
        if(result != Status::Pending) {
          if(current_state.action == Action::CheckpointIndex) {
            // This thread is done now.
            thread_ctx().phase = Phase::REST;
            // Thread ack that it is done.
            if(epoch_.FinishThreadPhase(Phase::INDEX_CHKPT)) {
              GlobalMoveToNextState(current_state);
            }
          } else {
            // Index checkpoint is done; move on to PREPARE phase.
            GlobalMoveToNextState(current_state);
          }
        }
        break;
      }
      case Phase::PREPARE:
        // Index-only checkpoints never enter the hybrid-log phases.
        assert(current_state.action != Action::CheckpointIndex);
        // Handle (INDEX_CHKPT -> PREPARE or REST -> PREPARE) and PREPARE -> PREPARE
        if(previous_state.phase != Phase::PREPARE) {
          // mark pending requests
          MarkAllPendingRequests();
          // keep a count of number of threads
          ++checkpoint_.log_metadata.num_threads;
          // set the thread index
          checkpoint_.log_metadata.guids[Thread::id()] = thread_ctx().guid;
          // Thread ack that it has finished marking its pending requests.
          if(epoch_.FinishThreadPhase(Phase::PREPARE)) {
            GlobalMoveToNextState(current_state);
          }
        }
        break;
      case Phase::IN_PROGRESS:
        assert(current_state.action != Action::CheckpointIndex);
        // Handle PREPARE -> IN_PROGRESS and IN_PROGRESS -> IN_PROGRESS
        if(previous_state.phase == Phase::PREPARE) {
          // The outgoing context must have no in-flight work before it is
          // demoted to "previous".
          assert(prev_thread_ctx().retry_requests.empty());
          assert(prev_thread_ctx().pending_ios.empty());
          assert(prev_thread_ctx().io_responses.empty());
          // Get a new thread context; keep track of the old one as "previous."
          thread_contexts_[Thread::id()].swap();
          // initialize a new local context (carrying over the old context's
          // guid and serial number into the new checkpoint version)
          thread_ctx().Initialize(Phase::IN_PROGRESS, current_state.version,
                                  prev_thread_ctx().guid, prev_thread_ctx().serial_num);
          // Thread ack that it has swapped contexts.
          if(epoch_.FinishThreadPhase(Phase::IN_PROGRESS)) {
            GlobalMoveToNextState(current_state);
          }
        }
        break;
      case Phase::WAIT_PENDING:
        assert(current_state.action != Action::CheckpointIndex);
        // Handle IN_PROGRESS -> WAIT_PENDING and WAIT_PENDING -> WAIT_PENDING
        if(!epoch_.HasThreadFinishedPhase(Phase::WAIT_PENDING)) {
          // Wait until the pre-swap ("previous") context has drained all of
          // its outstanding I/Os and retries.
          if(prev_thread_ctx().pending_ios.empty() &&
              prev_thread_ctx().retry_requests.empty()) {
            // Thread ack that it has completed its pending I/Os.
            if(epoch_.FinishThreadPhase(Phase::WAIT_PENDING)) {
              GlobalMoveToNextState(current_state);
            }
          }
        }
        break;
      case Phase::WAIT_FLUSH:
        assert(current_state.action != Action::CheckpointIndex);
        // Handle WAIT_PENDING -> WAIT_FLUSH and WAIT_FLUSH -> WAIT_FLUSH
        if(!epoch_.HasThreadFinishedPhase(Phase::WAIT_FLUSH)) {
          bool flushed;
          if(fold_over_snapshot) {
            // Fold-over: done once the in-place log has flushed past the
            // checkpoint's final address.
            flushed = hlog.flushed_until_address.load() >= checkpoint_.log_metadata.final_address;
          } else {
            // Separate-snapshot: done once all snapshot-file flushes have
            // completed (pending-flush count drops to zero).
            flushed = checkpoint_.flush_pending.load() == 0;
          }
          if(flushed) {
            // write context info
            WriteCprContext();
            // Thread ack that it has written its CPR context.
            if(epoch_.FinishThreadPhase(Phase::WAIT_FLUSH)) {
              GlobalMoveToNextState(current_state);
            }
          }
        }
        break;
      case Phase::PERSISTENCE_CALLBACK:
        assert(current_state.action != Action::CheckpointIndex);
        // Handle WAIT_FLUSH -> PERSISTENCE_CALLBACK and PERSISTENCE_CALLBACK -> PERSISTENCE_CALLBACK
        if(previous_state.phase == Phase::WAIT_FLUSH) {
          // Persistence callback: notify the caller of the serial number up
          // to which this thread's operations are now persistent.
          if(checkpoint_.hybrid_log_persistence_callback) {
            checkpoint_.hybrid_log_persistence_callback(Status::Ok, prev_thread_ctx().serial_num);
          }
          // Thread has finished checkpointing.
          thread_ctx().phase = Phase::REST;
          // Thread ack that it has finished checkpointing.
          if(epoch_.FinishThreadPhase(Phase::PERSISTENCE_CALLBACK)) {
            GlobalMoveToNextState(current_state);
          }
        }
        break;
      default:
        // nothing to do.
        break;
      }
      break;
    case Action::GC:
      switch(current_state.phase) {
      case Phase::GC_IO_PENDING:
        // Handle REST -> GC_IO_PENDING and GC_IO_PENDING -> GC_IO_PENDING.
        if(previous_state.phase == Phase::REST) {
          assert(prev_thread_ctx().retry_requests.empty());
          assert(prev_thread_ctx().pending_ios.empty());
          assert(prev_thread_ctx().io_responses.empty());
          // Get a new thread context; keep track of the old one as "previous."
          thread_contexts_[Thread::id()].swap();
          // initialize a new local context
          thread_ctx().Initialize(Phase::GC_IO_PENDING, current_state.version,
                                  prev_thread_ctx().guid, prev_thread_ctx().serial_num);
        }
        // See if the old thread context has completed its pending I/Os.
        if(!epoch_.HasThreadFinishedPhase(Phase::GC_IO_PENDING)) {
          if(prev_thread_ctx().pending_ios.empty() &&
              prev_thread_ctx().retry_requests.empty()) {
            // Thread ack that it has completed its pending I/Os.
            if(epoch_.FinishThreadPhase(Phase::GC_IO_PENDING)) {
              GlobalMoveToNextState(current_state);
            }
          }
        }
        break;
      case Phase::GC_IN_PROGRESS:
        // Handle GC_IO_PENDING -> GC_IN_PROGRESS and GC_IN_PROGRESS -> GC_IN_PROGRESS.
        if(!epoch_.HasThreadFinishedPhase(Phase::GC_IN_PROGRESS)) {
          // Each call cleans a chunk of hash-table buckets; returns false
          // when no chunks remain for this thread to claim.
          if(!CleanHashTableBuckets()) {
            // No more buckets for this thread to clean; thread has finished GC.
            thread_ctx().phase = Phase::REST;
            // Thread ack that it has finished GC.
            if(epoch_.FinishThreadPhase(Phase::GC_IN_PROGRESS)) {
              GlobalMoveToNextState(current_state);
            }
          }
        }
        break;
      default:
        assert(false); // not reached
        break;
      }
      break;
    case Action::GrowIndex:
      switch(current_state.phase) {
      case Phase::GROW_PREPARE:
        if(previous_state.phase == Phase::REST) {
          // Thread ack that we're going to grow the hash table.
          if(epoch_.FinishThreadPhase(Phase::GROW_PREPARE)) {
            GlobalMoveToNextState(current_state);
          }
        } else {
          // Wait for all other threads to finish their outstanding (synchronous) hash table
          // operations.
          std::this_thread::yield();
        }
        break;
      case Phase::GROW_IN_PROGRESS:
        // Cooperate in migrating buckets into the resized table.
        SplitHashTableBuckets();
        break;
      }
      break;
    }
    // Record the transition in the thread-local context, then advance the
    // replay by one step.
    thread_ctx().phase = current_state.phase;
    thread_ctx().version = current_state.version;
    previous_state = current_state;
  } while(previous_state != final_state);
}