in sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor.cs [720:1045]
/// <summary>
///   Creates a <see cref="PartitionProcessor" /> for the specified partition, starting a long-running
///   background task that reads events from the partition and dispatches them for processing until
///   cancellation is requested or the processing task faults.
/// </summary>
///
/// <param name="partition">The partition to process.</param>
/// <param name="cancellationSource">The source used to signal that processing should stop; when its token is canceled, the active consumer is closed to short-circuit any blocked receive.</param>
/// <param name="startingPositionOverride">If specified, the position to begin reading from in place of the position determined by partition initialization.</param>
///
/// <returns>A <see cref="PartitionProcessor" /> holding the background processing task, the <paramref name="partition" />, a callback for reading the last enqueued event properties, and the <paramref name="cancellationSource" />.</returns>
///
/// <exception cref="TaskCanceledException">Occurs when cancellation was already requested before the processor was created.</exception>
///
/// <remarks>
///   The processing task ends by throwing <see cref="TaskCanceledException" /> when processing is stopped.  It faults when
///   partition initialization fails, the connection cannot be created, developer-provided code throws, the consumer is
///   disconnected (<see cref="EventHubsException.FailureReason.ConsumerDisconnected" />), or consumers fail more than
///   <c>MaximumConsumerRestartCount</c> times without an intervening successful batch.
/// </remarks>
///
internal virtual PartitionProcessor CreatePartitionProcessor(TPartition partition,
                                                             CancellationTokenSource cancellationSource,
                                                             EventPosition? startingPositionOverride = null)
{
    cancellationSource.Token.ThrowIfCancellationRequested<TaskCanceledException>();
    var consumer = default(TransportConsumer);

    // If the tracking of the last enqueued event properties was requested, then read the
    // properties from the active consumer, which can change during processing in the event of
    // error scenarios.

    LastEnqueuedEventProperties readLastEnqueuedEventProperties()
    {
        // Snapshot the consumer; the processing task may replace it concurrently, and the instance
        // that passes the guard below must be the same instance that is read from.

        var activeConsumer = consumer;

        // This is not an expected scenario; the guard exists to prevent a race condition that is
        // unlikely, but possible, when partition processing is being stopped or consumer creation
        // outright failed.

        if ((activeConsumer == null) || (activeConsumer.IsClosed))
        {
            Argument.AssertNotClosed(true, Resources.ClientNeededForThisInformationNotAvailable);
        }

        return new LastEnqueuedEventProperties(activeConsumer.LastReceivedEvent);
    }

    async Task<EventPosition> initializeStartingPositionAsync(TPartition initializePartition,
                                                              EventPosition? overridePosition,
                                                              CancellationToken initializeCancellation)
    {
        // Determine the position to start processing from; this will occur during partition initialization normally, but may be superseded
        // if an override was passed.  In the event that initialization is run and encounters an exception, it takes responsibility for firing
        // the error handler.

        var (startingPosition, checkpoint) = overridePosition switch
        {
            _ when overridePosition.HasValue => (overridePosition.Value, null),
            _ => await InitializePartitionForProcessingAsync(initializePartition, initializeCancellation).ConfigureAwait(false)
        };

        var checkpointUsed = (checkpoint != null);
        var checkpointLastModified = checkpointUsed ? checkpoint.LastModified : null;
        var checkpointAuthor = checkpointUsed ? checkpoint.ClientIdentifier : null;

        Logger.EventProcessorPartitionProcessingEventPositionDetermined(Identifier, EventHubName, ConsumerGroup, initializePartition.PartitionId, startingPosition.ToString(), checkpointUsed, checkpointAuthor, checkpointLastModified);
        return startingPosition;
    }

    // Define the routine to handle processing for the partition.

    async Task performProcessing()
    {
        cancellationSource.Token.ThrowIfCancellationRequested<TaskCanceledException>();

        var connection = default(EventHubConnection);
        var retryDelay = default(TimeSpan?);
        var capturedException = default(Exception);
        var eventBatch = default(IReadOnlyList<EventData>);
        var lastEvent = default(EventData);
        var failedAttemptCount = 0;
        var resetConsumerCount = 0;
        var startingPosition = await initializeStartingPositionAsync(partition, startingPositionOverride, cancellationSource.Token).ConfigureAwait(false);

        // Create the connection to be used for spawning consumers; if the creation
        // fails, then consider the processing task to be failed.  The main processing
        // loop will take responsibility for attempting to restart or relinquishing ownership.

        try
        {
            connection = CreateConnection();
        }
        catch (Exception ex)
        {
            // The error handler is invoked as a fire-and-forget task; the processor does not assume responsibility
            // for observing or surfacing exceptions that may occur in the handler.

            _ = InvokeOnProcessingErrorAsync(ex, partition, Resources.OperationReadEvents, CancellationToken.None);
            Logger.EventProcessorPartitionProcessingError(partition.PartitionId, Identifier, EventHubName, ConsumerGroup, ex.Message);

            throw;
        }

        await using var connectionAwaiter = connection.ConfigureAwait(false);

        // Continue processing the partition until cancellation is signaled or until the count of failed consumers is too great.
        // Consumers which been consistently unable to receive and process events will be considered invalid and abandoned for a new consumer.

        while ((!cancellationSource.IsCancellationRequested) && (resetConsumerCount <= MaximumConsumerRestartCount))
        {
            try
            {
                consumer = CreateConsumer(ConsumerGroup, partition.PartitionId, $"P{ partition.PartitionId }-{ Identifier }", startingPosition, connection, Options, exclusive: true);

                // Register for notification when the cancellation token is triggered.  Attempt to close the consumer
                // in response to force-close the link and short-circuit any receive operation that is blocked and
                // awaiting timeout.

                using var cancellationRegistration = cancellationSource.Token.Register(static state =>
                {
                    // Because this is a best-effort attempt and exceptions are expected and not relevant to
                    // callers, use a fire-and-forget approach rather than awaiting.

                    _ = ((TransportConsumer)state).CloseAsync(CancellationToken.None);
                }, consumer, useSynchronizationContext: false);

                // Allow the core dispatching loop to apply an additional set of retries over any provided by the consumer
                // itself, as a processor should be as resilient as possible and retain partition ownership if processing is
                // able to make forward progress.  If the retries are exhausted or a non-retriable exception occurs, the
                // consumer will be considered invalid and potentially refreshed.

                while (!cancellationSource.IsCancellationRequested)
                {
                    var stopWatch = ValueStopwatch.StartNew();
                    var cycleStartTime = Logger.GetLogFormattedUtcNow();

                    // Clear the batch from the previous cycle so that a failed receive is not reported in the
                    // cycle summary below as though the prior batch had been read again.

                    eventBatch = null;

                    try
                    {
                        eventBatch = await consumer.ReceiveAsync(EventBatchMaximumCount, Options.MaximumWaitTime, cancellationSource.Token).ConfigureAwait(false);
                        await ProcessEventBatchAsync(partition, eventBatch, Options.MaximumWaitTime.HasValue, cancellationSource.Token).ConfigureAwait(false);

                        // If the batch was successfully processed, capture the last event as the current starting position, in the
                        // event that the consumer becomes invalid and needs to be replaced.

                        lastEvent = (eventBatch != null && eventBatch.Count > 0) ? eventBatch[eventBatch.Count - 1] : null;

                        if (!string.IsNullOrEmpty(lastEvent?.OffsetString))
                        {
                            startingPosition = EventPosition.FromOffset(lastEvent.OffsetString, false);
                        }

                        // If event batches are successfully processed, then the need for forward progress is
                        // satisfied, and the failure counts should reset.  Any exception captured from an earlier
                        // consumer failure has been recovered from; clear it so that it is not surfaced as the
                        // cause if processing later stops due to cancellation.

                        failedAttemptCount = 0;
                        resetConsumerCount = 0;
                        capturedException = null;
                    }
                    catch (TaskCanceledException) when (cancellationSource.IsCancellationRequested)
                    {
                        // Do not log; this is an expected scenario when partition processing is asked to stop.

                        throw;
                    }
                    catch (EventHubsException ex) when (ex.Reason == EventHubsException.FailureReason.ConsumerDisconnected)
                    {
                        // This is an expected scenario that may occur when ownership changes; log the exception for tracking but
                        // do not surface it to the error handler.

                        Logger.EventProcessorPartitionProcessingError(partition.PartitionId, Identifier, EventHubName, ConsumerGroup, ex.Message);
                        throw;
                    }
                    catch (Exception ex) when ((cancellationSource.IsCancellationRequested)
                        && (((ex is EventHubsException ehEx) && (ehEx.Reason == EventHubsException.FailureReason.ClientClosed)) || (ex is ObjectDisposedException)))
                    {
                        // Do not log as an exception; this is an expected scenario when partition processing is asked to stop.

                        Logger.EventProcessorPartitionProcessingStopConsumerClose(partition.PartitionId, Identifier, EventHubName, ConsumerGroup);
                        throw new TaskCanceledException();
                    }
                    catch (Exception ex) when (ex.IsNotType<DeveloperCodeException>())
                    {
                        // The error handler is invoked as a fire-and-forget task; the processor does not assume responsibility
                        // for observing or surfacing exceptions that may occur in the handler.

                        _ = InvokeOnProcessingErrorAsync(ex, partition, Resources.OperationReadEvents, CancellationToken.None);
                        Logger.EventProcessorPartitionProcessingError(partition.PartitionId, Identifier, EventHubName, ConsumerGroup, ex.Message);

                        retryDelay = RetryPolicy.CalculateRetryDelay(ex, ++failedAttemptCount);

                        if (!retryDelay.HasValue)
                        {
                            // If the exception should not be retried, then allow it to pass to the outer loop; this is intended
                            // to prevent being stuck in a corrupt state where the consumer is unable to read events.

                            throw;
                        }

                        await Task.Delay(retryDelay.Value, cancellationSource.Token).ConfigureAwait(false);
                    }

                    // Capture the end-to-end cycle information for the partition.  This is intended to provide an
                    // all-up view for partition processing, showing when and how long it took for a batch be read and
                    // processed.

                    var startingSequence = default(string);
                    var endingSequence = default(string);

                    if (eventBatch != null && eventBatch.Count > 0)
                    {
                        // Sequence numbers are machine-readable log data; format them independent of the current culture.

                        startingSequence = eventBatch[0].SequenceNumber.ToString(CultureInfo.InvariantCulture);
                        endingSequence = eventBatch[eventBatch.Count - 1].SequenceNumber.ToString(CultureInfo.InvariantCulture);
                    }

                    Logger.EventProcessorPartitionProcessingCycleComplete(partition.PartitionId, Identifier, EventHubName, ConsumerGroup, eventBatch?.Count ?? 0, startingSequence, endingSequence, cycleStartTime, Logger.GetLogFormattedUtcNow(), stopWatch.GetElapsedTime().TotalSeconds);
                }
            }
            catch (TaskCanceledException)
            {
                throw;
            }
            catch (OperationCanceledException ex)
            {
                throw new TaskCanceledException(ex.Message, ex);
            }
            catch (DeveloperCodeException ex)
            {
                // If an exception leaked from developer-provided code, the processor lacks the proper level of context and
                // insight to understand if it is safe to ignore and continue.  Instead, this will be thrown and allowed to
                // fault the partition processing task.  To ensure visibility, log the error with an explicit call-out to identify
                // it as originating in developer code.

                var message = string.Format(CultureInfo.InvariantCulture, Resources.DeveloperCodeExceptionMessageMask, ex.InnerException.Message);
                Logger.EventProcessorPartitionProcessingError(partition.PartitionId, Identifier, EventHubName, ConsumerGroup, message);

                // Because this can be non-obvious to developers who are not capturing logs, also surface an exception to the error handler
                // which offers guidance for error handling in developer code.

                var handlerException = new EventHubsException(false, EventHubName, Resources.DeveloperCodeEventProcessingError, EventHubsException.FailureReason.GeneralError, ex.InnerException);
                _ = InvokeOnProcessingErrorAsync(handlerException, partition, Resources.OperationEventProcessingDeveloperCode, CancellationToken.None);

                // Discard the wrapper and propagate just the source exception from developer code.

                ExceptionDispatchInfo.Capture(ex.InnerException).Throw();
            }
            catch (FormatException ex)
            {
                // This is a special case that occurs when the starting position is invalid due to a legacy checkpoint format
                // after a GeoDR fail over.  There is no way to recover checkpoint data in this scenario, the checkpoint will
                // need to be removed or ignored and the default position used.  To avoid requiring callers to manually remove
                // checkpoints, the processor will invalidate the checkpoint automatically to allow processing to continue
                // for a retry.

                Logger.EventProcessorPartitionLegacyCheckpointFormat(partition.PartitionId, Identifier, EventHubName, ConsumerGroup);

                // Because this can be non-obvious to developers who are not capturing logs, also surface an exception to the error handler
                // to ensure that the checkpoint reset does not go unnoticed.

                var handlerExceptionMessage = ex.Message ?? ex.InnerException?.Message;
                var handlerException = new EventHubsException(false, EventHubName, handlerExceptionMessage, EventHubsException.FailureReason.GeneralError, ex.InnerException);
                _ = InvokeOnProcessingErrorAsync(handlerException, partition, Resources.OperationReadEvents, CancellationToken.None);

                // Attempt to clear checkpoint data.  This will cause the next initialization attempt to use the default starting position.

                try
                {
                    await UpdateCheckpointAsync(partition.PartitionId, new CheckpointPosition(), cancellationSource.Token).ConfigureAwait(false);
                }
                catch (Exception clearEx)
                {
                    // If we couldn't clear the checkpoint, capture the clearing error and follow the general error flow: the failure
                    // counts against the consumer restart limit and, once that limit is exceeded, the processing task faults with the
                    // captured exception, allowing load balancing to reassign the partition or retry.

                    Logger.EventProcessorPartitionProcessingError(partition.PartitionId, Identifier, EventHubName, ConsumerGroup, clearEx.Message);
                    ++resetConsumerCount;
                    capturedException = clearEx;
                }

                // Reinitialize the partition to force the full evaluation for determining the default position, including any custom logic
                // in handlers.  The override is intentionally not passed; doing so would short-circuit initialization and return the same
                // invalid position, causing this handler to repeat indefinitely.  Initialization is responsible for its own logging and
                // error handling; allow the exception to be bubbled.

                startingPosition = await initializeStartingPositionAsync(partition, null, cancellationSource.Token).ConfigureAwait(false);
            }
            catch (EventHubsException ex) when (ex.Reason == EventHubsException.FailureReason.ConsumerDisconnected)
            {
                // If the partition was stolen, the consumer should not be recreated as that would reassert ownership
                // and potentially interfere with the new owner.  Instead, the exception should be surfaced to fault
                // the processor task and allow the next load balancing cycle to make the decision on whether processing
                // should be restarted or the new owner acknowledged.

                ReportPartitionStolen(partition.PartitionId);
                throw;
            }
            catch (Exception ex) when (ex.IsFatalException())
            {
                throw;
            }
            catch (Exception ex)
            {
                Logger.EventProcessorPartitionProcessingError(partition.PartitionId, Identifier, EventHubName, ConsumerGroup, ex.Message);
                ++resetConsumerCount;
                capturedException = ex;
            }
            finally
            {
                try
                {
                    if (consumer != null)
                    {
                        await consumer.CloseAsync(CancellationToken.None).ConfigureAwait(false);
                    }
                }
                catch (Exception ex)
                {
                    Logger.EventProcessorPartitionProcessingError(partition.PartitionId, Identifier, EventHubName, ConsumerGroup, ex.Message);

                    // Do not bubble the exception, as the consumer is being refreshed; failure to close this consumer is non-fatal.
                }
            }
        }

        // If there was an exception captured, then surface it.  Otherwise signal that cancellation took place.

        if (capturedException != null)
        {
            ExceptionDispatchInfo.Capture(capturedException).Throw();
        }

        throw new TaskCanceledException();
    }

    // Start processing in the background and return the processor metadata.  Since the task is
    // expected to run continually until the processor is stopped or ownership changes, mark it as
    // long-running.  Other than the long-running designation, the options used intentionally match
    // the recommended defaults used by Task.Run.
    //
    // For more context, see: https://devblogs.microsoft.com/pfxteam/task-run-vs-task-factory-startnew/

    return new PartitionProcessor
    (
        Task.Factory.StartNew(performProcessing, cancellationSource.Token, TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach, TaskScheduler.Default).Unwrap(),
        partition,
        readLastEnqueuedEventProperties,
        cancellationSource
    );
}