sdk/eventhub/Microsoft.Azure.EventHubs.Processor/src/PartitionPump.cs (156 lines of code) (raw):
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace Microsoft.Azure.EventHubs.Processor
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
abstract class PartitionPump
{
CancellationTokenSource cancellationTokenSource;
protected PartitionPump(EventProcessorHost host, Lease lease)
{
this.Host = host;
this.Lease = lease;
this.ProcessingAsyncLock = new AsyncLock();
this.PumpStatus = PartitionPumpStatus.Uninitialized;
}
protected EventProcessorHost Host { get; }
protected internal Lease Lease { get; }
protected IEventProcessor Processor { get; private set; }
protected PartitionContext PartitionContext { get; private set; }
protected AsyncLock ProcessingAsyncLock { get; }
internal void SetLeaseToken(string newToken)
{
this.PartitionContext.Lease.Token = newToken;
}
public async Task OpenAsync()
{
this.PumpStatus = PartitionPumpStatus.Opening;
this.cancellationTokenSource = new CancellationTokenSource();
this.PartitionContext = new PartitionContext(
this.Host,
this.Lease.PartitionId,
this.Host.EventHubPath,
this.Host.ConsumerGroupName,
this.cancellationTokenSource.Token);
this.PartitionContext.Lease = this.Lease;
if (this.PumpStatus == PartitionPumpStatus.Opening)
{
string action = EventProcessorHostActionStrings.CreatingEventProcessor;
try
{
this.Processor = this.Host.ProcessorFactory.CreateEventProcessor(this.PartitionContext);
action = EventProcessorHostActionStrings.OpeningEventProcessor;
ProcessorEventSource.Log.PartitionPumpInvokeProcessorOpenStart(this.Host.HostName, this.PartitionContext.PartitionId, this.Processor.GetType().ToString());
await this.Processor.OpenAsync(this.PartitionContext).ConfigureAwait(false);
ProcessorEventSource.Log.PartitionPumpInvokeProcessorOpenStop(this.Host.HostName, this.PartitionContext.PartitionId);
}
catch (Exception e)
{
// If the processor won't create or open, only thing we can do here is pass the buck.
// Null it out so we don't try to operate on it further.
ProcessorEventSource.Log.PartitionPumpError(this.Host.HostName, this.PartitionContext.PartitionId, "Failed " + action, e.ToString());
this.Processor = null;
this.Host.EventProcessorOptions.NotifyOfException(this.Host.HostName, this.PartitionContext.PartitionId, e, action);
this.PumpStatus = PartitionPumpStatus.OpenFailed;
}
}
if (this.PumpStatus == PartitionPumpStatus.Opening)
{
await this.OnOpenAsync().ConfigureAwait(false);
}
}
protected abstract Task OnOpenAsync();
protected internal PartitionPumpStatus PumpStatus { get; protected set; }
internal bool IsClosing
{
get
{
return (this.PumpStatus == PartitionPumpStatus.Closing || this.PumpStatus == PartitionPumpStatus.Closed);
}
}
public async Task CloseAsync(CloseReason reason)
{
ProcessorEventSource.Log.PartitionPumpCloseStart(this.Host.HostName, this.PartitionContext.PartitionId, reason.ToString());
this.PumpStatus = PartitionPumpStatus.Closing;
try
{
this.cancellationTokenSource.Cancel();
await this.OnClosingAsync(reason).ConfigureAwait(false);
if (this.Processor != null)
{
using (await this.ProcessingAsyncLock.LockAsync().ConfigureAwait(false))
{
// When we take the lock, any existing ProcessEventsAsync call has finished.
// Because the client has been closed, there will not be any more
// calls to onEvents in the future. Therefore we can safely call CloseAsync.
ProcessorEventSource.Log.PartitionPumpInvokeProcessorCloseStart(this.Host.HostName, this.PartitionContext.PartitionId, reason.ToString());
await this.Processor.CloseAsync(this.PartitionContext, reason).ConfigureAwait(false);
ProcessorEventSource.Log.PartitionPumpInvokeProcessorCloseStop(this.Host.HostName, this.PartitionContext.PartitionId);
}
}
}
catch (Exception e)
{
ProcessorEventSource.Log.PartitionPumpCloseError(this.Host.HostName, this.PartitionContext.PartitionId, e.ToString());
// If closing the processor has failed, the state of the processor is suspect.
// Report the failure to the general error handler instead.
this.Host.EventProcessorOptions.NotifyOfException(this.Host.HostName, this.PartitionContext.PartitionId, e, "Closing Event Processor");
}
finally
{
// Release the lease regardless of result from pump's close call above.
// Increase the chance of a healthy host grabbing the lease here.
if (reason != CloseReason.LeaseLost)
{
// Since this pump is dead, release the lease.
try
{
await this.Host.LeaseManager.ReleaseLeaseAsync(this.PartitionContext.Lease).ConfigureAwait(false);
}
catch (Exception e)
{
// Log and ignore any failure since expired lease will be picked by another host.
this.Host.EventProcessorOptions.NotifyOfException(this.Host.HostName, this.PartitionContext.PartitionId, e, EventProcessorHostActionStrings.ReleasingLease);
}
}
}
this.PumpStatus = PartitionPumpStatus.Closed;
ProcessorEventSource.Log.PartitionPumpCloseStop(this.Host.HostName, this.PartitionContext.PartitionId);
}
protected abstract Task OnClosingAsync(CloseReason reason);
protected async Task ProcessEventsAsync(IEnumerable<EventData> events)
{
if (events == null)
{
events = Enumerable.Empty<EventData>();
}
// Synchronize to serialize calls to the processor.
// The handler is not installed until after OpenAsync returns, so ProcessEventsAsync cannot conflict with OpenAsync.
// There could be a conflict between ProcessEventsAsync and CloseAsync, however. All calls to CloseAsync are
// protected by synchronizing too.
using (await this.ProcessingAsyncLock.LockAsync().ConfigureAwait(false))
{
ProcessorEventSource.Log.PartitionPumpInvokeProcessorEventsStart(this.Host.HostName,
this.PartitionContext.PartitionId, events?.Count() ?? 0);
try
{
EventData last = events?.LastOrDefault();
if (last != null)
{
ProcessorEventSource.Log.PartitionPumpInfo(
this.Host.HostName,
this.PartitionContext.PartitionId,
"Updating offset in partition context with end of batch " + last.SystemProperties.Offset + "/" + last.SystemProperties.SequenceNumber);
this.PartitionContext.SetOffsetAndSequenceNumber(last);
if (this.Host.EventProcessorOptions.EnableReceiverRuntimeMetric)
{
this.PartitionContext.RuntimeInformation.Update(last);
}
}
await this.Processor.ProcessEventsAsync(this.PartitionContext, events).ConfigureAwait(false);
}
catch (Exception e)
{
ProcessorEventSource.Log.PartitionPumpInvokeProcessorEventsError(this.Host.HostName, this.PartitionContext.PartitionId, e.ToString());
await this.ProcessErrorAsync(e).ConfigureAwait(false);
}
finally
{
ProcessorEventSource.Log.PartitionPumpInvokeProcessorEventsStop(this.Host.HostName, this.PartitionContext.PartitionId);
}
}
}
protected Task ProcessErrorAsync(Exception error)
{
return this.Processor.ProcessErrorAsync(this.PartitionContext, error);
}
}
}