private void Process()

in Sources/Core/Microsoft.StreamProcessing/Ingress/Temporal/TemporalIngressSubscription.cs [4797:4980]


        private void Process(ref PartitionedStreamEvent<TKey, TResult> value)
        {
            Contract.Assume(value.SyncTime != value.OtherTime);

            if (value.IsLowWatermark)
            {
                GenerateAndProcessLowWatermark(value.SyncTime);
                return;
            }

            // Update global high water mark if necessary
            this.highWatermark = Math.Max(this.highWatermark, value.SyncTime);

            if (this.punctuationPolicyType == PeriodicPunctuationPolicyType.Time && !this.lastPunctuationTime.ContainsKey(value.PartitionKey))
                UpdatePunctuation(value.PartitionKey, this.lowWatermark.rawValue, this.lowWatermark.quantizedForPunctuationGeneration);

            // Retrieve current time for this partition, updating currentTime if necessary
            if (!this.currentTime.TryGetValue(value.PartitionKey, out long current))
            {
                current = this.lowWatermark.rawValue;
            }
            else if (current < this.lowWatermark.rawValue)
            {
                current = this.lowWatermark.rawValue;
                UpdateCurrentTime(value.PartitionKey, this.lowWatermark.rawValue);
            }

            var outOfOrder = value.SyncTime < current;

            // check for out of order event
            if (value.IsPunctuation)
            {
                OnPunctuation(value.CreatePunctuation(outOfOrder ? current : value.SyncTime));
            }
            else
            {
                if (this.disorderPolicyType == DisorderPolicyType.Throw)
                {
                    if (outOfOrder)
                    {
                        throw new IngressException($"Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {value.SyncTime}, current:{current}");
                    }
                }
                else
                {
                    // end events and interval events just get dropped
                    Tuple<long, TResult> key;
                    ElasticCircularBuffer<AdjustInfo> q;
                    switch (value.Kind)
                    {
                        case StreamEventKind.Start:
                            if (outOfOrder)
                            {
                                key = Tuple.Create(value.SyncTime, value.Payload);
                                if (!this.startEventInformation.TryGetValue(key, out q))
                                {
                                    q = new ElasticCircularBuffer<AdjustInfo>();
                                    this.startEventInformation.Add(key, q);
                                    var x = new AdjustInfo(current);
                                    q.Enqueue(ref x);
                                }
                                else
                                {
                                    var last = q.PeekLast();
                                    if (last.modifiedStartTime == current) last.numberOfOccurrences++;
                                    else
                                    {
                                        var x = new AdjustInfo(current);
                                        q.Enqueue(ref x);
                                    }
                                }
    
                                if (this.disorderPolicyType == DisorderPolicyType.Drop)
                                {
                                    this.diagnosticOutput?.OnNext(OutOfOrderPartitionedStreamEvent.Create(new PartitionedStreamEvent<TKey, TPayload>(value.PartitionKey, value.SyncTime, value.OtherTime, default), default));
                                    return; // drop
                                }
                                else
                                {
                                    this.diagnosticOutput?.OnNext(OutOfOrderPartitionedStreamEvent.Create(new PartitionedStreamEvent<TKey, TPayload>(value.PartitionKey, value.SyncTime, value.OtherTime, default), new long?(current - value.SyncTime)));
                                    value = new PartitionedStreamEvent<TKey, TResult>(value.PartitionKey, current, StreamEvent.InfinitySyncTime, value.Payload);
                                }
                            }
                            break;
    
                        case StreamEventKind.Interval:
                            if (outOfOrder)
                            {
                                if (this.disorderPolicyType == DisorderPolicyType.Drop)
                                {
                                    this.diagnosticOutput?.OnNext(OutOfOrderPartitionedStreamEvent.Create(new PartitionedStreamEvent<TKey, TPayload>(value.PartitionKey, value.SyncTime, value.OtherTime, default), default));
                                    return; // drop
                                }
                                else
                                {
                                    if (current >= value.OtherTime)
                                    {
                                        this.diagnosticOutput?.OnNext(OutOfOrderPartitionedStreamEvent.Create(new PartitionedStreamEvent<TKey, TPayload>(value.PartitionKey, value.SyncTime, value.OtherTime, default), default));
                                        return; // drop
                                    }
    
                                    this.diagnosticOutput?.OnNext(OutOfOrderPartitionedStreamEvent.Create(new PartitionedStreamEvent<TKey, TPayload>(value.PartitionKey, value.SyncTime, value.OtherTime, default), new long?(current - value.SyncTime)));
                                    value = new PartitionedStreamEvent<TKey, TResult>(value.PartitionKey, current, value.OtherTime, value.Payload);
                                }
                            }
                            break;
    
                        case StreamEventKind.End:
                            // it may not be out of order, but did we drop/adjust the corresponding start event?
                            key = Tuple.Create(value.OtherTime, value.Payload);
                            if (this.startEventInformation.TryGetValue(key, out q))
                            {
                                Contract.Assume(!q.IsEmpty());
                                var firstElement = q.PeekFirst();
                                firstElement.numberOfOccurrences--;
                                if (firstElement.numberOfOccurrences == 0)
                                {
                                    q.Dequeue(); // throw away returned value
                                    if (q.Count == 0) this.startEventInformation.Remove(key);
                                }
                                var adjustedTime = firstElement.modifiedStartTime;
    
                                if (this.disorderPolicyType == DisorderPolicyType.Drop)
                                {
                                    this.diagnosticOutput?.OnNext(OutOfOrderPartitionedStreamEvent.Create(new PartitionedStreamEvent<TKey, TPayload>(value.PartitionKey, value.SyncTime, value.OtherTime, default), default));
                                    return; // drop
                                }
                                else
                                {
                                    this.diagnosticOutput?.OnNext(OutOfOrderPartitionedStreamEvent.Create(new PartitionedStreamEvent<TKey, TPayload>(value.PartitionKey, value.SyncTime, value.OtherTime, default), new long?(current - value.SyncTime)));
                                    value = new PartitionedStreamEvent<TKey, TResult>(value.PartitionKey, outOfOrder ? current : value.SyncTime, adjustedTime, value.Payload);
                                }
                            }
                            else if (outOfOrder)
                            {
                                if (this.disorderPolicyType == DisorderPolicyType.Drop)
                                {
                                    this.diagnosticOutput?.OnNext(OutOfOrderPartitionedStreamEvent.Create(new PartitionedStreamEvent<TKey, TPayload>(value.PartitionKey, value.SyncTime, value.OtherTime, default), default));
                                    return; // drop
                                }
                                else
                                {
                                    this.diagnosticOutput?.OnNext(OutOfOrderPartitionedStreamEvent.Create(new PartitionedStreamEvent<TKey, TPayload>(value.PartitionKey, value.SyncTime, value.OtherTime, default), new long?(current - value.SyncTime)));
                                    value = new PartitionedStreamEvent<TKey, TResult>(value.PartitionKey, current, value.OtherTime, value.Payload);
                                }
                            }
    
                            break;
                        default:
                            Contract.Assert(false, "switch meant to be exhaustive");
                            throw new InvalidOperationException("Unsupported stream event kind: " + value.Kind.ToString());
                    }
                }
    
                if (this.punctuationPolicyType == PeriodicPunctuationPolicyType.Time)
                {
                    // Note that we must generate punctuation after disorder policy has been applied, since an event with an adjusted sync time may still need to generate the punctuation.
                    if (this.punctuationGenerationPeriod > 0)
                    {
                        // We use lowWatermark as the baseline in the delta computation because a low watermark implies
                        // punctuations for all partitions
                        var prevPunctuation = Math.Max(this.lastPunctuationTime[value.PartitionKey].lastPunctuationQuantized, this.lowWatermark.quantizedForPunctuationGeneration);
                        if ((ulong)(value.SyncTime - prevPunctuation) >= this.punctuationGenerationPeriod)
                        {
                            // SyncTime is sufficiently high to generate a new punctuation, but first snap it to the nearest generationPeriod boundary
                            var punctuationTimeQuantized = value.SyncTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod);
    #if DEBUG
                            Debug.Assert(punctuationTimeQuantized >= LastEventTime(value.PartitionKey), "Bug in punctuation quantization logic");
    #endif
                            OnPunctuation(value.CreatePunctuation(punctuationTimeQuantized));
                        }
                    }
                }
    
                this.currentBatch.Add(value.SyncTime, value.OtherTime, new PartitionKey<TKey>(value.PartitionKey), value.Payload);
                if (this.currentBatch.Count == Config.DataBatchSize)
                {
                    if (this.flushPolicy == PartitionedFlushPolicy.FlushOnBatchBoundary) OnFlush();
                    else FlushContents();
                }
            }

            UpdateCurrentTime(value.PartitionKey, value.SyncTime);
        }