in Sources/Core/Microsoft.StreamProcessing/Operators/EquiJoin/Basic/PartitionedEquiJoinPipe.cs [247:445]
/// <summary>
/// Works through the queued left and right entries of every partition key in <c>processQueue</c>.
/// For each partition the two queues are consumed one entry at a time: when both sides have input,
/// the side with the strictly smaller next time is taken (ties are taken from the right); when only
/// one side has input, it is consumed only while its next time does not pass the other side's
/// next time, which is first raised to that side's last CTI.
/// Afterwards, if <c>emitCTI</c> is set, a low watermark at the smaller of the two last CTIs is added
/// to the output and the bookkeeping for all keys in <c>cleanKeys</c> is discarded.
/// </summary>
private void ProcessPendingEntries()
{
    foreach (var partitionKey in this.processQueue)
    {
        // A partition under processing is not considered clean; it is re-added further down
        // if it turns out to be clean once both of its queues are empty.
        this.cleanKeys.Remove(partitionKey);

        // The slot found in the left queue map is also used to index the right queue map.
        this.leftQueue.Lookup(partitionKey, out int slot);
        Queue<LEntry> pendingLeft = this.leftQueue.entries[slot].value;
        Queue<REntry> pendingRight = this.rightQueue.entries[slot].value;

        this.partitionData.Lookup(partitionKey, out slot);
        var partition = this.partitionData.entries[slot].value;

        while (true)
        {
            // Time of the partition before this step; used to detect whether the step advanced it.
            var timeBefore = partition.currTime;
            bool leftPending = pendingLeft.Count != 0;
            bool rightPending = pendingRight.Count != 0;

            if (!leftPending && !rightPending)
            {
                // Nothing queued on either side: bring both next-times up to the last CTIs seen,
                // advance to the smaller CTI, and record the partition as clean if it is.
                if (partition.nextLeftTime < this.lastLeftCTI)
                {
                    UpdateNextLeftTime(partition, this.lastLeftCTI);
                }

                if (partition.nextRightTime < this.lastRightCTI)
                {
                    UpdateNextRightTime(partition, this.lastRightCTI);
                }

                UpdateTime(partition, Math.Min(this.lastLeftCTI, this.lastRightCTI));

                if (partition.IsClean())
                {
                    this.cleanKeys.Add(partitionKey);
                }

                break;
            }

            if (leftPending && rightPending)
            {
                LEntry headLeft = pendingLeft.Peek();
                REntry headRight = pendingRight.Peek();
                UpdateNextLeftTime(partition, headLeft.Sync);
                UpdateNextRightTime(partition, headRight.Sync);

                // Left is taken only when strictly earlier; equal times go to the right side.
                if (partition.nextLeftTime < partition.nextRightTime)
                {
                    ConsumeLeft(headLeft);
                }
                else
                {
                    ConsumeRight(headRight);
                }

                continue;
            }

            if (leftPending)
            {
                LEntry headLeft = pendingLeft.Peek();
                UpdateNextLeftTime(partition, headLeft.Sync);
                partition.nextRightTime = Math.Max(partition.nextRightTime, this.lastRightCTI);

                if (partition.nextLeftTime > partition.nextRightTime)
                {
                    // The right side is behind and has no input. If the partition has not yet reached
                    // the right side's time, move it there now (this can happen with low watermarks),
                    // then stop working on this partition.
                    if (partition.currTime < partition.nextRightTime)
                    {
                        UpdateTime(partition, partition.nextRightTime);
                    }

                    break;
                }

                ConsumeLeft(headLeft);
            }
            else
            {
                REntry headRight = pendingRight.Peek();
                UpdateNextRightTime(partition, headRight.Sync);
                partition.nextLeftTime = Math.Max(partition.nextLeftTime, this.lastLeftCTI);

                if (partition.nextLeftTime < partition.nextRightTime)
                {
                    // The left side is behind and has no input. If the partition has not yet reached
                    // the left side's time, move it there now (this can happen with low watermarks),
                    // then stop working on this partition.
                    if (partition.currTime < partition.nextLeftTime)
                    {
                        UpdateTime(partition, partition.nextLeftTime);
                    }

                    break;
                }

                ConsumeRight(headRight);
            }

            // Advances the partition to its next left time, handles the given left entry and
            // removes the head of the left queue.
            void ConsumeLeft(LEntry entry)
            {
                UpdateTime(partition, partition.nextLeftTime);

                if (entry.Other != long.MinValue)
                {
                    ProcessLeftEvent(
                        partition,
                        partition.nextLeftTime,
                        entry.Other,
                        ref entry.Key,
                        entry.Payload,
                        entry.Hash);
                }
                else if (partition.currTime > timeBefore)
                {
                    // Entry with Other == long.MinValue: added to the output with a default right
                    // payload, but only when this step moved the partition's time forward.
                    // NOTE(review): presumably these entries are punctuations - confirm.
                    var noRight = default(TRight);
                    AddToBatch(
                        partition.currTime,
                        long.MinValue,
                        ref entry.Key,
                        ref entry.Payload,
                        ref noRight,
                        entry.Hash);
                }

                pendingLeft.Dequeue();
            }

            // Advances the partition to its next right time, handles the given right entry and
            // removes the head of the right queue.
            void ConsumeRight(REntry entry)
            {
                UpdateTime(partition, partition.nextRightTime);

                if (entry.Other != long.MinValue)
                {
                    ProcessRightEvent(
                        partition,
                        partition.nextRightTime,
                        entry.Other,
                        ref entry.Key,
                        entry.Payload,
                        entry.Hash);
                }
                else if (partition.currTime > timeBefore)
                {
                    // Entry with Other == long.MinValue: added to the output with a default left
                    // payload, but only when this step moved the partition's time forward.
                    // NOTE(review): presumably these entries are punctuations - confirm.
                    var noLeft = default(TLeft);
                    AddToBatch(
                        partition.currTime,
                        long.MinValue,
                        ref entry.Key,
                        ref noLeft,
                        ref entry.Payload,
                        entry.Hash);
                }

                pendingRight.Dequeue();
            }
        }
    }

    if (this.emitCTI)
    {
        // Emit the pending low watermark at the smaller of the two last CTIs, then forget every
        // partition that was recorded as clean.
        AddLowWatermarkToBatch(Math.Min(this.lastLeftCTI, this.lastRightCTI));
        this.emitCTI = false;

        foreach (var cleanKey in this.cleanKeys)
        {
            this.seenKeys.Remove(cleanKey);
            this.leftQueue.Remove(cleanKey);
            this.rightQueue.Remove(cleanKey);
        }

        this.cleanKeys.Clear();
    }

    this.processQueue.Clear();
}