sdk/eventhub/Microsoft.Azure.EventHubs.Processor/src/EventHubPartitionPump.cs (155 lines of code) (raw):

// Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. namespace Microsoft.Azure.EventHubs.Processor { using System; using System.Collections.Generic; using System.Threading.Tasks; class EventHubPartitionPump : PartitionPump { EventHubClient eventHubClient; PartitionReceiver partitionReceiver; PartitionReceiveHandler partitionReceiveHandler; public EventHubPartitionPump(EventProcessorHost host, Lease lease) : base(host, lease) { } protected override async Task OnOpenAsync() { bool openedOK = false; int retryCount = 0; Exception lastException = null; do { try { await OpenClientsAsync().ConfigureAwait(false); openedOK = true; } catch (Exception e) { lastException = e; ProcessorEventSource.Log.PartitionPumpWarning( this.Host.HostName, this.PartitionContext.PartitionId, "Failure creating client or receiver, retrying", e.ToString()); // Don't retry if we already lost the lease. if (e is ReceiverDisconnectedException) { break; } retryCount++; } } while (!openedOK && (retryCount < 5)); if (!openedOK) { // IEventProcessor.onOpen is called from the base PartitionPump and must have returned in order for execution to reach here, // so we can report this error to it instead of the general error handler. await this.Processor.ProcessErrorAsync(this.PartitionContext, lastException).ConfigureAwait(false); this.PumpStatus = PartitionPumpStatus.OpenFailed; } if (this.PumpStatus == PartitionPumpStatus.Opening) { this.partitionReceiveHandler = new PartitionReceiveHandler(this); // IEventProcessor.OnOpen is called from the base PartitionPump and must have returned in order for execution to reach here, // meaning it is safe to set the handler and start calling IEventProcessor.OnEvents. // Set the status to running before setting the client handler, so the IEventProcessor.OnEvents can never race and see status != running. this.PumpStatus = PartitionPumpStatus.Running; this.partitionReceiver.SetReceiveHandler( this.partitionReceiveHandler, this.Host.EventProcessorOptions.InvokeProcessorAfterReceiveTimeout); } if (this.PumpStatus == PartitionPumpStatus.OpenFailed) { this.PumpStatus = PartitionPumpStatus.Closing; await this.CleanUpClientsAsync().ConfigureAwait(false); this.PumpStatus = PartitionPumpStatus.Closed; } } async Task OpenClientsAsync() // throws EventHubsException, IOException, InterruptedException, ExecutionException { // Create new clients EventPosition eventPosition = await this.PartitionContext.GetInitialOffsetAsync().ConfigureAwait(false); long epoch = this.Lease.Epoch; ProcessorEventSource.Log.PartitionPumpCreateClientsStart(this.Host.HostName, this.PartitionContext.PartitionId, epoch, $"Offset:{eventPosition.Offset}, SequenceNumber:{eventPosition.SequenceNumber}, DateTime:{eventPosition.EnqueuedTimeUtc}"); this.eventHubClient = this.Host.CreateEventHubClient(); this.eventHubClient.WebProxy = this.Host.EventProcessorOptions.WebProxy; var receiverOptions = new ReceiverOptions() { // Enable receiver metrics? EnableReceiverRuntimeMetric = this.Host.EventProcessorOptions.EnableReceiverRuntimeMetric, // Use host name as the identifier for debugging purpose // Shorten host name if name is longer than max allowed length. Identifier = this.Host.HostName.Length > ClientConstants.MaxReceiverIdentifierLength ? this.Host.HostName.Substring(0, ClientConstants.MaxReceiverIdentifierLength) : this.Host.HostName }; // Create new receiver and set options this.partitionReceiver = this.eventHubClient.CreateEpochReceiver( this.PartitionContext.ConsumerGroupName, this.PartitionContext.PartitionId, eventPosition, epoch, receiverOptions); this.partitionReceiver.PrefetchCount = this.Host.EventProcessorOptions.PrefetchCount; ProcessorEventSource.Log.PartitionPumpCreateClientsStop(this.Host.HostName, this.PartitionContext.PartitionId); } async Task CleanUpClientsAsync() // swallows all exceptions { if (this.partitionReceiver != null) { // Taking the lock means that there is no ProcessEventsAsync call in progress. Task closeTask; using (await this.ProcessingAsyncLock.LockAsync().ConfigureAwait(false)) { // Calling PartitionReceiver.CloseAsync will gracefully close the IPartitionReceiveHandler we have installed. ProcessorEventSource.Log.PartitionPumpInfo(this.Host.HostName, this.PartitionContext.PartitionId, "Closing PartitionReceiver"); closeTask = this.partitionReceiver.CloseAsync(); } await closeTask.ConfigureAwait(false); this.partitionReceiver = null; } if (this.eventHubClient != null) { ProcessorEventSource.Log.PartitionPumpInfo(this.Host.HostName, this.PartitionContext.PartitionId, "Closing EventHubClient"); await this.eventHubClient.CloseAsync().ConfigureAwait(false); this.eventHubClient = null; } } protected override Task OnClosingAsync(CloseReason reason) { // Close the EH clients. Errors are swallowed, nothing we could do about them anyway. return CleanUpClientsAsync(); } class PartitionReceiveHandler : IPartitionReceiveHandler { readonly EventHubPartitionPump eventHubPartitionPump; public PartitionReceiveHandler(EventHubPartitionPump eventHubPartitionPump) { this.eventHubPartitionPump = eventHubPartitionPump; this.MaxBatchSize = eventHubPartitionPump.Host.EventProcessorOptions.MaxBatchSize; } public int MaxBatchSize { get; set; } public Task ProcessEventsAsync(IEnumerable<EventData> events) { // This method is called on the thread that the EH client uses to run the pump. // There is one pump per EventHubClient. Since each PartitionPump creates a new EventHubClient, // using that thread to call OnEvents does no harm. Even if OnEvents is slow, the pump will // get control back each time OnEvents returns, and be able to receive a new batch of messages // with which to make the next OnEvents call. The pump gains nothing by running faster than OnEvents. return this.eventHubPartitionPump.ProcessEventsAsync(events); } public async Task ProcessErrorAsync(Exception error) { bool faultPump; if (error == null) { error = new Exception("No error info supplied by EventHub client"); } if (error is ReceiverDisconnectedException) { // Trace as warning since ReceiverDisconnectedException is part of lease stealing logic. ProcessorEventSource.Log.PartitionPumpWarning( this.eventHubPartitionPump.Host.HostName, this.eventHubPartitionPump.PartitionContext.PartitionId, "EventHub client disconnected, probably another host took the partition", error.Message); // Shutdown the message pump when receiver is disconnected. faultPump = true; } else { ProcessorEventSource.Log.PartitionPumpError( this.eventHubPartitionPump.Host.HostName, this.eventHubPartitionPump.PartitionContext.PartitionId, "EventHub client error:", error.ToString()); // No need to fault the pump, we expect receiver to recover on its own. faultPump = false; } try { // We would like to deliver all errors in the pump to error handler. await this.eventHubPartitionPump.ProcessErrorAsync(error).ConfigureAwait(false); } finally { // Fault pump only when needed. if (faultPump) { this.eventHubPartitionPump.PumpStatus = PartitionPumpStatus.Errored; } } } } } }