private void ProcessPendingEntries()

in Sources/Core/Microsoft.StreamProcessing/Operators/EquiJoin/StartEdge/PartitionedStartEdgeEquiJoinPipe.cs [220:417]


        /// <summary>
        /// Drains the queued left and right entries of every partition listed in <c>processQueue</c>,
        /// emitting start-edge join results and punctuations as it goes. Afterwards, if a CTI is pending
        /// (<c>emitCTI</c>), emits a low watermark at the earlier of the two input CTIs and releases the
        /// queues of all partitions that are currently marked clean.
        /// </summary>
        private void ProcessPendingEntries()
        {
            foreach (var pKey in this.processQueue)
            {
                // Partition is no longer clean if we are processing it. If it is still clean, it will be added below.
                this.cleanKeys.Remove(pKey);

                // NOTE(review): the slot index found in leftQueue is reused for rightQueue, which assumes
                // both maps keep a given partition key at the same index - confirm against where they are populated.
                this.leftQueue.Lookup(pKey, out int index);
                PooledElasticCircularBuffer<LEntry> leftWorking = this.leftQueue.entries[index].value;
                PooledElasticCircularBuffer<REntry> rightWorking = this.rightQueue.entries[index].value;

                this.partitionData.Lookup(pKey, out index);
                var partition = this.partitionData.entries[index].value;
                FastMap<ActiveEvent<TLeft>> leftEdgeMapForPartition = partition.leftEdgeMap;
                FastMap<ActiveEvent<TRight>> rightEdgeMapForPartition = partition.rightEdgeMap;

                // Consume entries one at a time until both queues for this partition are empty.
                while (true)
                {
                    bool hasLeftBatch = leftWorking.TryPeekFirst(out LEntry leftEntry);
                    bool hasRightBatch = rightWorking.TryPeekFirst(out REntry rightEntry);
                    FastMap<ActiveEvent<TRight>>.FindTraverser rightEdges = default;
                    FastMap<ActiveEvent<TLeft>>.FindTraverser leftEdges = default;
                    if (hasLeftBatch && hasRightBatch)
                    {
                        UpdateNextLeftTime(partition, leftEntry.Sync);
                        UpdateNextRightTime(partition, rightEntry.Sync);

                        // Both sides have work: handle whichever head entry has the earlier sync time (left wins ties).
                        if (partition.nextLeftTime <= partition.nextRightTime)
                        {
                            // An entry whose Other is long.MinValue is forwarded as a punctuation; anything else is joined.
                            if (leftEntry.Other != long.MinValue)
                            {
                                TKey key = leftEntry.Key;

                                // Probe the right-side map by hash, then confirm with the key comparer.
                                var hash = leftEntry.Hash;
                                if (rightEdgeMapForPartition.Find(hash, ref rightEdges))
                                {
                                    while (rightEdges.Next(out int rightIndex))
                                    {
                                        if (this.keyComparer(key, rightEdgeMapForPartition.Values[rightIndex].Key))
                                        {
                                            OutputStartEdge(partition.nextLeftTime, ref key, ref leftEntry.Payload, ref rightEdgeMapForPartition.Values[rightIndex].Payload, hash);
                                        }
                                    }
                                }

                                // Only remember this left event while the right side is not yet complete.
                                if (!partition.isRightComplete)
                                {
                                    int newIndex = leftEdgeMapForPartition.Insert(hash);
                                    leftEdgeMapForPartition.Values[newIndex].Populate(ref key, ref leftEntry.Payload);
                                }

                                UpdateNextLeftTime(partition, leftEntry.Sync);
                            }
                            else
                            {
                                OutputPunctuation(leftEntry.Sync, ref leftEntry.Key, leftEntry.Hash);
                            }

                            leftWorking.Dequeue();
                        }
                        else
                        {
                            if (rightEntry.Other != long.MinValue)
                            {
                                TKey key = rightEntry.Key;

                                // Probe the left-side map by hash, then confirm with the key comparer.
                                var hash = rightEntry.Hash;
                                if (leftEdgeMapForPartition.Find(hash, ref leftEdges))
                                {
                                    while (leftEdges.Next(out int leftIndex))
                                    {
                                        if (this.keyComparer(key, leftEdgeMapForPartition.Values[leftIndex].Key))
                                        {
                                            OutputStartEdge(partition.nextRightTime, ref key, ref leftEdgeMapForPartition.Values[leftIndex].Payload, ref rightEntry.Payload, hash);
                                        }
                                    }
                                }

                                // Only remember this right event while the left side is not yet complete.
                                if (!partition.isLeftComplete)
                                {
                                    int newIndex = rightEdgeMapForPartition.Insert(hash);
                                    rightEdgeMapForPartition.Values[newIndex].Populate(ref key, ref rightEntry.Payload);
                                }

                                UpdateNextRightTime(partition, rightEntry.Sync);
                            }
                            else
                            {
                                OutputPunctuation(rightEntry.Sync, ref rightEntry.Key, rightEntry.Hash);
                            }

                            rightWorking.Dequeue();
                        }
                    }
                    else if (hasLeftBatch)
                    {
                        // Only the left queue has entries for this partition.
                        UpdateNextLeftTime(partition, leftEntry.Sync);

                        if (leftEntry.Other != long.MinValue)
                        {
                            TKey key = leftEntry.Key;

                            var hash = leftEntry.Hash;
                            if (rightEdgeMapForPartition.Find(hash, ref rightEdges))
                            {
                                while (rightEdges.Next(out int rightIndex))
                                {
                                    if (this.keyComparer(key, rightEdgeMapForPartition.Values[rightIndex].Key))
                                    {
                                        OutputStartEdge(partition.nextLeftTime, ref key, ref leftEntry.Payload, ref rightEdgeMapForPartition.Values[rightIndex].Payload, hash);
                                    }
                                }
                            }
                            if (!partition.isRightComplete)
                            {
                                int newIndex = leftEdgeMapForPartition.Insert(hash);
                                leftEdgeMapForPartition.Values[newIndex].Populate(ref key, ref leftEntry.Payload);
                            }

                            UpdateNextLeftTime(partition, leftEntry.Sync);
                        }
                        else
                        {
                            // Emit the punctuation and fall through to the dequeue, exactly like the right-only
                            // branch. Returning here would leave the punctuation at the head of the queue (so it
                            // would be emitted again on the next call) and would skip the remaining partitions,
                            // the pending low watermark, and the clearing of processQueue.
                            OutputPunctuation(leftEntry.Sync, ref leftEntry.Key, leftEntry.Hash);
                        }

                        leftWorking.Dequeue();
                    }
                    else if (hasRightBatch)
                    {
                        // Only the right queue has entries for this partition.
                        UpdateNextRightTime(partition, rightEntry.Sync);

                        if (rightEntry.Other != long.MinValue)
                        {
                            TKey key = rightEntry.Key;

                            var hash = rightEntry.Hash;
                            if (leftEdgeMapForPartition.Find(hash, ref leftEdges))
                            {
                                while (leftEdges.Next(out int leftIndex))
                                {
                                    if (this.keyComparer(key, leftEdgeMapForPartition.Values[leftIndex].Key))
                                    {
                                        OutputStartEdge(partition.nextRightTime, ref key, ref leftEdgeMapForPartition.Values[leftIndex].Payload, ref rightEntry.Payload, hash);
                                    }
                                }
                            }
                            if (!partition.isLeftComplete)
                            {
                                int newIndex = rightEdgeMapForPartition.Insert(hash);
                                rightEdgeMapForPartition.Values[newIndex].Populate(ref key, ref rightEntry.Payload);
                            }

                            UpdateNextRightTime(partition, rightEntry.Sync);
                        }
                        else
                        {
                            OutputPunctuation(rightEntry.Sync, ref rightEntry.Key, rightEntry.Hash);
                        }

                        rightWorking.Dequeue();
                    }
                    else
                    {
                        // Both queues are drained: advance each side's time to the last CTI seen on that
                        // side (never backwards), mark the partition clean, and move on to the next key.
                        if (partition.nextLeftTime < this.lastLeftCTI)
                        {
                            UpdateNextLeftTime(partition, this.lastLeftCTI);
                        }

                        if (partition.nextRightTime < this.lastRightCTI)
                        {
                            UpdateNextRightTime(partition, this.lastRightCTI);
                        }

                        this.cleanKeys.Add(pKey);
                        break;
                    }
                }
            }

            if (this.emitCTI)
            {
                // The output low watermark is the earlier of the two input CTIs.
                var earliest = Math.Min(this.lastLeftCTI, this.lastRightCTI);
                AddLowWatermarkToBatch(earliest);
                this.emitCTI = false;

                // Forget every clean partition and release its (now empty) queues.
                foreach (var p in this.cleanKeys)
                {
                    this.seenKeys.Remove(p);

                    // NOTE(review): as above, the leftQueue index is reused for rightQueue - confirm the maps stay aligned.
                    this.leftQueue.Lookup(p, out var index);
                    this.leftQueue.entries[index].value.Dispose();
                    this.leftQueue.Remove(p);
                    this.rightQueue.entries[index].value.Dispose();
                    this.rightQueue.Remove(p);
                }

                this.cleanKeys.Clear();
            }

            this.processQueue.Clear();
        }