in cs/src/core/Index/Synchronization/FasterStateMachine.cs [165:288]
private void ThreadStateMachineStep<Input, Output, Context, FasterSession>(
FasterExecutionContext<Input, Output, Context> ctx,
FasterSession fasterSession,
List<ValueTask> valueTasks,
CancellationToken token = default)
where FasterSession : IFasterSession
{
#region Capture current (non-intermediate) system state
var currentTask = currentSyncStateMachine;
var targetState = SystemState.Copy(ref systemState);
SystemState.RemoveIntermediate(ref targetState);
while (currentSyncStateMachine != currentTask)
{
currentTask = currentSyncStateMachine;
targetState = SystemState.Copy(ref systemState);
SystemState.RemoveIntermediate(ref targetState);
}
#endregion
var currentState = ctx == null ? targetState : SystemState.Make(ctx.phase, ctx.version);
var targetStartState = StartOfCurrentCycle(targetState);
#region Get returning thread to start of current cycle, issuing completion callbacks if needed
if (ctx != null)
{
if (ctx.version < targetStartState.Version)
{
// Issue CPR callback for full session
if (ctx.serialNum != -1)
{
List<long> excludedSerialNos = new();
foreach (var v in ctx.ioPendingRequests.Values)
{
excludedSerialNos.Add(v.serialNum);
}
foreach (var v in ctx.retryRequests)
{
excludedSerialNos.Add(v.serialNum);
}
var commitPoint = new CommitPoint
{
UntilSerialNo = ctx.serialNum,
ExcludedSerialNos = excludedSerialNos
};
// Thread local action
fasterSession?.CheckpointCompletionCallback(ctx.guid, commitPoint);
}
}
if ((ctx.version == targetStartState.Version) && (ctx.phase < Phase.REST) && !(ctx.threadStateMachine is IndexSnapshotStateMachine))
{
IssueCompletionCallback(ctx, fasterSession);
}
}
#endregion
// No state machine associated with target, or target is in REST phase:
// we can directly fast forward session to target state
if (currentTask == null || targetState.Phase == Phase.REST)
{
if (ctx != null)
{
ctx.phase = targetState.Phase;
ctx.version = targetState.Version;
ctx.threadStateMachine = currentTask;
}
return;
}
#region Jump on and execute current state machine
// We start at either the start point or our previous position in the state machine.
// If we are calling from somewhere other than an execution thread (e.g. waiting on
// a checkpoint to complete on a client app thread), we start at current system state
var threadState = targetState;
if (ctx != null)
{
if (ctx.threadStateMachine == currentTask)
{
threadState = currentState;
}
else
{
threadState = targetStartState;
ctx.threadStateMachine = currentTask;
}
}
var previousState = threadState;
do
{
Debug.Assert(
(threadState.Version < targetState.Version) ||
(threadState.Version == targetState.Version &&
(threadState.Phase <= targetState.Phase || currentTask is IndexSnapshotStateMachine)
));
currentTask.OnThreadEnteringState(threadState, previousState, this, ctx, fasterSession, valueTasks, token);
if (ctx != null)
{
ctx.phase = threadState.Phase;
ctx.version = threadState.Version;
}
previousState.Word = threadState.Word;
threadState = currentTask.NextState(threadState);
if (systemState.Word != targetState.Word)
{
var tmp = SystemState.Copy(ref systemState);
if (currentSyncStateMachine == currentTask)
{
targetState = tmp;
SystemState.RemoveIntermediate(ref targetState);
}
}
} while (previousState.Word != targetState.Word);
#endregion
return;
}