in Sources/Core/Microsoft.StreamProcessing/Operators/EquiJoin/IncreasingOrder/IncreasingOrderEquiJoinPipe.cs [89:470]
protected override void ProcessBothBatches(StreamMessage<TKey, TLeft> leftBatch, StreamMessage<TKey, TRight> rightBatch, out bool leftBatchDone, out bool rightBatchDone, out bool leftBatchFree, out bool rightBatchFree)
{
leftBatchFree = rightBatchFree = true;
if (!GoToVisibleRow(leftBatch))
{
leftBatchDone = true;
rightBatchDone = false;
return;
}
UpdateNextLeftTime(leftBatch.vsync.col[leftBatch.iter]);
this.nextLeftKey = leftBatch.key.col[leftBatch.iter];
if (!GoToVisibleRow(rightBatch))
{
leftBatchDone = false;
rightBatchDone = true;
return;
}
UpdateNextRightTime(rightBatch.vsync.col[rightBatch.iter]);
this.nextRightKey = rightBatch.key.col[rightBatch.iter];
while (true)
{
bool leftPunctuation = leftBatch.vother.col[leftBatch.iter] == StreamEvent.PunctuationOtherTime;
bool rightPunctuation = rightBatch.vother.col[rightBatch.iter] == StreamEvent.PunctuationOtherTime;
int compare = (leftPunctuation || rightPunctuation) ? 0 : this.joinKeyOrderComparer(this.nextLeftKey, this.nextRightKey);
if (compare == 0)
{
if (this.nextLeftTime <= this.nextRightTime)
{
// process left
if (leftPunctuation)
{
AddPunctuationToBatch(this.nextLeftTime);
}
else
{
#region ProcessLeftStartEdge
/*
ProcessLeftStartEdge(
nextLeftTime,
ref leftBatch.key.col[leftBatch.iter],
leftBatch[leftBatch.iter],
leftBatch.hash.col[leftBatch.iter], compare);
*/
{
var payload = leftBatch[leftBatch.iter];
if (this.currentRightList.Count > 0)
{
int compare2 = this.joinKeyOrderComparer(this.nextLeftKey, this.currentRightKey);
Contract.Assert(compare2 >= 0, "Unexpected comparison in sort-ordered join");
if (compare2 == 0)
{
// perform the join
for (int i = 0; i < this.currentRightList.Count; i++)
{
ActiveEvent<TRight> t = this.currentRightList[i];
var nextLeftKeyTemp = this.nextLeftKey;
OutputStartEdge(this.nextLeftTime > t.Timestamp ? this.nextLeftTime : t.Timestamp, ref nextLeftKeyTemp, ref payload, ref t.Payload, leftBatch.hash.col[leftBatch.iter]);
}
}
else
{
// clear the right array
this.currentRightList.Clear();
}
}
if (compare >= 0)
{
// update the left array
if ((this.currentLeftList.Count != 0) && (this.joinKeyOrderComparer(this.nextLeftKey, this.currentLeftKey) != 0))
{
Contract.Assert(this.joinKeyOrderComparer(this.nextLeftKey, this.currentLeftKey) > 0);
this.currentLeftList.Clear();
}
var temp = this.nextLeftKey;
this.currentLeftKey = temp;
var leftAE = new ActiveEvent<TLeft> { Payload = payload, Timestamp = this.nextLeftTime };
this.currentLeftList.Add(leftAE);
}
}
#endregion
}
leftBatch.iter++;
if (!GoToVisibleRow(leftBatch))
{
leftBatchDone = true;
rightBatchDone = false;
return;
}
this.nextLeftTime = leftBatch.vsync.col[leftBatch.iter];
this.nextLeftKey = leftBatch.key.col[leftBatch.iter];
}
else
{
// process right
if (rightPunctuation)
{
AddPunctuationToBatch(this.nextRightTime);
}
else
{
#region ProcessRightStartEdge
/* Inlined version of:
ProcessRightStartEdge(
nextRightTime,
ref rightBatch.key.col[rightBatch.iter],
rightBatch[rightBatch.iter],
rightBatch.hash.col[rightBatch.iter], compare);
*/
if (this.currentLeftList.Count > 0)
{
int compare2 = this.joinKeyOrderComparer(this.nextRightKey, this.currentLeftKey);
Contract.Assert(compare2 >= 0, "Unexpected comparison in sort-ordered join");
if (compare2 == 0)
{
// perform the join
var rightP = rightBatch[rightBatch.iter];
for (int i = 0; i < this.currentLeftList.Count; i++)
{
ActiveEvent<TLeft> t = this.currentLeftList[i];
#region OutputStartEdge
/* OutputStartEdge(nextRightTime > t.Timestamp ? nextRightTime : t.Timestamp,
ref nextRightKey, ref t.Payload, ref rightP, rightBatch.hash.col[rightBatch.iter]);*/
int index = this.output.Count++;
this.output.vsync.col[index] = this.nextRightTime > t.Timestamp ? this.nextRightTime : t.Timestamp;
this.output.vother.col[index] = StreamEvent.InfinitySyncTime;
this.output.key.col[index] = this.nextRightKey;
this.output[index] = this.selector(t.Payload, rightP);
this.output.hash.col[index] = rightBatch.hash.col[rightBatch.iter];
if (this.output.Count == Config.DataBatchSize) FlushContents();
#endregion
}
}
else
{
// clear the left array
this.currentLeftList.Clear();
}
}
if (compare <= 0)
{
// update the right array
if ((this.currentRightList.Count != 0) && (this.joinKeyOrderComparer(rightBatch.key.col[rightBatch.iter], this.currentRightKey) != 0))
{
Contract.Assert(this.joinKeyOrderComparer(rightBatch.key.col[rightBatch.iter], this.currentRightKey) > 0);
this.currentRightList.Clear();
}
this.currentRightKey = rightBatch.key.col[rightBatch.iter];
ActiveEvent<TRight> rightAE = default;
var payload = rightBatch[rightBatch.iter];
rightAE.Populate(ref this.nextRightTime, ref payload);
this.currentRightList.Add(rightAE);
}
#endregion
}
rightBatch.iter++;
#region GoToVisibleRow
/* Inlined version of:
if (!GoToVisibleRow(rightBatch))
{
leftBatchDone = false;
rightBatchDone = true;
return;
}*/
while (rightBatch.iter < rightBatch.Count &&
(rightBatch.bitvector.col[rightBatch.iter >> 6] & (1L << (rightBatch.iter & 0x3f))) != 0 &&
rightBatch.vother.col[rightBatch.iter] >= 0)
{
rightBatch.iter++;
}
if (rightBatch.iter == rightBatch.Count)
{
leftBatchDone = false;
rightBatchDone = true;
return;
}
#endregion
this.nextRightTime = rightBatch.vsync.col[rightBatch.iter];
this.nextRightKey = rightBatch.key.col[rightBatch.iter];
}
}
else if (compare < 0)
{
// process left
#region ProcessLeftStartEdge
/*
ProcessLeftStartEdge(
nextLeftTime,
ref leftBatch.key.col[leftBatch.iter],
leftBatch[leftBatch.iter],
leftBatch.hash.col[leftBatch.iter], compare);
*/
{
if (this.currentRightList.Count > 0)
{
int compare2 = this.joinKeyOrderComparer(this.nextLeftKey, this.currentRightKey);
Contract.Assert(compare2 >= 0, "Unexpected comparison in sort-ordered join");
if (compare2 == 0)
{
// perform the join
var payload = leftBatch[leftBatch.iter];
for (int i = 0; i < this.currentRightList.Count; i++)
{
ActiveEvent<TRight> t = this.currentRightList[i];
var temp = this.nextLeftKey;
OutputStartEdge(this.nextLeftTime > t.Timestamp ? this.nextLeftTime : t.Timestamp, ref temp, ref payload, ref t.Payload, leftBatch.hash.col[leftBatch.iter]);
}
}
else
{
// clear the right array
this.currentRightList.Clear();
}
}
/*if (compare >= 0)
{
// update the left array
if ((currentLeftList.Count != 0) && (joinKeyOrderComparer(nextLeftKey, currentLeftKey) != 0))
{
Contract.Assert(joinKeyOrderComparer(nextLeftKey, currentLeftKey) > 0);
currentLeftList.Clear();
}
currentLeftKey = nextLeftKey;
ActiveEvent<TLeft> leftAE = new ActiveEvent<TLeft> { Payload = payload, Timestamp = nextLeftTime };
currentLeftList.Add(leftAE);
}*/
}
#endregion
leftBatch.iter++;
#region GoToVisibleRow
/* Inlined version of:
if (!GoToVisibleRow(leftBatch))
{
leftBatchDone = true;
rightBatchDone = false;
return;
}*/
while (leftBatch.iter < leftBatch.Count &&
(leftBatch.bitvector.col[leftBatch.iter >> 6] & (1L << (leftBatch.iter & 0x3f))) != 0 &&
leftBatch.vother.col[leftBatch.iter] >= 0)
{
leftBatch.iter++;
}
if (leftBatch.iter == leftBatch.Count)
{
leftBatchDone = true;
rightBatchDone = false;
return;
}
#endregion
#region UpdateNextLeftTime
this.nextLeftTime = leftBatch.vsync.col[leftBatch.iter];
#endregion
this.nextLeftKey = leftBatch.key.col[leftBatch.iter];
}
else // hot path if larger right side of join matches very few things on the left side
{
// process right
#region ProcessRightStartEdge
/* Inlined version of:
ProcessRightStartEdge(
nextRightTime,
ref rightBatch.key.col[rightBatch.iter],
rightBatch.payload.col[rightBatch.iter],
rightBatch.hash.col[rightBatch.iter], compare);
*/
if (this.currentLeftList.Count > 0)
{
int compare2 = this.joinKeyOrderComparer(this.nextRightKey, this.currentLeftKey);
Contract.Assert(compare2 >= 0, "Unexpected comparison in sort-ordered join");
if (compare2 == 0)
{
// perform the join
var rightP = rightBatch[rightBatch.iter];
for (int i = 0; i < this.currentLeftList.Count; i++)
{
ActiveEvent<TLeft> t = this.currentLeftList[i];
#region OutputStartEdge
/* OutputStartEdge(nextRightTime > t.Timestamp ? nextRightTime : t.Timestamp,
ref nextRightKey, ref t.Payload, ref rightP, rightBatch.hash.col[rightBatch.iter]);*/
int index = this.output.Count++;
this.output.vsync.col[index] = this.nextRightTime > t.Timestamp ? this.nextRightTime : t.Timestamp;
this.output.vother.col[index] = StreamEvent.InfinitySyncTime;
this.output.key.col[index] = this.nextRightKey;
this.output[index] = this.selector(t.Payload, rightP);
this.output.hash.col[index] = rightBatch.hash.col[rightBatch.iter];
if (this.output.Count == Config.DataBatchSize) FlushContents();
#endregion
}
}
else
{
// clear the left array
this.currentLeftList.Clear();
}
}
/*
if (compare <= 0)
{
// update the right array
if ((currentRightList.Count != 0) && (joinKeyOrderComparer(rightBatch.key.col[rightBatch.iter], currentRightKey) != 0))
{
Contract.Assert(joinKeyOrderComparer(rightBatch.key.col[rightBatch.iter], currentRightKey) > 0);
currentRightList.Clear();
}
currentRightKey = rightBatch.key.col[rightBatch.iter];
ActiveEvent<TRight> rightAE = new ActiveEvent<TRight>();
rightAE.Populate(ref nextRightTime, ref rightBatch.payload.col[rightBatch.iter]);
currentRightList.Add(rightAE);
}*/
#endregion
rightBatch.iter++;
#region GoToVisibleRow
/* Inlined version of:
if (!GoToVisibleRow(rightBatch))
{
leftBatchDone = false;
rightBatchDone = true;
return;
}*/
while (rightBatch.iter < rightBatch.Count &&
(rightBatch.bitvector.col[rightBatch.iter >> 6] & (1L << (rightBatch.iter & 0x3f))) != 0 &&
rightBatch.vother.col[rightBatch.iter] >= 0)
{
rightBatch.iter++;
}
if (rightBatch.iter == rightBatch.Count)
{
leftBatchDone = false;
rightBatchDone = true;
return;
}
#endregion
#region UpdateNextRightTime
/* Inlined version of: UpdateNextRightTime(rightBatch.vsync.col[rightBatch.iter]); */
this.nextRightTime = rightBatch.vsync.col[rightBatch.iter];
#endregion
this.nextRightKey = rightBatch.key.col[rightBatch.iter];
}
}
}