in Sources/Core/Microsoft.StreamProcessing/Operators/Afa/CompiledPartitionedAfaPipe_EventList.cs [44:358]
/// <summary>
/// Processes the event lists buffered for one partition at that partition's last sync time.
/// For every buffered event list it (1) advances the matching active automaton states,
/// (2) starts new pattern instances from the start states, writing one row to the output
/// batch for every final state reached, and finally clears the partition's buffered lists.
/// </summary>
/// <param name="partitionIndex">
/// Index into the <c>entries</c> arrays of <c>lastSyncTime</c> and <c>currentTimestampEventList</c>.
/// </param>
private void ProcessCurrentTimestamp(int partitionIndex)
{
// Early out when nothing is buffered. Note that Count is read from the outer collection,
// not from this partition's own entry.
if (this.currentTimestampEventList.Count == 0) return;
// All matches and new activations produced below are stamped with this partition's sync time.
long synctime = this.lastSyncTime.entries[partitionIndex].value;
// Walk every saved event list of this partition (presumably currIndex = 0 starts at the beginning - TODO confirm).
var allEventListTraverser = new FastMap<SavedEventList<TKey, TPayload>>.VisibleTraverser(this.currentTimestampEventList.entries[partitionIndex].value)
{
currIndex = 0
};
while (allEventListTraverser.Next(out int el_index, out int el_hash))
{
var currentList = this.currentTimestampEventList.entries[partitionIndex].value.Values[el_index];
/* (1) Process currently active states */
// Stays true unless an existing active state moves into a state that still has outgoing arcs;
// consulted in (2) when overlapping instances are not allowed.
bool ended = true;
// Look up the active states stored under this event list's hash.
if (this.activeFindTraverser.Find(el_hash))
{
// The index handed out by the traverser, kept so we can tell afterwards whether
// `index` was reset to -1 (i.e. the slot was reused for a successor state).
int orig_index;
// Track which active states need to be inserted after the current traversal
var newActiveStates = new List<GroupedActiveState<TKey, TRegister>>();
while (this.activeFindTraverser.Next(out int index))
{
orig_index = index;
// NOTE(review): `state` is read again after Values[index] is overwritten below; this is only
// a stable snapshot if GroupedActiveState is a value type - confirm against its declaration.
var state = this.activeStates.Values[index];
// Entries under the same hash may belong to a different key; skip those.
if (!this.keyEqualityComparer(state.key, currentList.key)) continue;
// Only advance states whose pattern window (start + MaxDuration) extends past synctime.
// A state outside the window is left untouched here and removed after this if-block.
if (state.PatternStartTimestamp + this.MaxDuration > synctime)
{
// Arcs in this region receive the whole payload list of the event list.
#region eventListStateMap
if (this.eventListStateMap != null)
{
var currentStateMap = this.eventListStateMap[state.state];
if (currentStateMap != null)
{
var m = currentStateMap.Length;
for (int cnt = 0; cnt < m; cnt++)
{
var arcinfo = currentStateMap[cnt];
// The arc is taken only when its Fence accepts (synctime, payloads, register).
if (arcinfo.Fence(synctime, currentList.payloads, state.register))
{
// A null Transfer means the register is carried over unchanged.
var newReg = arcinfo.Transfer == null
? state.register
: arcinfo.Transfer(synctime, currentList.payloads, state.register);
int ns = arcinfo.toState;
// Visit the arc's target state and then every state popped from the epsilon stack.
while (true)
{
if (this.isFinal[ns])
{
// Final state reached: append an output row. vother is the pattern start plus
// MaxDuration, capped at StreamEvent.InfinitySyncTime. Flush once the batch is full.
this.batch.vsync.col[this.iter] = synctime;
this.batch.vother.col[this.iter] = Math.Min(state.PatternStartTimestamp + this.MaxDuration, StreamEvent.InfinitySyncTime);
this.batch[this.iter] = newReg;
this.batch.key.col[this.iter] = currentList.key;
this.batch.hash.col[this.iter] = el_hash;
this.iter++;
if (this.iter == Config.DataBatchSize) FlushContents();
}
if (this.hasOutgoingArcs[ns])
{
// Since we will eventually remove this state/index from activeStates, attempt to reuse this index for the outgoing state instead of deleting/re-adding
// If index is already -1, this means we've already reused the state and must allocate/insert a new index for the outgoing state.
if (index != -1)
{
this.activeStates.Values[index].key = currentList.key;
this.activeStates.Values[index].state = ns;
this.activeStates.Values[index].register = newReg;
this.activeStates.Values[index].PatternStartTimestamp = state.PatternStartTimestamp;
index = -1;
}
else
{
// Do not attempt to insert directly into activeStates, as that could corrupt the traversal state.
newActiveStates.Add(new GroupedActiveState<TKey, TRegister>
{
key = currentList.key,
state = ns,
register = newReg,
PatternStartTimestamp = state.PatternStartTimestamp,
});
}
// The pattern instance is still alive after this event list.
ended = false;
// Add epsilon arc destinations to stack
// Without an epsilon map this method never pushes onto the stack, so stop here.
if (this.epsilonStateMap == null) break;
if (this.epsilonStateMap[ns] != null)
{
for (int cnt2 = 0; cnt2 < this.epsilonStateMap[ns].Length; cnt2++) this.stack.Push(this.epsilonStateMap[ns][cnt2]);
}
}
if (this.stack.Count == 0) break;
ns = this.stack.Pop();
}
// This break leaves only the arc loop of this region; the singleEventStateMap region below still runs.
if (this.IsDeterministic) break; // We are guaranteed to have only one successful transition
}
}
}
}
#endregion
// Arcs in this region are considered only when the event list holds exactly one payload,
// and receive that single payload (payloads[0]). Handling otherwise mirrors the region above.
#region singleEventStateMap
if ((this.singleEventStateMap != null) && (currentList.payloads.Count == 1))
{
var currentStateMap = this.singleEventStateMap[state.state];
if (currentStateMap != null)
{
var m = currentStateMap.Length;
for (int cnt = 0; cnt < m; cnt++)
{
var arcinfo = currentStateMap[cnt];
if (arcinfo.Fence(synctime, currentList.payloads[0], state.register))
{
// A null Transfer means the register is carried over unchanged.
var newReg = arcinfo.Transfer == null
? state.register
: arcinfo.Transfer(synctime, currentList.payloads[0], state.register);
int ns = arcinfo.toState;
while (true)
{
if (this.isFinal[ns])
{
// Final state reached: append an output row and flush once the batch is full.
this.batch.vsync.col[this.iter] = synctime;
this.batch.vother.col[this.iter] = Math.Min(state.PatternStartTimestamp + this.MaxDuration, StreamEvent.InfinitySyncTime);
this.batch[this.iter] = newReg;
this.batch.key.col[this.iter] = currentList.key;
this.batch.hash.col[this.iter] = el_hash;
this.iter++;
if (this.iter == Config.DataBatchSize) FlushContents();
}
if (this.hasOutgoingArcs[ns])
{
// Since we will eventually remove this state/index from activeStates, attempt to reuse this index for the outgoing state instead of deleting/re-adding
// If index is already -1, this means we've already reused the state and must allocate/insert a new index for the outgoing state.
if (index != -1)
{
this.activeStates.Values[index].key = currentList.key;
this.activeStates.Values[index].state = ns;
this.activeStates.Values[index].register = newReg;
this.activeStates.Values[index].PatternStartTimestamp = state.PatternStartTimestamp;
index = -1;
}
else
{
// Do not attempt to insert directly into activeStates, as that could corrupt the traversal state.
newActiveStates.Add(new GroupedActiveState<TKey, TRegister>
{
key = currentList.key,
state = ns,
register = newReg,
PatternStartTimestamp = state.PatternStartTimestamp,
});
}
// The pattern instance is still alive after this event list.
ended = false;
// Add epsilon arc destinations to stack
if (this.epsilonStateMap == null) break;
if (this.epsilonStateMap[ns] != null)
{
for (int cnt2 = 0; cnt2 < this.epsilonStateMap[ns].Length; cnt2++) this.stack.Push(this.epsilonStateMap[ns][cnt2]);
}
}
if (this.stack.Count == 0) break;
ns = this.stack.Pop();
}
if (this.IsDeterministic) break; // We are guaranteed to have only one successful transition
}
}
}
}
#endregion
}
// index is unchanged when the slot was not reused: the state was outside its duration window,
// no arc fired, or only states without outgoing arcs were reached. Drop it via the traverser.
if (index == orig_index) this.activeFindTraverser.Remove();
if (this.IsDeterministic) break; // We are guaranteed to have only one active state
}
// Now that we are done traversing the current active states, add any new ones.
foreach (var newActiveState in newActiveStates)
{
this.activeStates.Insert(el_hash, newActiveState);
}
}
/* (2) Start new activations from the start state(s) */
// When overlapping instances are disallowed and an existing instance is still in progress,
// do not start a new one for this event list.
if (!this.AllowOverlappingInstances && !ended) continue;
for (int counter = 0; counter < this.numStartStates; counter++)
{
int startState = this.startStates[counter];
// New instances evaluate arcs against defaultRegister and record synctime as their pattern start.
#region eventListStateMap
if (this.eventListStateMap != null)
{
var startStateMap = this.eventListStateMap[startState];
if (startStateMap != null)
{
var m = startStateMap.Length;
for (int cnt = 0; cnt < m; cnt++)
{
var arcinfo = startStateMap[cnt];
if (arcinfo.Fence(synctime, currentList.payloads, this.defaultRegister))
{
// A null Transfer means the default register is used as-is.
var newReg = arcinfo.Transfer == null
? this.defaultRegister
: arcinfo.Transfer(synctime, currentList.payloads, this.defaultRegister);
int ns = arcinfo.toState;
while (true)
{
if (this.isFinal[ns])
{
// Final state reached: append an output row. vother is synctime plus MaxDuration,
// capped at StreamEvent.InfinitySyncTime. Flush once the batch is full.
this.batch.vsync.col[this.iter] = synctime;
this.batch.vother.col[this.iter] = Math.Min(synctime + this.MaxDuration, StreamEvent.InfinitySyncTime);
this.batch[this.iter] = newReg;
this.batch.key.col[this.iter] = currentList.key;
this.batch.hash.col[this.iter] = el_hash;
this.iter++;
if (this.iter == Config.DataBatchSize) FlushContents();
}
if (this.hasOutgoingArcs[ns])
{
// The activeStates traversal from (1) is finished here, so insert directly
// and fill in the newly allocated slot.
int index = this.activeStates.Insert(el_hash);
this.activeStates.Values[index].key = currentList.key;
this.activeStates.Values[index].state = ns;
this.activeStates.Values[index].register = newReg;
this.activeStates.Values[index].PatternStartTimestamp = synctime;
// Add epsilon arc destinations to stack
if (this.epsilonStateMap == null) break;
if (this.epsilonStateMap[ns] != null)
{
for (int cnt2 = 0; cnt2 < this.epsilonStateMap[ns].Length; cnt2++) this.stack.Push(this.epsilonStateMap[ns][cnt2]);
}
}
if (this.stack.Count == 0) break;
ns = this.stack.Pop();
}
if (this.IsDeterministic) break; // We are guaranteed to have only one successful transition
}
}
}
}
#endregion
// Single-payload arcs: considered only when the event list holds exactly one payload.
#region singleEventStateMap
if ((this.singleEventStateMap != null) && (currentList.payloads.Count == 1))
{
var startStateMap = this.singleEventStateMap[startState];
if (startStateMap != null)
{
var m = startStateMap.Length;
for (int cnt = 0; cnt < m; cnt++)
{
var arcinfo = startStateMap[cnt];
if (arcinfo.Fence(synctime, currentList.payloads[0], this.defaultRegister))
{
// A null Transfer means the default register is used as-is.
var newReg = arcinfo.Transfer == null
? this.defaultRegister
: arcinfo.Transfer(synctime, currentList.payloads[0], this.defaultRegister);
int ns = arcinfo.toState;
while (true)
{
if (this.isFinal[ns])
{
// Final state reached: append an output row and flush once the batch is full.
this.batch.vsync.col[this.iter] = synctime;
this.batch.vother.col[this.iter] = Math.Min(synctime + this.MaxDuration, StreamEvent.InfinitySyncTime);
this.batch[this.iter] = newReg;
this.batch.key.col[this.iter] = currentList.key;
this.batch.hash.col[this.iter] = el_hash;
this.iter++;
if (this.iter == Config.DataBatchSize) FlushContents();
}
if (this.hasOutgoingArcs[ns])
{
// Register the new active state directly in activeStates.
int index = this.activeStates.Insert(el_hash);
this.activeStates.Values[index].key = currentList.key;
this.activeStates.Values[index].state = ns;
this.activeStates.Values[index].register = newReg;
this.activeStates.Values[index].PatternStartTimestamp = synctime;
// Add epsilon arc destinations to stack
if (this.epsilonStateMap == null) break;
if (this.epsilonStateMap[ns] != null)
{
for (int cnt2 = 0; cnt2 < this.epsilonStateMap[ns].Length; cnt2++) this.stack.Push(this.epsilonStateMap[ns][cnt2]);
}
}
if (this.stack.Count == 0) break;
ns = this.stack.Pop();
}
if (this.IsDeterministic) break; // We are guaranteed to have only one successful transition
}
}
}
}
#endregion
if (this.IsDeterministic) break; // We are guaranteed to have only one start state
}
}
// Everything buffered for this partition has been consumed.
this.currentTimestampEventList.entries[partitionIndex].value.Clear();
}