internal virtual PartitionProcessor CreatePartitionProcessor()

in sdk/eventhub/Azure.Messaging.EventHubs/src/Primitives/EventProcessor.cs [720:1045]


        internal virtual PartitionProcessor CreatePartitionProcessor(TPartition partition,
                                                                     CancellationTokenSource cancellationSource,
                                                                     EventPosition? startingPositionOverride = null)
        {
            // Creates the long-running background task that reads and dispatches events for a single partition, and
            // returns it bundled with the partition, an accessor for the last enqueued event properties, and the
            // cancellation source that controls it.
            //
            //   partition                - The partition to process.
            //   cancellationSource       - Signals that processing should stop; when triggered, the active consumer is closed
            //                              to short-circuit any blocked receive.
            //   startingPositionOverride - When set, used as the starting position instead of running partition initialization.
            //
            // Throws TaskCanceledException if cancellation has already been requested.  The returned task completes by throwing;
            // it surfaces either the exception that caused processing to be abandoned or a TaskCanceledException on stop.

            cancellationSource.Token.ThrowIfCancellationRequested<TaskCanceledException>();
            var consumer = default(TransportConsumer);

            // If the tracking of the last enqueued event properties was requested, then read the
            // properties from the active consumer, which can change during processing in the event of
            // error scenarios.

            LastEnqueuedEventProperties readLastEnqueuedEventProperties()
            {
                // This is not an expected scenario; the guard exists to prevent a race condition that is
                // unlikely, but possible, when partition processing is being stopped or consumer creation
                // outright failed.

                if ((consumer == null) || (consumer.IsClosed))
                {
                    Argument.AssertNotClosed(true, Resources.ClientNeededForThisInformationNotAvailable);
                }

                return new LastEnqueuedEventProperties(consumer.LastReceivedEvent);
            }

            async Task<EventPosition> initializeStartingPositionAsync(TPartition initializePartition,
                                                                      EventPosition? overridePosition,
                                                                      CancellationToken initializeCancellation)
            {
                // Determine the position to start processing from; this will occur during partition initialization normally, but may be superseded
                // if an override was passed.  In the event that initialization is run and encounters an exception, it takes responsibility for firing
                // the error handler.

                var (startingPosition, checkpoint) = overridePosition switch
                {
                    _ when overridePosition.HasValue => (overridePosition.Value, null),
                    _ => await InitializePartitionForProcessingAsync(initializePartition, initializeCancellation).ConfigureAwait(false)
                };

                var checkpointUsed = (checkpoint != null);
                var checkpointLastModified = checkpointUsed ? checkpoint.LastModified : null;
                var checkpointAuthor = checkpointUsed ? checkpoint.ClientIdentifier : null;

                Logger.EventProcessorPartitionProcessingEventPositionDetermined(Identifier, EventHubName, ConsumerGroup, initializePartition.PartitionId, startingPosition.ToString(), checkpointUsed, checkpointAuthor, checkpointLastModified);
                return startingPosition;
            }

            // Define the routine to handle processing for the partition.

            async Task performProcessing()
            {
                cancellationSource.Token.ThrowIfCancellationRequested<TaskCanceledException>();

                var connection = default(EventHubConnection);
                var retryDelay = default(TimeSpan?);
                var capturedException = default(Exception);
                var failedAttemptCount = 0;
                var resetConsumerCount = 0;

                var startingPosition = await initializeStartingPositionAsync(partition, startingPositionOverride, cancellationSource.Token).ConfigureAwait(false);

                // Create the connection to be used for spawning consumers; if the creation
                // fails, then consider the processing task to be failed.  The main processing
                // loop will take responsibility for attempting to restart or relinquishing ownership.

                try
                {
                    connection = CreateConnection();
                }
                catch (Exception ex)
                {
                    // The error handler is invoked as a fire-and-forget task; the processor does not assume responsibility
                    // for observing or surfacing exceptions that may occur in the handler.

                    _ = InvokeOnProcessingErrorAsync(ex, partition, Resources.OperationReadEvents, CancellationToken.None);
                    Logger.EventProcessorPartitionProcessingError(partition.PartitionId, Identifier, EventHubName, ConsumerGroup, ex.Message);

                    throw;
                }

                await using var connectionAwaiter = connection.ConfigureAwait(false);

                // Continue processing the partition until cancellation is signaled or until the count of failed consumers is too great.
                // Consumers which been consistently unable to receive and process events will be considered invalid and abandoned for a new consumer.

                while ((!cancellationSource.IsCancellationRequested) && (resetConsumerCount <= MaximumConsumerRestartCount))
                {
                    try
                    {
                        consumer = CreateConsumer(ConsumerGroup, partition.PartitionId, $"P{ partition.PartitionId }-{ Identifier }", startingPosition, connection, Options, exclusive: true);

                        // Register for notification when the cancellation token is triggered.  Attempt to close the consumer
                        // in response to force-close the link and short-circuit any receive operation that is blocked and
                        // awaiting timeout.

                        using var cancellationRegistration = cancellationSource.Token.Register(static state =>
                        {
                            // Because this is a best-effort attempt and exceptions are expected and not relevant to
                            // callers, use a fire-and-forget approach rather than awaiting.

                            _ = ((TransportConsumer)state).CloseAsync(CancellationToken.None);
                        }, consumer, useSynchronizationContext: false);

                        // Allow the core dispatching loop to apply an additional set of retries over any provided by the consumer
                        // itself, as a processor should be as resilient as possible and retain partition ownership if processing is
                        // able to make forward progress.  If the retries are exhausted or a non-retriable exception occurs, the
                        // consumer will be considered invalid and potentially refreshed.

                        while (!cancellationSource.IsCancellationRequested)
                        {
                            var stopWatch = ValueStopwatch.StartNew();
                            var cycleStartTime = Logger.GetLogFormattedUtcNow();

                            // The batch is scoped to a single cycle so that, if the receive fails, the cycle information
                            // logged below does not report the batch read during a previous cycle.

                            var eventBatch = default(IReadOnlyList<EventData>);

                            try
                            {
                                eventBatch = await consumer.ReceiveAsync(EventBatchMaximumCount, Options.MaximumWaitTime, cancellationSource.Token).ConfigureAwait(false);
                                await ProcessEventBatchAsync(partition, eventBatch, Options.MaximumWaitTime.HasValue, cancellationSource.Token).ConfigureAwait(false);

                                // If the batch was successfully processed, capture the last event as the current starting position, in the
                                // event that the consumer becomes invalid and needs to be replaced.

                                var lastEvent = (eventBatch != null && eventBatch.Count > 0) ? eventBatch[eventBatch.Count - 1] : null;

                                if (!string.IsNullOrEmpty(lastEvent?.OffsetString))
                                {
                                    startingPosition = EventPosition.FromOffset(lastEvent.OffsetString, false);
                                }

                                // If event batches are successfully processed, then the need for forward progress is
                                // satisfied, and the failure counts should reset.  Any exception captured from an earlier
                                // consumer failure is no longer relevant; clearing it ensures that a later stop is reported
                                // as cancellation rather than by re-throwing an error that processing has since recovered from.

                                failedAttemptCount = 0;
                                resetConsumerCount = 0;
                                capturedException = null;
                            }
                            catch (TaskCanceledException) when (cancellationSource.IsCancellationRequested)
                            {
                                // Do not log; this is an expected scenario when partition processing is asked to stop.

                                throw;
                            }
                            catch (EventHubsException ex) when (ex.Reason == EventHubsException.FailureReason.ConsumerDisconnected)
                            {
                                // This is an expected scenario that may occur when ownership changes; log the exception for tracking but
                                // do not surface it to the error handler.

                                Logger.EventProcessorPartitionProcessingError(partition.PartitionId, Identifier, EventHubName, ConsumerGroup, ex.Message);
                                throw;
                            }
                            catch (Exception ex) when ((cancellationSource.IsCancellationRequested)
                                && (((ex is EventHubsException ehEx) && (ehEx.Reason == EventHubsException.FailureReason.ClientClosed)) || (ex is ObjectDisposedException)))
                            {
                                // Do not log as an exception; this is an expected scenario when partition processing is asked to stop.

                                Logger.EventProcessorPartitionProcessingStopConsumerClose(partition.PartitionId, Identifier, EventHubName, ConsumerGroup);
                                throw new TaskCanceledException();
                            }
                            catch (Exception ex) when (ex.IsNotType<DeveloperCodeException>())
                            {
                                // The error handler is invoked as a fire-and-forget task; the processor does not assume responsibility
                                // for observing or surfacing exceptions that may occur in the handler.

                                _ = InvokeOnProcessingErrorAsync(ex, partition, Resources.OperationReadEvents, CancellationToken.None);

                                Logger.EventProcessorPartitionProcessingError(partition.PartitionId, Identifier, EventHubName, ConsumerGroup, ex.Message);
                                retryDelay = RetryPolicy.CalculateRetryDelay(ex, ++failedAttemptCount);

                                if (!retryDelay.HasValue)
                                {
                                    // If the exception should not be retried, then allow it to pass to the outer loop; this is intended
                                    // to prevent being stuck in a corrupt state where the consumer is unable to read events.

                                    throw;
                                }

                                await Task.Delay(retryDelay.Value, cancellationSource.Token).ConfigureAwait(false);
                            }

                            // Capture the end-to-end cycle information for the partition.  This is intended to provide an
                            // all-up view for partition processing, showing when and how long it took for a batch be read and
                            // processed.

                            var startingSequence = default(string);
                            var endingSequence = default(string);

                            if (eventBatch != null && eventBatch.Count > 0)
                            {
                                startingSequence = eventBatch[0].SequenceNumber.ToString(CultureInfo.InvariantCulture);
                                endingSequence = eventBatch[eventBatch.Count - 1].SequenceNumber.ToString(CultureInfo.InvariantCulture);
                            }

                            Logger.EventProcessorPartitionProcessingCycleComplete(partition.PartitionId, Identifier, EventHubName, ConsumerGroup, eventBatch?.Count ?? 0, startingSequence, endingSequence, cycleStartTime, Logger.GetLogFormattedUtcNow(), stopWatch.GetElapsedTime().TotalSeconds);
                        }
                    }
                    catch (TaskCanceledException)
                    {
                        throw;
                    }
                    catch (OperationCanceledException ex)
                    {
                        throw new TaskCanceledException(ex.Message, ex);
                    }
                    catch (DeveloperCodeException ex)
                    {
                        // If an exception leaked from developer-provided code, the processor lacks the proper level of context and
                        // insight to understand if it is safe to ignore and continue.  Instead, this will be thrown and allowed to
                        // fault the partition processing task.  To ensure visibility, log the error with an explicit call-out to identify
                        // it as originating in developer code.

                        var message = string.Format(CultureInfo.InvariantCulture, Resources.DeveloperCodeExceptionMessageMask, ex.InnerException.Message);
                        Logger.EventProcessorPartitionProcessingError(partition.PartitionId, Identifier, EventHubName, ConsumerGroup, message);

                        // Because this can be non-obvious to developers who are not capturing logs, also surface an exception to the error handler
                        // which offers guidance for error handling in developer code.

                        var handlerException = new EventHubsException(false, EventHubName, Resources.DeveloperCodeEventProcessingError, EventHubsException.FailureReason.GeneralError, ex.InnerException);
                        _ = InvokeOnProcessingErrorAsync(handlerException, partition, Resources.OperationEventProcessingDeveloperCode, CancellationToken.None);

                        // Discard the wrapper and propagate just the source exception from developer code.

                        ExceptionDispatchInfo.Capture(ex.InnerException).Throw();
                    }
                    catch (FormatException ex)
                    {
                        // This is a special case that occurs when the starting position is invalid due to a legacy checkpoint format
                        // after a GeoDR fail over. There is no way to recover checkpoint data in this scenario, the checkpoint will
                        // need to be removed or ignored and the default position used.  To avoid requiring callers to manually remove
                        // checkpoints, the processor will invalidate the checkpoint automatically to allow processing to continue
                        // for a retry.

                        Logger.EventProcessorPartitionLegacyCheckpointFormat(partition.PartitionId, Identifier, EventHubName, ConsumerGroup);

                        // Because this can be non-obvious to developers who are not capturing logs, also surface an exception to the error handler
                        // to ensure that the checkpoint reset does not go unnoticed.

                        var handlerExceptionMessage = ex.Message ?? ex.InnerException?.Message;
                        var handlerException = new EventHubsException(false, EventHubName, handlerExceptionMessage, EventHubsException.FailureReason.GeneralError, ex.InnerException);
                        _ = InvokeOnProcessingErrorAsync(handlerException, partition, Resources.OperationReadEvents, CancellationToken.None);

                        // Attempt to clear checkpoint data. This will cause the next initialization attempt to use the default starting position.

                        try
                        {
                            await UpdateCheckpointAsync(partition.PartitionId, new CheckpointPosition(), cancellationSource.Token).ConfigureAwait(false);
                        }
                        catch (Exception clearEx)
                        {
                            // If the checkpoint could not be cleared, log the error and count it against the consumer restart limit;
                            // once that limit is exceeded, the captured exception faults the task so that load balancing can reassign
                            // the partition or retry.

                            Logger.EventProcessorPartitionProcessingError(partition.PartitionId, Identifier, EventHubName, ConsumerGroup, clearEx.Message);

                            ++resetConsumerCount;
                            capturedException = ex;
                        }

                        // Reinitialize the partition to force the full evaluation for determining the default position, including any custom logic
                        // in handlers.  Initialization is responsible for its own logging and error handling; allow the exception to be bubbled.

                        startingPosition = await initializeStartingPositionAsync(partition, startingPositionOverride, cancellationSource.Token).ConfigureAwait(false);
                    }
                    catch (EventHubsException ex) when (ex.Reason == EventHubsException.FailureReason.ConsumerDisconnected)
                    {
                        // If the partition was stolen, the consumer should not be recreated as that would reassert ownership
                        // and potentially interfere with the new owner.  Instead, the exception should be surfaced to fault
                        // the processor task and allow the next load balancing cycle to make the decision on whether processing
                        // should be restarted or the new owner acknowledged.

                        ReportPartitionStolen(partition.PartitionId);
                        throw;
                    }
                    catch (Exception ex) when (ex.IsFatalException())
                    {
                        throw;
                    }
                    catch (Exception ex)
                    {
                        Logger.EventProcessorPartitionProcessingError(partition.PartitionId, Identifier, EventHubName, ConsumerGroup, ex.Message);

                        ++resetConsumerCount;
                        capturedException = ex;
                    }
                    finally
                    {
                        try
                        {
                            if (consumer != null)
                            {
                                await consumer.CloseAsync(CancellationToken.None).ConfigureAwait(false);
                            }
                        }
                        catch (Exception ex)
                        {
                            Logger.EventProcessorPartitionProcessingError(partition.PartitionId, Identifier, EventHubName, ConsumerGroup, ex.Message);

                            // Do not bubble the exception, as the consumer is being refreshed; failure to close this consumer is non-fatal.
                        }
                    }
                }

                // If there was an exception captured, then surface it.  Otherwise signal that cancellation took place.

                if (capturedException != null)
                {
                    ExceptionDispatchInfo.Capture(capturedException).Throw();
                }

                throw new TaskCanceledException();
            }

            // Start processing in the background and return the processor metadata.  Since the task is
            // expected to run continually until the processor is stopped or ownership changes, mark it as
            // long-running.  Other than the long-running designation, the options used intentionally match
            // the recommended defaults used by Task.Run.
            //
            // For more context, see: https://devblogs.microsoft.com/pfxteam/task-run-vs-task-factory-startnew/

            return new PartitionProcessor
            (
                Task.Factory.StartNew(performProcessing, cancellationSource.Token, TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach, TaskScheduler.Default).Unwrap(),
                partition,
                readLastEnqueuedEventProperties,
                cancellationSource
            );
        }