sdk/eventhub/Microsoft.Azure.EventHubs.Processor/src/EventProcessorHost.cs (315 lines of code) (raw):
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
namespace Microsoft.Azure.EventHubs.Processor
{
using System;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs.Primitives;
using Microsoft.Azure.Storage;
/// <summary>
/// Represents a host for processing Event Hubs event data.
/// </summary>
public sealed class EventProcessorHost
{
// A processor host will work on either the token provider or the connection string.
readonly ITokenProvider tokenProvider;
string eventHubConnectionString;
/// <summary>
/// Create a new host to process events from an Event Hub.
///
/// <para>Since Event Hubs are frequently used for scale-out, high-traffic scenarios, generally there will
/// be only one host per process, and the processes will be run on separate machines. However, it is
/// supported to run multiple hosts on one machine, or even within one process, if throughput is not
/// a concern.</para>
///
/// This overload of the constructor uses the default, built-in lease and checkpoint managers. The
/// Azure Storage account specified by the storageConnectionString parameter is used by the built-in
/// managers to record leases and checkpoints.
/// </summary>
/// <param name="eventHubPath">The name of the EventHub.</param>
/// <param name="consumerGroupName">The name of the consumer group within the Event Hub.</param>
/// <param name="eventHubConnectionString">Connection string for the Event Hub to receive from.</param>
/// <param name="storageConnectionString">Connection string to Azure Storage account used for leases and checkpointing.</param>
/// <param name="leaseContainerName">Azure Storage container name for use by built-in lease and checkpoint manager.</param>
public EventProcessorHost(
string eventHubPath,
string consumerGroupName,
string eventHubConnectionString,
string storageConnectionString,
string leaseContainerName)
: this(EventProcessorHost.CreateHostName(null),
eventHubPath,
consumerGroupName,
eventHubConnectionString,
storageConnectionString,
leaseContainerName,
null)
{
}
/// <summary>
/// Create a new host to process events from an Event Hub.
///
/// <para>This overload of the constructor uses the default, built-in lease and checkpoint managers.</para>
/// </summary>
/// <param name="hostName">Name of the processor host. MUST BE UNIQUE. Strongly recommend including a Guid to ensure uniqueness.</param>
/// <param name="eventHubPath">The name of the EventHub.</param>
/// <param name="consumerGroupName">The name of the consumer group within the Event Hub.</param>
/// <param name="eventHubConnectionString">Connection string for the Event Hub to receive from.</param>
/// <param name="storageConnectionString">Connection string to Azure Storage account used for leases and checkpointing.</param>
/// <param name="leaseContainerName">Azure Storage container name for use by built-in lease and checkpoint manager.</param>
/// <param name="storageBlobPrefix">Prefix used when naming blobs within the storage container.</param>
public EventProcessorHost(
string hostName,
string eventHubPath,
string consumerGroupName,
string eventHubConnectionString,
string storageConnectionString,
string leaseContainerName,
string storageBlobPrefix = null)
: this(hostName,
eventHubPath,
consumerGroupName,
eventHubConnectionString,
new AzureStorageCheckpointLeaseManager(storageConnectionString, leaseContainerName, storageBlobPrefix))
{
}
/// <summary>
/// Create a new host to process events from an Event Hub.
///
/// <para>This overload of the constructor allows maximum flexibility.
/// This one allows the caller to specify the name of the processor host as well.
/// The overload also allows the caller to provide their own lease and checkpoint managers to replace the built-in
/// ones based on Azure Storage.</para>
/// </summary>
/// <param name="hostName">Name of the processor host. MUST BE UNIQUE. Strongly recommend including a Guid to ensure uniqueness.</param>
/// <param name="eventHubPath">The name of the EventHub.</param>
/// <param name="consumerGroupName">The name of the consumer group within the Event Hub.</param>
/// <param name="eventHubConnectionString">Connection string for the Event Hub to receive from.</param>
/// <param name="checkpointManager">Object implementing ICheckpointManager which handles partition checkpointing.</param>
/// <param name="leaseManager">Object implementing ILeaseManager which handles leases for partitions.</param>
public EventProcessorHost(
string hostName,
string eventHubPath,
string consumerGroupName,
string eventHubConnectionString,
ICheckpointManager checkpointManager,
ILeaseManager leaseManager)
{
Guard.ArgumentNotNullOrWhiteSpace(nameof(hostName), hostName);
Guard.ArgumentNotNullOrWhiteSpace(nameof(consumerGroupName), consumerGroupName);
Guard.ArgumentNotNull(nameof(checkpointManager), checkpointManager);
Guard.ArgumentNotNull(nameof(leaseManager), leaseManager);
var csb = new EventHubsConnectionStringBuilder(eventHubConnectionString);
if (string.IsNullOrEmpty(eventHubPath))
{
// Entity path is expected in the connection string if not provided with eventHubPath parameter.
if (string.IsNullOrEmpty(csb.EntityPath))
{
throw new ArgumentException(nameof(eventHubConnectionString),
"Provide EventHub entity path either in eventHubPath parameter or in eventHubConnectionString.");
}
}
else
{
// Entity path should not conflict with connection string.
if (!string.IsNullOrEmpty(csb.EntityPath) &&
string.Compare(csb.EntityPath, eventHubPath, StringComparison.OrdinalIgnoreCase) != 0)
{
throw new ArgumentException(nameof(eventHubConnectionString),
"Provided EventHub path in eventHubPath parameter conflicts with the path in provided EventHubs connection string.");
}
csb.EntityPath = eventHubPath;
}
this.HostName = hostName;
this.EventHubPath = csb.EntityPath;
this.ConsumerGroupName = consumerGroupName;
this.eventHubConnectionString = csb.ToString();
this.CheckpointManager = checkpointManager;
this.LeaseManager = leaseManager;
this.TransportType = csb.TransportType;
this.OperationTimeout = csb.OperationTimeout;
this.EndpointAddress = csb.Endpoint;
this.PartitionManager = new PartitionManager(this);
ProcessorEventSource.Log.EventProcessorHostCreated(this.HostName, this.EventHubPath);
}
/// <summary>
/// Create a new host to process events from an Event Hub with provided <see cref="TokenProvider"/>
/// </summary>
/// <param name="endpointAddress">Fully qualified domain name for Event Hubs. Most likely, {yournamespace}.servicebus.windows.net</param>
/// <param name="eventHubPath">The name of the EventHub.</param>
/// <param name="consumerGroupName">The name of the consumer group within the Event Hub.</param>
/// <param name="tokenProvider">Token provider which will generate security tokens for authorization.</param>
/// <param name="cloudStorageAccount">Azure Storage account used for leases and checkpointing.</param>
/// <param name="leaseContainerName">Azure Storage container name for use by built-in lease and checkpoint manager.</param>
public EventProcessorHost(
Uri endpointAddress,
string eventHubPath,
string consumerGroupName,
ITokenProvider tokenProvider,
CloudStorageAccount cloudStorageAccount,
string leaseContainerName)
: this(EventProcessorHost.CreateHostName(null),
endpointAddress,
eventHubPath,
consumerGroupName,
tokenProvider,
cloudStorageAccount,
leaseContainerName)
{
}
/// <summary>
/// Create a new host to process events from an Event Hub with provided <see cref="TokenProvider"/>
/// </summary>
/// <param name="hostName">Name of the processor host. MUST BE UNIQUE. Strongly recommend including a Guid to ensure uniqueness.</param>
/// <param name="endpointAddress">Fully qualified domain name for Event Hubs. Most likely, {yournamespace}.servicebus.windows.net</param>
/// <param name="eventHubPath">The name of the EventHub.</param>
/// <param name="consumerGroupName">The name of the consumer group within the Event Hub.</param>
/// <param name="tokenProvider">Token provider which will generate security tokens for authorization.</param>
/// <param name="cloudStorageAccount">Azure Storage account used for leases and checkpointing.</param>
/// <param name="leaseContainerName">Azure Storage container name for use by built-in lease and checkpoint manager.</param>
/// <param name="storageBlobPrefix">Prefix used when naming blobs within the storage container.</param>
/// <param name="operationTimeout">Operation timeout for Event Hubs operations.</param>
/// <param name="transportType">Transport type on connection.</param>
public EventProcessorHost(
string hostName,
Uri endpointAddress,
string eventHubPath,
string consumerGroupName,
ITokenProvider tokenProvider,
CloudStorageAccount cloudStorageAccount,
string leaseContainerName,
string storageBlobPrefix = null,
TimeSpan? operationTimeout = null,
TransportType transportType = TransportType.Amqp)
: this(hostName,
endpointAddress,
eventHubPath,
consumerGroupName,
tokenProvider,
new AzureStorageCheckpointLeaseManager(cloudStorageAccount, leaseContainerName, storageBlobPrefix),
operationTimeout,
transportType)
{
}
/// <summary>
/// Create a new host to process events from an Event Hub with provided <see cref="TokenProvider"/>
/// </summary>
/// <param name="endpointAddress">Fully qualified domain name for Event Hubs. Most likely, {yournamespace}.servicebus.windows.net</param>
/// <param name="eventHubPath">The name of the EventHub.</param>
/// <param name="consumerGroupName">The name of the consumer group within the Event Hub.</param>
/// <param name="tokenProvider">Token provider which will generate security tokens for authorization.</param>
/// <param name="cloudStorageAccount">Azure Storage account used for leases and checkpointing.</param>
/// <param name="leaseContainerName">Azure Storage container name for use by built-in lease and checkpoint manager.</param>
/// <param name="storageBlobPrefix">Prefix used when naming blobs within the storage container.</param>
/// <param name="operationTimeout">Operation timeout for Event Hubs operations.</param>
/// <param name="transportType">Transport type on connection.</param>
public EventProcessorHost(
Uri endpointAddress,
string eventHubPath,
string consumerGroupName,
ITokenProvider tokenProvider,
CloudStorageAccount cloudStorageAccount,
string leaseContainerName,
string storageBlobPrefix = null,
TimeSpan? operationTimeout = null,
TransportType transportType = TransportType.Amqp)
: this(EventProcessorHost.CreateHostName(null),
endpointAddress,
eventHubPath,
consumerGroupName,
tokenProvider,
cloudStorageAccount,
leaseContainerName,
storageBlobPrefix,
operationTimeout,
transportType)
{
}
/// <summary>
/// Create a new host to process events from an Event Hub with provided <see cref="TokenProvider"/>
/// </summary>
/// <param name="hostName">Name of the processor host. MUST BE UNIQUE. Strongly recommend including a Guid to ensure uniqueness.</param>
/// <param name="endpointAddress">Fully qualified domain name for Event Hubs. Most likely, {yournamespace}.servicebus.windows.net</param>
/// <param name="eventHubPath">The name of the EventHub.</param>
/// <param name="consumerGroupName">The name of the consumer group within the Event Hub.</param>
/// <param name="tokenProvider">Token provider which will generate security tokens for authorization.</param>
/// <param name="checkpointManager">Object implementing ICheckpointManager which handles partition checkpointing.</param>
/// <param name="leaseManager">Object implementing ILeaseManager which handles leases for partitions.</param>
/// <param name="operationTimeout">Operation timeout for Event Hubs operations.</param>
/// <param name="transportType">Transport type on connection.</param>
public EventProcessorHost(
string hostName,
Uri endpointAddress,
string eventHubPath,
string consumerGroupName,
ITokenProvider tokenProvider,
ICheckpointManager checkpointManager,
ILeaseManager leaseManager,
TimeSpan? operationTimeout = null,
TransportType transportType = TransportType.Amqp)
{
Guard.ArgumentNotNullOrWhiteSpace(nameof(hostName), hostName);
Guard.ArgumentNotNull(nameof(endpointAddress), endpointAddress);
Guard.ArgumentNotNullOrWhiteSpace(nameof(eventHubPath), eventHubPath);
Guard.ArgumentNotNullOrWhiteSpace(nameof(consumerGroupName), consumerGroupName);
Guard.ArgumentNotNull(nameof(tokenProvider), tokenProvider);
Guard.ArgumentNotNull(nameof(checkpointManager), checkpointManager);
Guard.ArgumentNotNull(nameof(leaseManager), leaseManager);
this.HostName = hostName;
this.EndpointAddress = endpointAddress;
this.EventHubPath = eventHubPath;
this.ConsumerGroupName = consumerGroupName;
this.tokenProvider = tokenProvider;
this.CheckpointManager = checkpointManager;
this.LeaseManager = leaseManager;
this.TransportType = transportType;
this.OperationTimeout = operationTimeout ?? ClientConstants.DefaultOperationTimeout;
this.PartitionManager = new PartitionManager(this);
ProcessorEventSource.Log.EventProcessorHostCreated(this.HostName, this.EventHubPath);
}
// Using this intermediate constructor to create single combined manager to be used as
// both lease manager and checkpoint manager.
EventProcessorHost(
string hostName,
string eventHubPath,
string consumerGroupName,
string eventHubConnectionString,
AzureStorageCheckpointLeaseManager combinedManager)
: this(hostName,
eventHubPath,
consumerGroupName,
eventHubConnectionString,
combinedManager,
combinedManager)
{
}
// Using this intermediate constructor to create single combined manager to be used as
// both lease manager and checkpoint manager.
EventProcessorHost(
string hostName,
Uri endpointAddress,
string eventHubPath,
string consumerGroupName,
ITokenProvider tokenProvider,
AzureStorageCheckpointLeaseManager combinedManager,
TimeSpan? operationTimeout = null,
TransportType transportType = TransportType.Amqp)
: this(hostName,
endpointAddress,
eventHubPath,
consumerGroupName,
tokenProvider,
combinedManager,
combinedManager,
operationTimeout,
transportType)
{
}
/// <summary>
/// Returns processor host name.
/// If the processor host name was automatically generated, this is the only way to get it.
/// </summary>
public string HostName { get; }
/// <summary>
/// Gets the event hub path.
/// </summary>
public string EventHubPath { get; }
/// <summary>
/// Gets the consumer group name.
/// </summary>
public string ConsumerGroupName { get; }
/// <summary>
/// Gets the event endpoint URI.
/// </summary>
public Uri EndpointAddress { get; }
/// <summary>
/// Gets the transport type.
/// </summary>
public TransportType TransportType { get; }
/// <summary>
/// Gets the operation timeout.
/// </summary>
public TimeSpan OperationTimeout { get; internal set; }
/// <summary>Gets or sets the
/// <see cref="PartitionManagerOptions" /> instance used by the
/// <see cref="EventProcessorHost" /> object.</summary>
/// <value>The <see cref="PartitionManagerOptions" /> instance.</value>
public PartitionManagerOptions PartitionManagerOptions { get; set; }
// All of these accessors are for internal use only.
internal ICheckpointManager CheckpointManager { get; }
internal EventProcessorOptions EventProcessorOptions { get; private set; }
internal ILeaseManager LeaseManager { get; private set; }
internal IEventProcessorFactory ProcessorFactory { get; private set; }
internal PartitionManager PartitionManager { get; private set; }
/// <summary>
/// This registers <see cref="IEventProcessor"/> implementation with the host using <see cref="DefaultEventProcessorFactory{T}"/>.
/// This also starts the host and causes it to start participating in the partition distribution process.
/// </summary>
/// <typeparam name="T">Implementation of your application specific <see cref="IEventProcessor"/>.</typeparam>
/// <returns>A task to indicate EventProcessorHost instance is started.</returns>
public Task RegisterEventProcessorAsync<T>() where T : IEventProcessor, new()
{
return RegisterEventProcessorAsync<T>(EventProcessorOptions.DefaultOptions);
}
/// <summary>
/// This registers <see cref="IEventProcessor"/> implementation with the host using <see cref="DefaultEventProcessorFactory{T}"/>.
/// This also starts the host and causes it to start participating in the partition distribution process.
/// </summary>
/// <typeparam name="T">Implementation of your application specific <see cref="IEventProcessor"/>.</typeparam>
/// <param name="processorOptions"><see cref="EventProcessorOptions"/> to control various aspects of message pump created when ownership
/// is acquired for a particular partition of EventHub.</param>
/// <returns>A task to indicate EventProcessorHost instance is started.</returns>
public Task RegisterEventProcessorAsync<T>(EventProcessorOptions processorOptions) where T : IEventProcessor, new()
{
IEventProcessorFactory f = new DefaultEventProcessorFactory<T>();
return RegisterEventProcessorFactoryAsync(f, processorOptions);
}
/// <summary>
/// This registers <see cref="IEventProcessorFactory"/> implementation with the host which is used to create an instance of
/// <see cref="IEventProcessor"/> when it takes ownership of a partition. This also starts the host and causes it to start participating
/// in the partition distribution process.
/// </summary>
/// <param name="factory">Instance of <see cref="IEventProcessorFactory"/> implementation.</param>
/// <returns>A task to indicate EventProcessorHost instance is started.</returns>
public Task RegisterEventProcessorFactoryAsync(IEventProcessorFactory factory)
{
var epo = EventProcessorOptions.DefaultOptions;
epo.ReceiveTimeout = TimeSpan.MinValue;
return RegisterEventProcessorFactoryAsync(factory, epo);
}
/// <summary>
/// This registers <see cref="IEventProcessorFactory"/> implementation with the host which is used to create an instance of
/// <see cref="IEventProcessor"/> when it takes ownership of a partition. This also starts the host and causes it to start participating
/// in the partition distribution process.
/// </summary>
/// <param name="factory">Instance of <see cref="IEventProcessorFactory"/> implementation.</param>
/// <param name="processorOptions"><see cref="EventProcessorOptions"/> to control various aspects of message pump created when ownership
/// is acquired for a particular partition of EventHub.</param>
/// <returns>A task to indicate EventProcessorHost instance is started.</returns>
public async Task RegisterEventProcessorFactoryAsync(IEventProcessorFactory factory, EventProcessorOptions processorOptions)
{
Guard.ArgumentNotNull(nameof(factory), factory);
Guard.ArgumentNotNull(nameof(processorOptions), processorOptions);
// Initialize partition manager options with default values if not already set by the client.
if (this.PartitionManagerOptions == null)
{
// Assign partition manager with default options.
this.PartitionManagerOptions = new PartitionManagerOptions();
}
ProcessorEventSource.Log.EventProcessorHostOpenStart(this.HostName, factory.GetType().ToString());
try
{
// Override operation timeout by receive timeout?
if (processorOptions.ReceiveTimeout > TimeSpan.MinValue)
{
this.OperationTimeout = processorOptions.ReceiveTimeout;
if (this.eventHubConnectionString != null)
{
var cbs = new EventHubsConnectionStringBuilder(this.eventHubConnectionString)
{
OperationTimeout = processorOptions.ReceiveTimeout
};
this.eventHubConnectionString = cbs.ToString();
}
}
// Initialize lease manager if this is an AzureStorageCheckpointLeaseManager
(this.LeaseManager as AzureStorageCheckpointLeaseManager)?.Initialize(this);
this.ProcessorFactory = factory;
this.EventProcessorOptions = processorOptions;
await this.PartitionManager.StartAsync().ConfigureAwait(false);
}
catch (Exception e)
{
ProcessorEventSource.Log.EventProcessorHostOpenError(this.HostName, e.ToString());
throw;
}
finally
{
ProcessorEventSource.Log.EventProcessorHostOpenStop(this.HostName);
}
}
/// <summary>
/// Stop processing events. Does not return until the shutdown is complete.
/// </summary>
/// <returns></returns>
public async Task UnregisterEventProcessorAsync() // throws InterruptedException, ExecutionException
{
ProcessorEventSource.Log.EventProcessorHostCloseStart(this.HostName);
try
{
await this.PartitionManager.StopAsync().ConfigureAwait(false);
}
catch (Exception e)
{
// Log the failure but nothing really to do about it.
ProcessorEventSource.Log.EventProcessorHostCloseError(this.HostName, e.ToString());
throw;
}
finally
{
ProcessorEventSource.Log.EventProcessorHostCloseStop(this.HostName);
}
}
/// <summary>
/// Convenience method for generating unique host names, safe to pass to the EventProcessorHost constructors
/// that take a hostName argument.
///
/// If a prefix is supplied, the constructed name begins with that string. If the prefix argument is null or
/// an empty string, the constructed name begins with "host". Then a dash '-' and a unique ID are appended to
/// create a unique name.
/// </summary>
/// <param name="prefix">String to use as the beginning of the name. If null or empty, a default is used.</param>
/// <returns>A unique host name to pass to EventProcessorHost constructors.</returns>
static string CreateHostName(string prefix)
{
if (string.IsNullOrEmpty(prefix))
{
prefix = "host";
}
return prefix + "-" + Guid.NewGuid();
}
internal EventHubClient CreateEventHubClient()
{
// Token provider already provided?
if (this.tokenProvider == null)
{
return EventHubClient.CreateFromConnectionString(this.eventHubConnectionString);
}
else
{
return EventHubClient.CreateWithTokenProvider(
this.EndpointAddress,
this.EventHubPath,
this.tokenProvider,
this.OperationTimeout,
this.TransportType);
}
}
}
}