protected override void ProcessBothBatches()

in Sources/Core/Microsoft.StreamProcessing/Operators/EquiJoin/IncreasingOrder/IncreasingOrderEquiJoinPipe.cs [89:470]


        /// <summary>
        /// Joins the rows of <paramref name="leftBatch"/> and <paramref name="rightBatch"/> by walking both
        /// batches forward from their current <c>iter</c> positions, one row per loop iteration.
        /// </summary>
        /// <remarks>
        /// The side to consume is chosen by <c>joinKeyOrderComparer</c> on the two pending keys. When the keys
        /// compare equal, or when either pending row is a punctuation, the side with the smaller pending sync
        /// time is consumed (left wins ties). A consumed data row is first joined against the rows buffered
        /// for the opposite side (<c>currentRightList</c> / <c>currentLeftList</c>); it is appended to its own
        /// side's buffer only on the "equal" path. Most helper calls have been inlined by hand; the original
        /// calls are preserved in block comments next to the inlined code.
        /// </remarks>
        /// <param name="leftBatch">Left input batch; its <c>iter</c> field is advanced in place.</param>
        /// <param name="rightBatch">Right input batch; its <c>iter</c> field is advanced in place.</param>
        /// <param name="leftBatchDone">Set to true when the left batch has no further row to visit, false when the right batch ran out instead.</param>
        /// <param name="rightBatchDone">Set to true when the right batch has no further row to visit, false when the left batch ran out instead.</param>
        /// <param name="leftBatchFree">Always set to true by this method. NOTE(review): the meaning is defined by the caller, which is not visible here.</param>
        /// <param name="rightBatchFree">Always set to true by this method. NOTE(review): the meaning is defined by the caller, which is not visible here.</param>
        protected override void ProcessBothBatches(StreamMessage<TKey, TLeft> leftBatch, StreamMessage<TKey, TRight> rightBatch, out bool leftBatchDone, out bool rightBatchDone, out bool leftBatchFree, out bool rightBatchFree)
        {
            leftBatchFree = rightBatchFree = true;

            // Position the left batch on its first row to visit; if there is none, only the left batch is reported as done.
            if (!GoToVisibleRow(leftBatch))
            {
                leftBatchDone = true;
                rightBatchDone = false;
                return;
            }

            // Cache the pending left row's sync time and key in fields; the loop below reads these
            // fields rather than re-indexing the batch.
            UpdateNextLeftTime(leftBatch.vsync.col[leftBatch.iter]);
            this.nextLeftKey = leftBatch.key.col[leftBatch.iter];

            // Same positioning for the right batch.
            if (!GoToVisibleRow(rightBatch))
            {
                leftBatchDone = false;
                rightBatchDone = true;
                return;
            }

            UpdateNextRightTime(rightBatch.vsync.col[rightBatch.iter]);
            this.nextRightKey = rightBatch.key.col[rightBatch.iter];

            // The loop has no exit condition of its own: every branch ends either by returning
            // (one batch exhausted) or by refreshing the pending time/key for the side it consumed.
            while (true)
            {
                // A row is treated as a punctuation when its vother equals StreamEvent.PunctuationOtherTime.
                bool leftPunctuation = leftBatch.vother.col[leftBatch.iter] == StreamEvent.PunctuationOtherTime;
                bool rightPunctuation = rightBatch.vother.col[rightBatch.iter] == StreamEvent.PunctuationOtherTime;

                // Keys are not compared when either pending row is a punctuation; compare is forced to 0
                // so that the choice of side falls through to the sync-time comparison below.
                int compare = (leftPunctuation || rightPunctuation) ? 0 : this.joinKeyOrderComparer(this.nextLeftKey, this.nextRightKey);

                if (compare == 0)
                {
                    // Equal keys (or a punctuation on either side): consume the side with the earlier
                    // pending sync time. "<=" means the left side is consumed first on equal times.
                    if (this.nextLeftTime <= this.nextRightTime)
                    {
                        // process left
                        if (leftPunctuation)
                        {
                            AddPunctuationToBatch(this.nextLeftTime);
                        }
                        else
                        {
                            #region ProcessLeftStartEdge
                            /* Inlined version of:
                            ProcessLeftStartEdge(
                                nextLeftTime,
                                ref leftBatch.key.col[leftBatch.iter],
                                leftBatch[leftBatch.iter],
                                leftBatch.hash.col[leftBatch.iter], compare);
                            */
                            {
                                var payload = leftBatch[leftBatch.iter];

                                if (this.currentRightList.Count > 0)
                                {
                                    int compare2 = this.joinKeyOrderComparer(this.nextLeftKey, this.currentRightKey);

                                    // The incoming left key must not sort before the key of the buffered right rows.
                                    Contract.Assert(compare2 >= 0, "Unexpected comparison in sort-ordered join");

                                    if (compare2 == 0)
                                    {
                                        // perform the join: one output row per buffered right row
                                        for (int i = 0; i < this.currentRightList.Count; i++)
                                        {
                                            ActiveEvent<TRight> t = this.currentRightList[i];

                                            // Local copy handed to OutputStartEdge by ref.
                                            // NOTE(review): presumably so the callee cannot alter nextLeftKey - confirm.
                                            var nextLeftKeyTemp = this.nextLeftKey;

                                            // The output timestamp is the later of the two rows' timestamps.
                                            OutputStartEdge(this.nextLeftTime > t.Timestamp ? this.nextLeftTime : t.Timestamp, ref nextLeftKeyTemp, ref payload, ref t.Payload, leftBatch.hash.col[leftBatch.iter]);
                                        }
                                    }
                                    else
                                    {
                                        // clear the right array: its rows belong to a different key than the incoming left row
                                        this.currentRightList.Clear();
                                    }
                                }

                                // compare is 0 on this path, so this guard always passes here; it is carried
                                // over from the non-inlined helper, which received compare as an argument.
                                if (compare >= 0)
                                {
                                    // update the left array: drop buffered left rows of a different key, then buffer this row
                                    if ((this.currentLeftList.Count != 0) && (this.joinKeyOrderComparer(this.nextLeftKey, this.currentLeftKey) != 0))
                                    {
                                        Contract.Assert(this.joinKeyOrderComparer(this.nextLeftKey, this.currentLeftKey) > 0);
                                        this.currentLeftList.Clear();
                                    }
                                    var temp = this.nextLeftKey;
                                    this.currentLeftKey = temp;
                                    var leftAE = new ActiveEvent<TLeft> { Payload = payload, Timestamp = this.nextLeftTime };
                                    this.currentLeftList.Add(leftAE);
                                }
                            }
                            #endregion
                        }

                        leftBatch.iter++;

                        // This path calls the helper; the other paths inline the equivalent skip loop.
                        if (!GoToVisibleRow(leftBatch))
                        {
                            leftBatchDone = true;
                            rightBatchDone = false;
                            return;
                        }

                        // Refresh the pending left time/key (assigned directly rather than via UpdateNextLeftTime).
                        this.nextLeftTime = leftBatch.vsync.col[leftBatch.iter];
                        this.nextLeftKey = leftBatch.key.col[leftBatch.iter];
                    }
                    else
                    {
                        // process right
                        if (rightPunctuation)
                        {
                            AddPunctuationToBatch(this.nextRightTime);
                        }
                        else
                        {
                            #region ProcessRightStartEdge
                            /* Inlined version of:
                            ProcessRightStartEdge(
                                nextRightTime,
                                ref rightBatch.key.col[rightBatch.iter],
                                rightBatch[rightBatch.iter],
                                rightBatch.hash.col[rightBatch.iter], compare);
                            */
                            if (this.currentLeftList.Count > 0)
                            {
                                int compare2 = this.joinKeyOrderComparer(this.nextRightKey, this.currentLeftKey);

                                // The incoming right key must not sort before the key of the buffered left rows.
                                Contract.Assert(compare2 >= 0, "Unexpected comparison in sort-ordered join");

                                if (compare2 == 0)
                                {
                                    // perform the join: one output row per buffered left row
                                    var rightP = rightBatch[rightBatch.iter];
                                    for (int i = 0; i < this.currentLeftList.Count; i++)
                                    {
                                        ActiveEvent<TLeft> t = this.currentLeftList[i];

                                        #region OutputStartEdge
                                        /* Inlined version of:
                                           OutputStartEdge(nextRightTime > t.Timestamp ? nextRightTime : t.Timestamp,
                                            ref nextRightKey, ref t.Payload, ref rightP, rightBatch.hash.col[rightBatch.iter]);*/

                                        // Claim the next output slot and fill its columns.
                                        int index = this.output.Count++;

                                        // Output sync time is the later of the two rows' timestamps.
                                        this.output.vsync.col[index] = this.nextRightTime > t.Timestamp ? this.nextRightTime : t.Timestamp;
                                        this.output.vother.col[index] = StreamEvent.InfinitySyncTime;
                                        this.output.key.col[index] = this.nextRightKey;

                                        // The selector always receives (left payload, right payload) in that order.
                                        this.output[index] = this.selector(t.Payload, rightP);
                                        this.output.hash.col[index] = rightBatch.hash.col[rightBatch.iter];

                                        // Flush as soon as the output batch reaches Config.DataBatchSize rows.
                                        if (this.output.Count == Config.DataBatchSize) FlushContents();
                                        #endregion
                                    }
                                }
                                else
                                {
                                    // clear the left array: its rows belong to a different key than the incoming right row
                                    this.currentLeftList.Clear();
                                }
                            }

                            // compare is 0 on this path, so this guard always passes here; it is carried
                            // over from the non-inlined helper, which received compare as an argument.
                            if (compare <= 0)
                            {
                                // update the right array: drop buffered right rows of a different key, then buffer this row
                                if ((this.currentRightList.Count != 0) && (this.joinKeyOrderComparer(rightBatch.key.col[rightBatch.iter], this.currentRightKey) != 0))
                                {
                                    Contract.Assert(this.joinKeyOrderComparer(rightBatch.key.col[rightBatch.iter], this.currentRightKey) > 0);
                                    this.currentRightList.Clear();
                                }

                                this.currentRightKey = rightBatch.key.col[rightBatch.iter];
                                ActiveEvent<TRight> rightAE = default;
                                var payload = rightBatch[rightBatch.iter];

                                rightAE.Populate(ref this.nextRightTime, ref payload);
                                this.currentRightList.Add(rightAE);
                            }
                            #endregion
                        }

                        rightBatch.iter++;

                        #region GoToVisibleRow
                        /* Inlined version of:
                        if (!GoToVisibleRow(rightBatch))
                        {
                            leftBatchDone = false;
                            rightBatchDone = true;
                            return;
                        }*/

                        // Skip rows whose bitvector bit is set and whose vother is non-negative. A row with a
                        // negative vother is visited even when its bit is set.
                        // NOTE(review): presumably this keeps punctuation rows visible (PunctuationOtherTime
                        // assumed negative) - confirm against StreamEvent.
                        while (rightBatch.iter < rightBatch.Count &&
                            (rightBatch.bitvector.col[rightBatch.iter >> 6] & (1L << (rightBatch.iter & 0x3f))) != 0 &&
                            rightBatch.vother.col[rightBatch.iter] >= 0)
                        {
                            rightBatch.iter++;
                        }
                        if (rightBatch.iter == rightBatch.Count)
                        {
                            leftBatchDone = false;
                            rightBatchDone = true;
                            return;
                        }
                        #endregion

                        // Refresh the pending right time/key.
                        this.nextRightTime = rightBatch.vsync.col[rightBatch.iter];
                        this.nextRightKey = rightBatch.key.col[rightBatch.iter];
                    }
                }
                else if (compare < 0)
                {
                    // Left key sorts before the right key (neither row is a punctuation on this path).
                    // process left
                    #region ProcessLeftStartEdge
                    /* Inlined version of:
                        ProcessLeftStartEdge(
                            nextLeftTime,
                            ref leftBatch.key.col[leftBatch.iter],
                            leftBatch[leftBatch.iter],
                            leftBatch.hash.col[leftBatch.iter], compare);
                        */
                    {

                        if (this.currentRightList.Count > 0)
                        {
                            int compare2 = this.joinKeyOrderComparer(this.nextLeftKey, this.currentRightKey);

                            // The incoming left key must not sort before the key of the buffered right rows.
                            Contract.Assert(compare2 >= 0, "Unexpected comparison in sort-ordered join");

                            if (compare2 == 0)
                            {
                                // perform the join: one output row per buffered right row
                                var payload = leftBatch[leftBatch.iter];

                                for (int i = 0; i < this.currentRightList.Count; i++)
                                {
                                    ActiveEvent<TRight> t = this.currentRightList[i];
                                    var temp = this.nextLeftKey;
                                    OutputStartEdge(this.nextLeftTime > t.Timestamp ? this.nextLeftTime : t.Timestamp, ref temp, ref payload, ref t.Payload, leftBatch.hash.col[leftBatch.iter]);
                                }
                            }
                            else
                            {
                                // clear the right array: its rows belong to a different key than the incoming left row
                                this.currentRightList.Clear();
                            }
                        }

                        // The helper's buffering step is guarded by (compare >= 0), which cannot hold on this
                        // path (compare < 0), so it is left disabled: the left row is joined but not buffered.
                        /*if (compare >= 0)
                        {
                            // update the left array
                            if ((currentLeftList.Count != 0) && (joinKeyOrderComparer(nextLeftKey, currentLeftKey) != 0))
                            {
                                Contract.Assert(joinKeyOrderComparer(nextLeftKey, currentLeftKey) > 0);
                                currentLeftList.Clear();
                            }
                            currentLeftKey = nextLeftKey;
                            ActiveEvent<TLeft> leftAE = new ActiveEvent<TLeft> { Payload = payload, Timestamp = nextLeftTime };
                            currentLeftList.Add(leftAE);
                        }*/
                    }
                    #endregion

                    leftBatch.iter++;

                    #region GoToVisibleRow
                    /* Inlined version of:
                    if (!GoToVisibleRow(leftBatch))
                    {
                        leftBatchDone = true;
                        rightBatchDone = false;
                        return;
                    }*/

                    // Skip rows whose bitvector bit is set and whose vother is non-negative; rows with a
                    // negative vother are visited even when their bit is set.
                    while (leftBatch.iter < leftBatch.Count &&
                        (leftBatch.bitvector.col[leftBatch.iter >> 6] & (1L << (leftBatch.iter & 0x3f))) != 0 &&
                        leftBatch.vother.col[leftBatch.iter] >= 0)
                    {
                        leftBatch.iter++;
                    }
                    if (leftBatch.iter == leftBatch.Count)
                    {
                        leftBatchDone = true;
                        rightBatchDone = false;
                        return;
                    }
                    #endregion

                    #region UpdateNextLeftTime

                    // Direct assignment in place of a call to UpdateNextLeftTime.
                    this.nextLeftTime = leftBatch.vsync.col[leftBatch.iter];
                        #endregion

                    this.nextLeftKey = leftBatch.key.col[leftBatch.iter];

                }
                else // hot path if larger right side of join matches very few things on the left side
                {
                    // Right key sorts before the left key (neither row is a punctuation on this path).
                    // process right
                    #region ProcessRightStartEdge
                    /* Inlined version of:
                    ProcessRightStartEdge(
                        nextRightTime,
                        ref rightBatch.key.col[rightBatch.iter],
                        rightBatch.payload.col[rightBatch.iter],
                        rightBatch.hash.col[rightBatch.iter], compare);
                    */
                    if (this.currentLeftList.Count > 0)
                    {
                        int compare2 = this.joinKeyOrderComparer(this.nextRightKey, this.currentLeftKey);

                        // The incoming right key must not sort before the key of the buffered left rows.
                        Contract.Assert(compare2 >= 0, "Unexpected comparison in sort-ordered join");

                        if (compare2 == 0)
                        {
                            // perform the join: one output row per buffered left row
                            var rightP = rightBatch[rightBatch.iter];
                            for (int i = 0; i < this.currentLeftList.Count; i++)
                            {
                                ActiveEvent<TLeft> t = this.currentLeftList[i];
                                #region OutputStartEdge
                                /* Inlined version of:
                                   OutputStartEdge(nextRightTime > t.Timestamp ? nextRightTime : t.Timestamp,
                                        ref nextRightKey, ref t.Payload, ref rightP, rightBatch.hash.col[rightBatch.iter]);*/

                                // Claim the next output slot and fill its columns.
                                int index = this.output.Count++;

                                // Output sync time is the later of the two rows' timestamps.
                                this.output.vsync.col[index] = this.nextRightTime > t.Timestamp ? this.nextRightTime : t.Timestamp;
                                this.output.vother.col[index] = StreamEvent.InfinitySyncTime;
                                this.output.key.col[index] = this.nextRightKey;

                                // The selector always receives (left payload, right payload) in that order.
                                this.output[index] = this.selector(t.Payload, rightP);
                                this.output.hash.col[index] = rightBatch.hash.col[rightBatch.iter];

                                // Flush as soon as the output batch reaches Config.DataBatchSize rows.
                                if (this.output.Count == Config.DataBatchSize) FlushContents();
                                #endregion
                            }
                        }
                        else
                        {
                            // clear the left array: its rows belong to a different key than the incoming right row
                            this.currentLeftList.Clear();
                        }
                    }

                    // The helper's buffering step is guarded by (compare <= 0), which cannot hold on this
                    // path (compare > 0), so it is left disabled: the right row is joined but not buffered.
                    /*
                    if (compare <= 0)
                    {
                        // update the right array
                        if ((currentRightList.Count != 0) && (joinKeyOrderComparer(rightBatch.key.col[rightBatch.iter], currentRightKey) != 0))
                        {
                            Contract.Assert(joinKeyOrderComparer(rightBatch.key.col[rightBatch.iter], currentRightKey) > 0);
                            currentRightList.Clear();
                        }
                        currentRightKey = rightBatch.key.col[rightBatch.iter];
                        ActiveEvent<TRight> rightAE = new ActiveEvent<TRight>();
                        rightAE.Populate(ref nextRightTime, ref rightBatch.payload.col[rightBatch.iter]);
                        currentRightList.Add(rightAE);
                    }*/
                    #endregion

                    rightBatch.iter++;

                    #region GoToVisibleRow
                    /* Inlined version of:
                    if (!GoToVisibleRow(rightBatch))
                    {
                        leftBatchDone = false;
                        rightBatchDone = true;
                        return;
                    }*/

                    // Skip rows whose bitvector bit is set and whose vother is non-negative; rows with a
                    // negative vother are visited even when their bit is set.
                    while (rightBatch.iter < rightBatch.Count &&
                        (rightBatch.bitvector.col[rightBatch.iter >> 6] & (1L << (rightBatch.iter & 0x3f))) != 0 &&
                        rightBatch.vother.col[rightBatch.iter] >= 0)
                    {
                        rightBatch.iter++;
                    }
                    if (rightBatch.iter == rightBatch.Count)
                    {
                        leftBatchDone = false;
                        rightBatchDone = true;
                        return;
                    }
                    #endregion

                    #region UpdateNextRightTime
                        /* Inlined version of: UpdateNextRightTime(rightBatch.vsync.col[rightBatch.iter]); */
                    this.nextRightTime = rightBatch.vsync.col[rightBatch.iter];
                        #endregion

                    this.nextRightKey = rightBatch.key.col[rightBatch.iter];
                }
            }
        }