in Sources/Core/Microsoft.StreamProcessing/Ingress/Temporal/TemporalIngressSubscription.cs [5789:5987]
/// <summary>
/// Ingests a single partitioned event. The event is wrapped in a PartitionedStreamEvent and then,
/// in this order: low watermarks are handled or generated, the global high watermark is raised,
/// the partition's current time is clamped to the low watermark, the disorder policy is applied
/// to out-of-order events, a periodic punctuation is emitted if due, the event is appended to the
/// current batch (flushing when the batch is full), and the partition's current time is advanced.
/// </summary>
/// <param name="start">Timestamp passed as the second argument of the PartitionedStreamEvent constructor.</param>
/// <param name="end">Timestamp passed as the third argument of the PartitionedStreamEvent constructor.</param>
/// <param name="payload">Payload carried by the event.</param>
/// <param name="actionKey">Wrapper whose Key is used as the partition key of the event.</param>
/// <exception cref="IngressException">The event is out of order and the disorder policy is Throw.</exception>
/// <exception cref="InvalidOperationException">The event kind is not Start, Interval, or End.</exception>
private void Action(long start, long end, TResult payload, PartitionKey<TKey> actionKey)
{
var value = new PartitionedStreamEvent<TKey, TResult>(actionKey.Key, start, end, payload);
// An explicit low watermark from the source is processed directly; nothing else in this method applies to it.
if (value.IsLowWatermark)
{
GenerateAndProcessLowWatermark(value.SyncTime);
return;
}
// Check to see if we need to generate a low watermark due to PeriodicLowWatermarkPolicy
if (this.lowWatermarkPolicyType == PeriodicLowWatermarkPolicyType.Time &&
value.SyncTime > this.lowWatermarkTimestampLag)
{
// Candidate watermark trails the event's sync time by the configured lag.
var newLowWatermark = value.SyncTime - this.lowWatermarkTimestampLag;
// NOTE(review): if the difference is negative, the ulong cast wraps to a large value (unless compiled
// as checked) and this test passes; presumably GenerateAndProcessLowWatermark ignores a watermark
// that does not advance - confirm.
if ((ulong)(newLowWatermark - this.lowWatermark.quantizedForLowWatermarkGeneration) >= this.lowWatermarkGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new watermark, but first snap it to the nearest generationPeriod boundary
var newLowWatermarkSnapped = newLowWatermark.SnapToLeftBoundary((long)this.lowWatermarkGenerationPeriod);
GenerateAndProcessLowWatermark(newLowWatermarkSnapped);
}
}
Contract.Assume(value.SyncTime != value.OtherTime);
// Update global high water mark if necessary
this.highWatermark = Math.Max(this.highWatermark, value.SyncTime);
// First event seen for this partition under a time-based punctuation policy: seed its punctuation
// entry from the low watermark, so the lastPunctuationTime indexer used further down finds an entry.
if (this.punctuationPolicyType == PeriodicPunctuationPolicyType.Time && !this.lastPunctuationTime.ContainsKey(value.PartitionKey))
UpdatePunctuation(value.PartitionKey, this.lowWatermark.rawValue, this.lowWatermark.quantizedForPunctuationGeneration);
// Retrieve current time for this partition, updating currentTime if necessary
if (!this.currentTime.TryGetValue(value.PartitionKey, out long current))
{
// Unknown partition: use the low watermark as its current time (nothing is stored here;
// the UpdateCurrentTime call at the end of the method runs unless the event is dropped or rejected).
current = this.lowWatermark.rawValue;
}
else if (current < this.lowWatermark.rawValue)
{
// Stored time is behind the low watermark: raise it to the low watermark.
current = this.lowWatermark.rawValue;
UpdateCurrentTime(value.PartitionKey, this.lowWatermark.rawValue);
}
// An event is out of order when its sync time precedes the partition's current time.
var outOfOrder = value.SyncTime < current;
if (value.IsPunctuation)
{
// An out-of-order punctuation is re-issued at the partition's current time instead of its own sync time.
OnPunctuation(value.CreatePunctuation(outOfOrder ? current : value.SyncTime));
}
else
{
if (this.disorderPolicyType == DisorderPolicyType.Throw)
{
if (outOfOrder)
{
throw new IngressException($"Out-of-order event encountered during ingress, under a disorder policy of Throw: value.SyncTime: {value.SyncTime}, current:{current}");
}
}
else
{
// Non-Throw policies: under Drop the affected event is reported on diagnosticOutput and discarded
// (early return); otherwise its timestamps are rewritten so that it is no longer out of order.
// Diagnostic events carry only the key and original timestamps; their payload is default.
Tuple<long, TResult> key;
ElasticCircularBuffer<AdjustInfo> q;
switch (value.Kind)
{
case StreamEventKind.Start:
if (outOfOrder)
{
// Remember, keyed by (original sync time, payload), the time this start event was moved to,
// so that the matching End event can be handled consistently in the End case below.
key = Tuple.Create(value.SyncTime, value.Payload);
if (!this.startEventInformation.TryGetValue(key, out q))
{
q = new ElasticCircularBuffer<AdjustInfo>();
this.startEventInformation.Add(key, q);
var x = new AdjustInfo(current);
q.Enqueue(ref x);
}
else
{
// Same key seen before: bump the count on the newest entry if it has the same adjusted
// time, otherwise append a new entry.
// NOTE(review): the increment only reaches the queued entry if AdjustInfo is a reference
// type (or PeekLast returns a reference) - confirm.
var last = q.PeekLast();
if (last.modifiedStartTime == current) last.numberOfOccurrences++;
else
{
var x = new AdjustInfo(current);
q.Enqueue(ref x);
}
}
if (this.disorderPolicyType == DisorderPolicyType.Drop)
{
this.diagnosticOutput?.OnNext(OutOfOrderPartitionedStreamEvent.Create(new PartitionedStreamEvent<TKey, TPayload>(value.PartitionKey, value.SyncTime, value.OtherTime, default), default));
return; // drop
}
else
{
// Report the adjustment amount, then move the start event to the partition's current time.
this.diagnosticOutput?.OnNext(OutOfOrderPartitionedStreamEvent.Create(new PartitionedStreamEvent<TKey, TPayload>(value.PartitionKey, value.SyncTime, value.OtherTime, default), new long?(current - value.SyncTime)));
value = new PartitionedStreamEvent<TKey, TResult>(value.PartitionKey, current, StreamEvent.InfinitySyncTime, value.Payload);
}
}
break;
case StreamEventKind.Interval:
if (outOfOrder)
{
if (this.disorderPolicyType == DisorderPolicyType.Drop)
{
this.diagnosticOutput?.OnNext(OutOfOrderPartitionedStreamEvent.Create(new PartitionedStreamEvent<TKey, TPayload>(value.PartitionKey, value.SyncTime, value.OtherTime, default), default));
return; // drop
}
else
{
// Moving the start to the current time would leave no positive-length interval
// (current is at or past OtherTime), so the interval is dropped even though the policy is not Drop.
if (current >= value.OtherTime)
{
this.diagnosticOutput?.OnNext(OutOfOrderPartitionedStreamEvent.Create(new PartitionedStreamEvent<TKey, TPayload>(value.PartitionKey, value.SyncTime, value.OtherTime, default), default));
return; // drop
}
// Otherwise truncate the interval so that it starts at the partition's current time.
this.diagnosticOutput?.OnNext(OutOfOrderPartitionedStreamEvent.Create(new PartitionedStreamEvent<TKey, TPayload>(value.PartitionKey, value.SyncTime, value.OtherTime, default), new long?(current - value.SyncTime)));
value = new PartitionedStreamEvent<TKey, TResult>(value.PartitionKey, current, value.OtherTime, value.Payload);
}
}
break;
case StreamEventKind.End:
// it may not be out of order, but did we drop/adjust the corresponding start event?
// The lookup key mirrors the one built in the Start case, using this event's OtherTime in place
// of the start event's sync time.
key = Tuple.Create(value.OtherTime, value.Payload);
if (this.startEventInformation.TryGetValue(key, out q))
{
Contract.Assume(!q.IsEmpty());
// Consume one occurrence from the oldest entry; remove the entry, and the key once its queue is empty.
// NOTE(review): as with PeekLast above, the decrement only persists in the queue if AdjustInfo
// is a reference type (or PeekFirst returns a reference) - confirm.
var firstElement = q.PeekFirst();
firstElement.numberOfOccurrences--;
if (firstElement.numberOfOccurrences == 0)
{
q.Dequeue(); // throw away returned value
if (q.Count == 0) this.startEventInformation.Remove(key);
}
var adjustedTime = firstElement.modifiedStartTime;
if (this.disorderPolicyType == DisorderPolicyType.Drop)
{
// The matching start event was dropped, so this end event is dropped too.
this.diagnosticOutput?.OnNext(OutOfOrderPartitionedStreamEvent.Create(new PartitionedStreamEvent<TKey, TPayload>(value.PartitionKey, value.SyncTime, value.OtherTime, default), default));
return; // drop
}
else
{
// Point the end event at the adjusted start time; its own sync time is moved only if it is
// itself out of order.
// NOTE(review): the diagnostic is emitted even when this end event is in order, in which case
// the reported delta (current - SyncTime) is zero or negative - confirm this is intended.
this.diagnosticOutput?.OnNext(OutOfOrderPartitionedStreamEvent.Create(new PartitionedStreamEvent<TKey, TPayload>(value.PartitionKey, value.SyncTime, value.OtherTime, default), new long?(current - value.SyncTime)));
value = new PartitionedStreamEvent<TKey, TResult>(value.PartitionKey, outOfOrder ? current : value.SyncTime, adjustedTime, value.Payload);
}
}
else if (outOfOrder)
{
// No recorded start adjustment for this end event, but the end event itself is out of order.
if (this.disorderPolicyType == DisorderPolicyType.Drop)
{
this.diagnosticOutput?.OnNext(OutOfOrderPartitionedStreamEvent.Create(new PartitionedStreamEvent<TKey, TPayload>(value.PartitionKey, value.SyncTime, value.OtherTime, default), default));
return; // drop
}
else
{
// Move the end event to the partition's current time, keeping its OtherTime.
this.diagnosticOutput?.OnNext(OutOfOrderPartitionedStreamEvent.Create(new PartitionedStreamEvent<TKey, TPayload>(value.PartitionKey, value.SyncTime, value.OtherTime, default), new long?(current - value.SyncTime)));
value = new PartitionedStreamEvent<TKey, TResult>(value.PartitionKey, current, value.OtherTime, value.Payload);
}
}
break;
default:
// Any other kind is unexpected: assert, and throw as well so the failure is not silent
// if the assertion does not stop execution.
Contract.Assert(false, "switch meant to be exhaustive");
throw new InvalidOperationException("Unsupported stream event kind: " + value.Kind.ToString());
}
}
if (this.punctuationPolicyType == PeriodicPunctuationPolicyType.Time)
{
// Note that we must generate punctuation after disorder policy has been applied, since an event with an adjusted sync time may still need to generate the punctuation.
if (this.punctuationGenerationPeriod > 0)
{
// We use lowWatermark as the baseline in the delta computation because a low watermark implies
// punctuations for all partitions
var prevPunctuation = Math.Max(this.lastPunctuationTime[value.PartitionKey].lastPunctuationQuantized, this.lowWatermark.quantizedForPunctuationGeneration);
if ((ulong)(value.SyncTime - prevPunctuation) >= this.punctuationGenerationPeriod)
{
// SyncTime is sufficiently high to generate a new punctuation, but first snap it to the nearest generationPeriod boundary
var punctuationTimeQuantized = value.SyncTime.SnapToLeftBoundary((long)this.punctuationGenerationPeriod);
#if DEBUG
Debug.Assert(punctuationTimeQuantized >= LastEventTime(value.PartitionKey), "Bug in punctuation quantization logic");
#endif
// The punctuation is issued before the event itself is added to the batch below.
OnPunctuation(value.CreatePunctuation(punctuationTimeQuantized));
}
}
}
// Append the (possibly adjusted) event to the current batch, and flush once the batch reaches the configured size.
this.currentBatch.Add(value.SyncTime, value.OtherTime, new PartitionKey<TKey>(value.PartitionKey), value.Payload);
if (this.currentBatch.Count == Config.DataBatchSize)
{
if (this.flushPolicy == PartitionedFlushPolicy.FlushOnBatchBoundary) OnFlush();
else FlushContents();
}
}
// Reached for punctuations and for every data event that was not dropped or rejected above:
// advance the partition's current time to the (possibly adjusted) sync time.
UpdateCurrentTime(value.PartitionKey, value.SyncTime);
}