in Sources/Core/Microsoft.StreamProcessing/Operators/EquiJoin/StartEdge/PartitionedStartEdgeEquiJoinPipe.cs [220:417]
/// <summary>
/// Drains the queued left and right entries of every partition key in <c>processQueue</c>.
/// For each partition, entries are consumed one at a time: a data entry is matched against the
/// opposite side's edge map (emitting a start edge for every key match) and may be remembered in
/// its own side's edge map; an entry whose <c>Other</c> equals <c>long.MinValue</c> is forwarded
/// through <c>OutputPunctuation</c> instead. When <c>emitCTI</c> is set, a low watermark is emitted
/// afterwards and the queues of all partitions recorded in <c>cleanKeys</c> are disposed and removed.
/// The process queue is cleared before returning.
/// </summary>
private void ProcessPendingEntries()
{
    foreach (var pKey in this.processQueue)
    {
        // Partition is no longer clean if we are processing it. If it is still clean, it will be added below.
        this.cleanKeys.Remove(pKey);

        // NOTE(review): the slot index found in leftQueue is reused for rightQueue, which assumes
        // both dictionaries hold the same keys at the same slots - confirm against the insertion site.
        this.leftQueue.Lookup(pKey, out int index);
        PooledElasticCircularBuffer<LEntry> leftWorking = this.leftQueue.entries[index].value;
        PooledElasticCircularBuffer<REntry> rightWorking = this.rightQueue.entries[index].value;

        this.partitionData.Lookup(pKey, out index);
        var partition = this.partitionData.entries[index].value;
        FastMap<ActiveEvent<TLeft>> leftEdgeMapForPartition = partition.leftEdgeMap;
        FastMap<ActiveEvent<TRight>> rightEdgeMapForPartition = partition.rightEdgeMap;

        while (true)
        {
            bool hasLeftBatch = leftWorking.TryPeekFirst(out LEntry leftEntry);
            bool hasRightBatch = rightWorking.TryPeekFirst(out REntry rightEntry);
            FastMap<ActiveEvent<TRight>>.FindTraverser rightEdges = default;
            FastMap<ActiveEvent<TLeft>>.FindTraverser leftEdges = default;

            if (hasLeftBatch && hasRightBatch)
            {
                // Both sides have input: consume whichever side has the earlier next time
                // (left wins ties).
                UpdateNextLeftTime(partition, leftEntry.Sync);
                UpdateNextRightTime(partition, rightEntry.Sync);

                if (partition.nextLeftTime <= partition.nextRightTime)
                {
                    if (leftEntry.Other != long.MinValue)
                    {
                        TKey key = leftEntry.Key;
                        var hash = leftEntry.Hash;

                        // Join the left event against every stored right event with an equal key.
                        if (rightEdgeMapForPartition.Find(hash, ref rightEdges))
                        {
                            while (rightEdges.Next(out int rightIndex))
                            {
                                if (this.keyComparer(key, rightEdgeMapForPartition.Values[rightIndex].Key))
                                {
                                    OutputStartEdge(partition.nextLeftTime, ref key, ref leftEntry.Payload, ref rightEdgeMapForPartition.Values[rightIndex].Payload, hash);
                                }
                            }
                        }

                        // The left event is only stored while the right side is not complete.
                        if (!partition.isRightComplete)
                        {
                            int newIndex = leftEdgeMapForPartition.Insert(hash);
                            leftEdgeMapForPartition.Values[newIndex].Populate(ref key, ref leftEntry.Payload);
                        }

                        UpdateNextLeftTime(partition, leftEntry.Sync);
                    }
                    else
                    {
                        OutputPunctuation(leftEntry.Sync, ref leftEntry.Key, leftEntry.Hash);
                    }

                    leftWorking.Dequeue();
                }
                else
                {
                    if (rightEntry.Other != long.MinValue)
                    {
                        TKey key = rightEntry.Key;
                        var hash = rightEntry.Hash;

                        // Join the right event against every stored left event with an equal key.
                        if (leftEdgeMapForPartition.Find(hash, ref leftEdges))
                        {
                            while (leftEdges.Next(out int leftIndex))
                            {
                                if (this.keyComparer(key, leftEdgeMapForPartition.Values[leftIndex].Key))
                                {
                                    OutputStartEdge(partition.nextRightTime, ref key, ref leftEdgeMapForPartition.Values[leftIndex].Payload, ref rightEntry.Payload, hash);
                                }
                            }
                        }

                        // The right event is only stored while the left side is not complete.
                        if (!partition.isLeftComplete)
                        {
                            int newIndex = rightEdgeMapForPartition.Insert(hash);
                            rightEdgeMapForPartition.Values[newIndex].Populate(ref key, ref rightEntry.Payload);
                        }

                        UpdateNextRightTime(partition, rightEntry.Sync);
                    }
                    else
                    {
                        OutputPunctuation(rightEntry.Sync, ref rightEntry.Key, rightEntry.Hash);
                    }

                    rightWorking.Dequeue();
                }
            }
            else if (hasLeftBatch)
            {
                // Only the left side has input.
                UpdateNextLeftTime(partition, leftEntry.Sync);

                if (leftEntry.Other != long.MinValue)
                {
                    TKey key = leftEntry.Key;
                    var hash = leftEntry.Hash;

                    if (rightEdgeMapForPartition.Find(hash, ref rightEdges))
                    {
                        while (rightEdges.Next(out int rightIndex))
                        {
                            if (this.keyComparer(key, rightEdgeMapForPartition.Values[rightIndex].Key))
                            {
                                OutputStartEdge(partition.nextLeftTime, ref key, ref leftEntry.Payload, ref rightEdgeMapForPartition.Values[rightIndex].Payload, hash);
                            }
                        }
                    }

                    if (!partition.isRightComplete)
                    {
                        int newIndex = leftEdgeMapForPartition.Insert(hash);
                        leftEdgeMapForPartition.Values[newIndex].Populate(ref key, ref leftEntry.Payload);
                    }

                    UpdateNextLeftTime(partition, leftEntry.Sync);
                }
                else
                {
                    // Fall through to Dequeue like the other punctuation branches. Returning from
                    // here would leave this punctuation at the head of the queue and skip the
                    // remaining partitions, the low-watermark step and the process-queue reset.
                    OutputPunctuation(leftEntry.Sync, ref leftEntry.Key, leftEntry.Hash);
                }

                leftWorking.Dequeue();
            }
            else if (hasRightBatch)
            {
                // Only the right side has input.
                UpdateNextRightTime(partition, rightEntry.Sync);

                if (rightEntry.Other != long.MinValue)
                {
                    TKey key = rightEntry.Key;
                    var hash = rightEntry.Hash;

                    if (leftEdgeMapForPartition.Find(hash, ref leftEdges))
                    {
                        while (leftEdges.Next(out int leftIndex))
                        {
                            if (this.keyComparer(key, leftEdgeMapForPartition.Values[leftIndex].Key))
                            {
                                OutputStartEdge(partition.nextRightTime, ref key, ref leftEdgeMapForPartition.Values[leftIndex].Payload, ref rightEntry.Payload, hash);
                            }
                        }
                    }

                    if (!partition.isLeftComplete)
                    {
                        int newIndex = rightEdgeMapForPartition.Insert(hash);
                        rightEdgeMapForPartition.Values[newIndex].Populate(ref key, ref rightEntry.Payload);
                    }

                    UpdateNextRightTime(partition, rightEntry.Sync);
                }
                else
                {
                    OutputPunctuation(rightEntry.Sync, ref rightEntry.Key, rightEntry.Hash);
                }

                rightWorking.Dequeue();
            }
            else
            {
                // Both queues are drained: bring the partition's next times up to the last CTI
                // seen on each side, record the partition as clean and move on to the next key.
                if (partition.nextLeftTime < this.lastLeftCTI)
                {
                    UpdateNextLeftTime(partition, this.lastLeftCTI);
                }

                if (partition.nextRightTime < this.lastRightCTI)
                {
                    UpdateNextRightTime(partition, this.lastRightCTI);
                }

                this.cleanKeys.Add(pKey);
                break;
            }
        }
    }

    if (this.emitCTI)
    {
        // The emitted low watermark is the smaller of the two sides' last CTI values.
        var earliest = Math.Min(this.lastLeftCTI, this.lastRightCTI);
        AddLowWatermarkToBatch(earliest);
        this.emitCTI = false;

        // Release the queues of every partition that ended this pass with nothing pending.
        foreach (var p in this.cleanKeys)
        {
            this.seenKeys.Remove(p);

            // NOTE(review): the index looked up in leftQueue is reused for rightQueue after the
            // left entry has been removed - assumes matching slots in both dictionaries; confirm.
            this.leftQueue.Lookup(p, out var index);
            this.leftQueue.entries[index].value.Dispose();
            this.leftQueue.Remove(p);
            this.rightQueue.entries[index].value.Dispose();
            this.rightQueue.Remove(p);
        }

        this.cleanKeys.Clear();
    }

    this.processQueue.Clear();
}