in Sources/Core/Microsoft.StreamProcessing/Operators/Afa/CompiledGroupedAfaPipe_MultiEventList.cs [50:538]
/// <summary>
/// Runs the automaton over every event list buffered in <c>currentTimestampEventList</c>, using
/// <c>lastSyncTime</c> as the sync time, and then clears the buffer.
/// </summary>
/// <remarks>
/// For each buffered event list this method:
/// (1) advances the active states that share the list's hash and key, writing a row to the output
/// batch for every final state reached, and removing active states that have expired or did not move
/// to a state with outgoing arcs;
/// (2) unless overlapping instances are disallowed and an existing instance is still running, tries
/// the arcs leaving the start state(s) with the default register.
/// Three arc tables are consulted in turn: multi-event arcs (an accumulator folded over the payloads),
/// single-event arcs (only when the list holds exactly one payload), and event-list arcs (given the
/// payloads as a list).
/// </remarks>
private void ProcessCurrentTimestamp()
{
    if (this.currentTimestampEventList.Count == 0) return;

    long synctime = this.lastSyncTime;
    this.allEventListTraverser.currIndex = 0;
    while (this.allEventListTraverser.Next(out int el_index, out int el_hash))
    {
        var currentList = this.currentTimestampEventList.Values[el_index];

        // Copy of currentList.payloads handed to event-list arcs; built on first use and shared by (1) and (2).
        List<TPayload> payloadList = null;

        /* (1) Process currently active states */
        bool ended = true; // Cleared as soon as an active state moves on to a state that has outgoing arcs
        if (this.activeFindTraverser.Find(el_hash))
        {
            // Track which active states need to be inserted after the current traversal
            var newActiveStates = new List<GroupedActiveState<TKey, TRegister>>();
            while (this.activeFindTraverser.Next(out int index))
            {
                // index is overwritten with -1 once its slot has been reused for a successor state;
                // orig_index remembers the slot so we can tell afterwards whether that happened.
                int orig_index = index;

                // NOTE(review): `state` keeps being read after the slot is overwritten below, which
                // presumes GroupedActiveState is copied by value here - confirm against its declaration.
                var state = this.activeStates.Values[index];
                if (!this.keyEqualityComparer(state.key, currentList.key)) continue;

                // Only states still inside their MaxDuration window may transition
                if (state.PatternStartTimestamp + this.MaxDuration > synctime)
                {
                    #region multiEventStateMap
                    if (this.multiEventStateMap != null)
                    {
                        var currentStateMap = this.multiEventStateMap[state.state];
                        if (currentStateMap != null)
                        {
                            var m = currentStateMap.Length;
                            for (int cnt = 0; cnt < m; cnt++)
                            {
                                var arcinfo = currentStateMap[cnt];

                                // Fold the payloads into the arc's accumulator; SkipToEnd may cut the fold short
                                var acc = arcinfo.Initialize(synctime, state.register);
                                for (int i = 0; i < currentList.payloads.Count; i++)
                                {
                                    var payload = currentList.payloads[i];
                                    acc = arcinfo.Accumulate(synctime, payload, state.register, acc);
                                    if ((arcinfo.SkipToEnd != null) && arcinfo.SkipToEnd(synctime, payload, acc)) break;
                                }

                                bool arcTaken = arcinfo.Fence(synctime, acc, state.register);
                                if (arcTaken)
                                {
                                    var newReg = arcinfo.Transfer == null
                                        ? state.register
                                        : arcinfo.Transfer(synctime, acc, state.register);
                                    int ns = arcinfo.toState;

                                    // Visit the destination state and everything reachable from it through epsilon arcs
                                    while (true)
                                    {
                                        if (this.isFinal[ns])
                                        {
                                            this.batch.vsync.col[this.iter] = synctime;
                                            this.batch.vother.col[this.iter] = Math.Min(state.PatternStartTimestamp + this.MaxDuration, StreamEvent.InfinitySyncTime);
                                            this.batch[this.iter] = newReg;
                                            this.batch.key.col[this.iter] = currentList.key;
                                            this.batch.hash.col[this.iter] = el_hash;
                                            this.iter++;
                                            if (this.iter == Config.DataBatchSize) FlushContents();
                                        }
                                        if (this.hasOutgoingArcs[ns])
                                        {
                                            // Since we will eventually remove this state/index from activeStates, attempt to reuse this index for the outgoing state instead of deleting/re-adding
                                            // If index is already -1, this means we've already reused the state and must allocate/insert a new index for the outgoing state.
                                            if (index != -1)
                                            {
                                                this.activeStates.Values[index].key = currentList.key;
                                                this.activeStates.Values[index].state = ns;
                                                this.activeStates.Values[index].register = newReg;
                                                this.activeStates.Values[index].PatternStartTimestamp = state.PatternStartTimestamp;
                                                index = -1;
                                            }
                                            else
                                            {
                                                // Do not attempt to insert directly into activeStates, as that could corrupt the traversal state.
                                                newActiveStates.Add(new GroupedActiveState<TKey, TRegister>
                                                {
                                                    key = currentList.key,
                                                    state = ns,
                                                    register = newReg,
                                                    PatternStartTimestamp = state.PatternStartTimestamp,
                                                });
                                            }
                                            ended = false;
                                            // Add epsilon arc destinations to stack
                                            if (this.epsilonStateMap == null) break;
                                            if (this.epsilonStateMap[ns] != null)
                                            {
                                                for (int cnt2 = 0; cnt2 < this.epsilonStateMap[ns].Length; cnt2++) this.stack.Push(this.epsilonStateMap[ns][cnt2]);
                                            }
                                        }
                                        if (this.stack.Count == 0) break;
                                        ns = this.stack.Pop();
                                    }
                                }

                                // Dispose before the deterministic early exit so the accumulator is released on every path
                                arcinfo.Dispose?.Invoke(acc);
                                if (arcTaken && this.IsDeterministic) break; // We are guaranteed to have only one successful transition
                            }
                        }
                    }
                    #endregion
                    #region singleEventStateMap
                    if ((this.singleEventStateMap != null) && (currentList.payloads.Count == 1))
                    {
                        var item = currentList.payloads[0];
                        var currentStateMap = this.singleEventStateMap[state.state];
                        if (currentStateMap != null)
                        {
                            var m = currentStateMap.Length;
                            for (int cnt = 0; cnt < m; cnt++)
                            {
                                var arcinfo = currentStateMap[cnt];
                                if (arcinfo.Fence(synctime, item, state.register))
                                {
                                    var newReg = arcinfo.Transfer == null
                                        ? state.register
                                        : arcinfo.Transfer(synctime, item, state.register);
                                    int ns = arcinfo.toState;

                                    // Visit the destination state and everything reachable from it through epsilon arcs
                                    while (true)
                                    {
                                        if (this.isFinal[ns])
                                        {
                                            this.batch.vsync.col[this.iter] = synctime;
                                            this.batch.vother.col[this.iter] = Math.Min(state.PatternStartTimestamp + this.MaxDuration, StreamEvent.InfinitySyncTime);
                                            this.batch[this.iter] = newReg;
                                            this.batch.key.col[this.iter] = currentList.key;
                                            this.batch.hash.col[this.iter] = el_hash;
                                            this.iter++;
                                            if (this.iter == Config.DataBatchSize) FlushContents();
                                        }
                                        if (this.hasOutgoingArcs[ns])
                                        {
                                            // Since we will eventually remove this state/index from activeStates, attempt to reuse this index for the outgoing state instead of deleting/re-adding
                                            // If index is already -1, this means we've already reused the state and must allocate/insert a new index for the outgoing state.
                                            if (index != -1)
                                            {
                                                this.activeStates.Values[index].key = currentList.key;
                                                this.activeStates.Values[index].state = ns;
                                                this.activeStates.Values[index].register = newReg;
                                                this.activeStates.Values[index].PatternStartTimestamp = state.PatternStartTimestamp;
                                                index = -1;
                                            }
                                            else
                                            {
                                                // Do not attempt to insert directly into activeStates, as that could corrupt the traversal state.
                                                newActiveStates.Add(new GroupedActiveState<TKey, TRegister>
                                                {
                                                    key = currentList.key,
                                                    state = ns,
                                                    register = newReg,
                                                    PatternStartTimestamp = state.PatternStartTimestamp,
                                                });
                                            }
                                            ended = false;
                                            // Add epsilon arc destinations to stack
                                            if (this.epsilonStateMap == null) break;
                                            if (this.epsilonStateMap[ns] != null)
                                            {
                                                for (int cnt2 = 0; cnt2 < this.epsilonStateMap[ns].Length; cnt2++) this.stack.Push(this.epsilonStateMap[ns][cnt2]);
                                            }
                                        }
                                        if (this.stack.Count == 0) break;
                                        ns = this.stack.Pop();
                                    }
                                    if (this.IsDeterministic) break; // We are guaranteed to have only one successful transition
                                }
                            }
                        }
                    }
                    #endregion
                    #region eventListStateMap
                    if (this.eventListStateMap != null)
                    {
                        var currentStateMap = this.eventListStateMap[state.state];
                        if (currentStateMap != null)
                        {
                            if (payloadList == null)
                            {
                                payloadList = new List<TPayload>(currentList.payloads.Count);
                                for (int i = 0; i < currentList.payloads.Count; i++)
                                {
                                    var payload = currentList.payloads[i];
                                    payloadList.Add(payload);
                                }
                            }
                            var m = currentStateMap.Length;
                            for (int cnt = 0; cnt < m; cnt++)
                            {
                                var arcinfo = currentStateMap[cnt];
                                if (arcinfo.Fence(synctime, payloadList, state.register))
                                {
                                    var newReg = arcinfo.Transfer == null
                                        ? state.register
                                        : arcinfo.Transfer(synctime, payloadList, state.register);
                                    int ns = arcinfo.toState;

                                    // Visit the destination state and everything reachable from it through epsilon arcs
                                    while (true)
                                    {
                                        if (this.isFinal[ns])
                                        {
                                            this.batch.vsync.col[this.iter] = synctime;
                                            this.batch.vother.col[this.iter] = Math.Min(state.PatternStartTimestamp + this.MaxDuration, StreamEvent.InfinitySyncTime);
                                            this.batch[this.iter] = newReg;
                                            this.batch.key.col[this.iter] = currentList.key;
                                            this.batch.hash.col[this.iter] = el_hash;
                                            this.iter++;
                                            if (this.iter == Config.DataBatchSize) FlushContents();
                                        }
                                        if (this.hasOutgoingArcs[ns])
                                        {
                                            // Since we will eventually remove this state/index from activeStates, attempt to reuse this index for the outgoing state instead of deleting/re-adding
                                            // If index is already -1, this means we've already reused the state and must allocate/insert a new index for the outgoing state.
                                            if (index != -1)
                                            {
                                                this.activeStates.Values[index].key = currentList.key;
                                                this.activeStates.Values[index].state = ns;
                                                this.activeStates.Values[index].register = newReg;
                                                this.activeStates.Values[index].PatternStartTimestamp = state.PatternStartTimestamp;
                                                index = -1;
                                            }
                                            else
                                            {
                                                // Do not attempt to insert directly into activeStates, as that could corrupt the traversal state.
                                                newActiveStates.Add(new GroupedActiveState<TKey, TRegister>
                                                {
                                                    key = currentList.key,
                                                    state = ns,
                                                    register = newReg,
                                                    PatternStartTimestamp = state.PatternStartTimestamp,
                                                });
                                            }
                                            ended = false;
                                            // Add epsilon arc destinations to stack
                                            if (this.epsilonStateMap == null) break;
                                            if (this.epsilonStateMap[ns] != null)
                                            {
                                                for (int cnt2 = 0; cnt2 < this.epsilonStateMap[ns].Length; cnt2++) this.stack.Push(this.epsilonStateMap[ns][cnt2]);
                                            }
                                        }
                                        if (this.stack.Count == 0) break;
                                        ns = this.stack.Pop();
                                    }
                                    if (this.IsDeterministic) break; // We are guaranteed to have only one successful transition
                                }
                            }
                        }
                    }
                    #endregion
                }

                // The slot was never reused (state expired, or no arc led to a state with outgoing arcs), so drop it
                if (index == orig_index) this.activeFindTraverser.Remove();
                if (this.IsDeterministic) break; // We are guaranteed to have only one active state
            }

            // Now that we are done traversing the current active states, add any new ones.
            foreach (var newActiveState in newActiveStates)
            {
                this.activeStates.Insert(el_hash, newActiveState);
            }
        }

        /* (2) Start new activations from the start state(s) */
        // Without overlapping instances, a still-running instance blocks new activations for this event list
        if (!this.AllowOverlappingInstances && !ended) continue;
        for (int counter = 0; counter < this.numStartStates; counter++)
        {
            int startState = this.startStates[counter];
            #region multiEventStateMap
            if (this.multiEventStateMap != null)
            {
                var startStateMap = this.multiEventStateMap[startState];
                if (startStateMap != null)
                {
                    var m = startStateMap.Length;
                    for (int cnt = 0; cnt < m; cnt++)
                    {
                        var arcinfo = startStateMap[cnt];

                        // Fold the payloads into the arc's accumulator; SkipToEnd may cut the fold short
                        var acc = arcinfo.Initialize(synctime, this.defaultRegister);
                        for (int i = 0; i < currentList.payloads.Count; i++)
                        {
                            var payload = currentList.payloads[i];
                            acc = arcinfo.Accumulate(synctime, payload, this.defaultRegister, acc);
                            if ((arcinfo.SkipToEnd != null) && arcinfo.SkipToEnd(synctime, payload, acc)) break;
                        }

                        bool arcTaken = arcinfo.Fence(synctime, acc, this.defaultRegister);
                        if (arcTaken)
                        {
                            var newReg = arcinfo.Transfer == null
                                ? this.defaultRegister
                                : arcinfo.Transfer(synctime, acc, this.defaultRegister);
                            int ns = arcinfo.toState;

                            // Visit the destination state and everything reachable from it through epsilon arcs
                            while (true)
                            {
                                if (this.isFinal[ns])
                                {
                                    this.batch.vsync.col[this.iter] = synctime;
                                    this.batch.vother.col[this.iter] = Math.Min(synctime + this.MaxDuration, StreamEvent.InfinitySyncTime);
                                    this.batch[this.iter] = newReg;
                                    this.batch.key.col[this.iter] = currentList.key;
                                    this.batch.hash.col[this.iter] = el_hash;
                                    this.iter++;
                                    if (this.iter == Config.DataBatchSize) FlushContents();
                                }
                                if (this.hasOutgoingArcs[ns])
                                {
                                    // A new pattern instance begins at the current sync time
                                    int index = this.activeStates.Insert(el_hash);
                                    this.activeStates.Values[index].key = currentList.key;
                                    this.activeStates.Values[index].state = ns;
                                    this.activeStates.Values[index].register = newReg;
                                    this.activeStates.Values[index].PatternStartTimestamp = synctime;
                                    // Add epsilon arc destinations to stack
                                    if (this.epsilonStateMap == null) break;
                                    if (this.epsilonStateMap[ns] != null)
                                    {
                                        for (int cnt2 = 0; cnt2 < this.epsilonStateMap[ns].Length; cnt2++) this.stack.Push(this.epsilonStateMap[ns][cnt2]);
                                    }
                                }
                                if (this.stack.Count == 0) break;
                                ns = this.stack.Pop();
                            }
                        }

                        // Dispose before the deterministic early exit so the accumulator is released on every path
                        arcinfo.Dispose?.Invoke(acc);
                        if (arcTaken && this.IsDeterministic) break; // We are guaranteed to have only one successful transition
                    }
                }
            }
            #endregion
            #region singleEventStateMap
            if ((this.singleEventStateMap != null) && (currentList.payloads.Count == 1))
            {
                var item = currentList.payloads[0];
                var startStateMap = this.singleEventStateMap[startState];
                if (startStateMap != null)
                {
                    var m = startStateMap.Length;
                    for (int cnt = 0; cnt < m; cnt++)
                    {
                        var arcinfo = startStateMap[cnt];
                        if (arcinfo.Fence(synctime, item, this.defaultRegister))
                        {
                            var newReg = arcinfo.Transfer == null
                                ? this.defaultRegister
                                : arcinfo.Transfer(synctime, item, this.defaultRegister);
                            int ns = arcinfo.toState;

                            // Visit the destination state and everything reachable from it through epsilon arcs
                            while (true)
                            {
                                if (this.isFinal[ns])
                                {
                                    this.batch.vsync.col[this.iter] = synctime;
                                    this.batch.vother.col[this.iter] = Math.Min(synctime + this.MaxDuration, StreamEvent.InfinitySyncTime);
                                    this.batch[this.iter] = newReg;
                                    this.batch.key.col[this.iter] = currentList.key;
                                    this.batch.hash.col[this.iter] = el_hash;
                                    this.iter++;
                                    if (this.iter == Config.DataBatchSize) FlushContents();
                                }
                                if (this.hasOutgoingArcs[ns])
                                {
                                    // A new pattern instance begins at the current sync time
                                    int index = this.activeStates.Insert(el_hash);
                                    this.activeStates.Values[index].key = currentList.key;
                                    this.activeStates.Values[index].state = ns;
                                    this.activeStates.Values[index].register = newReg;
                                    this.activeStates.Values[index].PatternStartTimestamp = synctime;
                                    // Add epsilon arc destinations to stack
                                    if (this.epsilonStateMap == null) break;
                                    if (this.epsilonStateMap[ns] != null)
                                    {
                                        for (int cnt2 = 0; cnt2 < this.epsilonStateMap[ns].Length; cnt2++) this.stack.Push(this.epsilonStateMap[ns][cnt2]);
                                    }
                                }
                                if (this.stack.Count == 0) break;
                                ns = this.stack.Pop();
                            }
                            if (this.IsDeterministic) break; // We are guaranteed to have only one successful transition
                        }
                    }
                }
            }
            #endregion
            #region eventListStateMap
            if (this.eventListStateMap != null)
            {
                var startStateMap = this.eventListStateMap[startState];
                if (startStateMap != null)
                {
                    if (payloadList == null)
                    {
                        payloadList = new List<TPayload>(currentList.payloads.Count);
                        for (int i = 0; i < currentList.payloads.Count; i++)
                        {
                            var payload = currentList.payloads[i];
                            payloadList.Add(payload);
                        }
                    }
                    var m = startStateMap.Length;
                    for (int cnt = 0; cnt < m; cnt++)
                    {
                        var arcinfo = startStateMap[cnt];
                        if (arcinfo.Fence(synctime, payloadList, this.defaultRegister))
                        {
                            var newReg = arcinfo.Transfer == null
                                ? this.defaultRegister
                                : arcinfo.Transfer(synctime, payloadList, this.defaultRegister);
                            int ns = arcinfo.toState;

                            // Visit the destination state and everything reachable from it through epsilon arcs
                            while (true)
                            {
                                if (this.isFinal[ns])
                                {
                                    this.batch.vsync.col[this.iter] = synctime;
                                    this.batch.vother.col[this.iter] = Math.Min(synctime + this.MaxDuration, StreamEvent.InfinitySyncTime);
                                    this.batch[this.iter] = newReg;
                                    this.batch.key.col[this.iter] = currentList.key;
                                    this.batch.hash.col[this.iter] = el_hash;
                                    this.iter++;
                                    if (this.iter == Config.DataBatchSize) FlushContents();
                                }
                                if (this.hasOutgoingArcs[ns])
                                {
                                    // A new pattern instance begins at the current sync time
                                    int index = this.activeStates.Insert(el_hash);
                                    this.activeStates.Values[index].key = currentList.key;
                                    this.activeStates.Values[index].state = ns;
                                    this.activeStates.Values[index].register = newReg;
                                    this.activeStates.Values[index].PatternStartTimestamp = synctime;
                                    // Add epsilon arc destinations to stack
                                    if (this.epsilonStateMap == null) break;
                                    if (this.epsilonStateMap[ns] != null)
                                    {
                                        for (int cnt2 = 0; cnt2 < this.epsilonStateMap[ns].Length; cnt2++) this.stack.Push(this.epsilonStateMap[ns][cnt2]);
                                    }
                                }
                                if (this.stack.Count == 0) break;
                                ns = this.stack.Pop();
                            }
                            if (this.IsDeterministic) break; // We are guaranteed to have only one successful transition
                        }
                    }
                }
            }
            #endregion
            if (this.IsDeterministic) break; // We are guaranteed to have only one start state
        }
        currentList.payloads = null; // Let GC handle this
    }
    this.currentTimestampEventList.Clear();
}