in Sources/Core/Microsoft.StreamProcessing/Operators/Afa/CompiledUngroupedAfaPipe_EventList.cs [45:305]
/// <summary>
/// Runs the automaton over the events buffered in <c>currentList</c>, using
/// <c>lastSyncTime</c> as their timestamp, and then empties the buffer.
/// </summary>
/// <remarks>
/// Phase 1 visits every active state. A state whose
/// <c>PatternStartTimestamp + MaxDuration</c> is still greater than the timestamp is
/// tested against its arcs in <c>eventListStateMap</c> (fence/transfer receive the whole
/// list) and, only when exactly one event is buffered, against its arcs in
/// <c>singleEventStateMap</c> (fence/transfer receive that single event).
/// Phase 2 does the same from each start state with <c>defaultRegister</c>; it runs when
/// <c>AllowOverlappingInstances</c> is set or when phase 1 left no state to continue from.
/// For every arc that fires, the destination and the epsilon destinations popped from
/// <c>stack</c> are examined: final states are appended to <c>batch</c>, and states with
/// outgoing arcs are stored in <c>activeStates</c>.
/// NOTE(review): the expiry sum is plain unchecked arithmetic; if <c>MaxDuration</c> can be
/// close to the maximum timestamp it would wrap before <c>Math.Min</c> clamps it. Confirm
/// the range of <c>MaxDuration</c>.
/// </remarks>
private void ProcessCurrentTimestamp()
{
    // Nothing was buffered for this timestamp, so there is nothing to match.
    if (this.currentList.Count == 0)
    {
        return;
    }

    long now = this.lastSyncTime;

    /* (1) Advance the currently active states */
    bool anyStateContinued = false;
    if (this.activeStatesTraverser.Find())
    {
        while (this.activeStatesTraverser.Next(out int slot))
        {
            // 'slot' is reset to -1 as soon as the visited entry has been overwritten with a
            // successor; comparing against 'visitedSlot' afterwards tells whether that happened.
            int visitedSlot = slot;
            var active = this.activeStates.Values[slot];
            var expiry = active.PatternStartTimestamp + this.MaxDuration;

            if (expiry > now)
            {
                // Arcs whose fence/transfer look at the complete event list.
                var listArcs = this.eventListStateMap?[active.state];
                if (listArcs != null)
                {
                    for (int a = 0; a < listArcs.Length; a++)
                    {
                        var arc = listArcs[a];
                        if (!arc.Fence(now, this.currentList, active.register))
                        {
                            continue;
                        }

                        var nextRegister = arc.Transfer != null
                            ? arc.Transfer(now, this.currentList, active.register)
                            : active.register;

                        // Visit the arc's destination, then every epsilon destination left on the stack.
                        for (int target = arc.toState; ; target = this.stack.Pop())
                        {
                            if (this.isFinal[target])
                            {
                                this.batch.vsync.col[this.iter] = now;
                                this.batch.vother.col[this.iter] = Math.Min(expiry, StreamEvent.InfinitySyncTime);
                                this.batch[this.iter] = nextRegister;
                                this.batch.hash.col[this.iter] = 0;
                                this.iter++;
                                if (this.iter == Config.DataBatchSize)
                                {
                                    FlushContents();
                                }
                            }

                            if (this.hasOutgoingArcs[target])
                            {
                                // The first successor overwrites the visited entry; later ones get a new entry.
                                if (slot == -1)
                                {
                                    slot = this.activeStates.Insert();
                                }

                                this.activeStates.Values[slot].state = target;
                                this.activeStates.Values[slot].register = nextRegister;
                                this.activeStates.Values[slot].PatternStartTimestamp = active.PatternStartTimestamp;
                                slot = -1;
                                anyStateContinued = true;

                                // Without an epsilon map nothing is ever pushed, so the walk ends here.
                                if (this.epsilonStateMap == null)
                                {
                                    break;
                                }

                                var epsilonTargets = this.epsilonStateMap[target];
                                if (epsilonTargets != null)
                                {
                                    for (int e = 0; e < epsilonTargets.Length; e++)
                                    {
                                        this.stack.Push(epsilonTargets[e]);
                                    }
                                }
                            }

                            if (this.stack.Count == 0)
                            {
                                break;
                            }
                        }

                        // We are guaranteed to have only one successful transition
                        if (this.IsDeterministic)
                        {
                            break;
                        }
                    }
                }

                // Arcs whose fence/transfer look at one event; considered only for a one-event list.
                if (this.singleEventStateMap != null && this.currentList.Count == 1)
                {
                    var singleArcs = this.singleEventStateMap[active.state];
                    if (singleArcs != null)
                    {
                        for (int a = 0; a < singleArcs.Length; a++)
                        {
                            var arc = singleArcs[a];
                            if (!arc.Fence(now, this.currentList[0], active.register))
                            {
                                continue;
                            }

                            var nextRegister = arc.Transfer != null
                                ? arc.Transfer(now, this.currentList[0], active.register)
                                : active.register;

                            for (int target = arc.toState; ; target = this.stack.Pop())
                            {
                                if (this.isFinal[target])
                                {
                                    this.batch.vsync.col[this.iter] = now;
                                    this.batch.vother.col[this.iter] = Math.Min(expiry, StreamEvent.InfinitySyncTime);
                                    this.batch[this.iter] = nextRegister;
                                    this.batch.hash.col[this.iter] = 0;
                                    this.iter++;
                                    if (this.iter == Config.DataBatchSize)
                                    {
                                        FlushContents();
                                    }
                                }

                                if (this.hasOutgoingArcs[target])
                                {
                                    if (slot == -1)
                                    {
                                        slot = this.activeStates.Insert();
                                    }

                                    this.activeStates.Values[slot].state = target;
                                    this.activeStates.Values[slot].register = nextRegister;
                                    this.activeStates.Values[slot].PatternStartTimestamp = active.PatternStartTimestamp;
                                    slot = -1;
                                    anyStateContinued = true;

                                    if (this.epsilonStateMap == null)
                                    {
                                        break;
                                    }

                                    var epsilonTargets = this.epsilonStateMap[target];
                                    if (epsilonTargets != null)
                                    {
                                        for (int e = 0; e < epsilonTargets.Length; e++)
                                        {
                                            this.stack.Push(epsilonTargets[e]);
                                        }
                                    }
                                }

                                if (this.stack.Count == 0)
                                {
                                    break;
                                }
                            }

                            // We are guaranteed to have only one successful transition
                            if (this.IsDeterministic)
                            {
                                break;
                            }
                        }
                    }
                }
            }

            // The visited entry was never overwritten (expired, no arc fired, or no successor
            // had outgoing arcs), so it is dropped from the active set.
            if (slot == visitedSlot)
            {
                this.activeStatesTraverser.Remove();
            }

            // We are guaranteed to have only one active state
            if (this.IsDeterministic)
            {
                break;
            }
        }
    }

    /* (2) Start new activations from the start state(s) */
    if (this.AllowOverlappingInstances || !anyStateContinued)
    {
        for (int s = 0; s < this.numStartStates; s++)
        {
            int startState = this.startStates[s];

            // Arcs whose fence/transfer look at the complete event list.
            var startListArcs = this.eventListStateMap?[startState];
            if (startListArcs != null)
            {
                for (int a = 0; a < startListArcs.Length; a++)
                {
                    var arc = startListArcs[a];
                    if (!arc.Fence(now, this.currentList, this.defaultRegister))
                    {
                        continue;
                    }

                    var nextRegister = arc.Transfer != null
                        ? arc.Transfer(now, this.currentList, this.defaultRegister)
                        : this.defaultRegister;

                    for (int target = arc.toState; ; target = this.stack.Pop())
                    {
                        if (this.isFinal[target])
                        {
                            this.batch.vsync.col[this.iter] = now;
                            this.batch.vother.col[this.iter] = Math.Min(now + this.MaxDuration, StreamEvent.InfinitySyncTime);
                            this.batch[this.iter] = nextRegister;
                            this.batch.hash.col[this.iter] = 0;
                            this.iter++;
                            if (this.iter == Config.DataBatchSize)
                            {
                                FlushContents();
                            }
                        }

                        if (this.hasOutgoingArcs[target])
                        {
                            // A new activation always gets its own entry, stamped with the current timestamp.
                            int freshSlot = this.activeStates.Insert();
                            this.activeStates.Values[freshSlot].state = target;
                            this.activeStates.Values[freshSlot].register = nextRegister;
                            this.activeStates.Values[freshSlot].PatternStartTimestamp = now;

                            if (this.epsilonStateMap == null)
                            {
                                break;
                            }

                            var epsilonTargets = this.epsilonStateMap[target];
                            if (epsilonTargets != null)
                            {
                                for (int e = 0; e < epsilonTargets.Length; e++)
                                {
                                    this.stack.Push(epsilonTargets[e]);
                                }
                            }
                        }

                        if (this.stack.Count == 0)
                        {
                            break;
                        }
                    }

                    // We are guaranteed to have only one successful transition
                    if (this.IsDeterministic)
                    {
                        break;
                    }
                }
            }

            // Arcs whose fence/transfer look at one event; considered only for a one-event list.
            if (this.singleEventStateMap != null && this.currentList.Count == 1)
            {
                var startSingleArcs = this.singleEventStateMap[startState];
                if (startSingleArcs != null)
                {
                    for (int a = 0; a < startSingleArcs.Length; a++)
                    {
                        var arc = startSingleArcs[a];
                        if (!arc.Fence(now, this.currentList[0], this.defaultRegister))
                        {
                            continue;
                        }

                        var nextRegister = arc.Transfer != null
                            ? arc.Transfer(now, this.currentList[0], this.defaultRegister)
                            : this.defaultRegister;

                        for (int target = arc.toState; ; target = this.stack.Pop())
                        {
                            if (this.isFinal[target])
                            {
                                this.batch.vsync.col[this.iter] = now;
                                this.batch.vother.col[this.iter] = Math.Min(now + this.MaxDuration, StreamEvent.InfinitySyncTime);
                                this.batch[this.iter] = nextRegister;
                                this.batch.hash.col[this.iter] = 0;
                                this.iter++;
                                if (this.iter == Config.DataBatchSize)
                                {
                                    FlushContents();
                                }
                            }

                            if (this.hasOutgoingArcs[target])
                            {
                                int freshSlot = this.activeStates.Insert();
                                this.activeStates.Values[freshSlot].state = target;
                                this.activeStates.Values[freshSlot].register = nextRegister;
                                this.activeStates.Values[freshSlot].PatternStartTimestamp = now;

                                if (this.epsilonStateMap == null)
                                {
                                    break;
                                }

                                var epsilonTargets = this.epsilonStateMap[target];
                                if (epsilonTargets != null)
                                {
                                    for (int e = 0; e < epsilonTargets.Length; e++)
                                    {
                                        this.stack.Push(epsilonTargets[e]);
                                    }
                                }
                            }

                            if (this.stack.Count == 0)
                            {
                                break;
                            }
                        }

                        // We are guaranteed to have only one successful transition
                        if (this.IsDeterministic)
                        {
                            break;
                        }
                    }
                }
            }

            // We are guaranteed to have only one start state
            if (this.IsDeterministic)
            {
                break;
            }
        }
    }

    this.currentList.Clear();
}