in sdk/eventhub/Microsoft.Azure.EventHubs.Processor/src/PartitionManager.cs [211:434]
/// <summary>
/// Runs the partition-manager main loop until <paramref name="cancellationToken"/> is signalled.
/// Each pass downloads all leases, renews the ones this host owns, tries to acquire expired
/// leases, optionally steals a single lease for load balancing, and finally adds or removes
/// partition pumps so that they match the resulting lease ownership.
/// </summary>
/// <param name="cancellationToken">
/// Token that stops the loop. It is checked at the top of every pass and is also observed by the
/// renew continuations, the pump add/remove tasks and the wait between passes.
/// </param>
/// <returns>A task that completes once the loop has stopped.</returns>
/// <exception cref="TaskCanceledException">
/// Thrown when <paramref name="cancellationToken"/> is cancelled while the loop is waiting for the
/// next pass; that wait lives in the <c>finally</c> block, so it is not absorbed by the loop's own
/// exception handling.
/// </exception>
async Task RunLoopAsync(CancellationToken cancellationToken)
{
    var loopStopwatch = new Stopwatch();

    while (!cancellationToken.IsCancellationRequested)
    {
        // Mark start time so we can use the duration taken to calculate renew interval.
        loopStopwatch.Restart();

        ILeaseManager leaseManager = this.host.LeaseManager;
        var allLeases = new ConcurrentDictionary<string, Lease>();
        var leasesOwnedByOthers = new ConcurrentDictionary<string, Lease>();

        // Inspect all leases.
        // Acquire any expired leases.
        // Renew any leases that currently belong to us.
        IEnumerable<Lease> downloadedLeases;
        var renewLeaseTasks = new List<Task>();
        int ourLeaseCount = 0;

        try
        {
            try
            {
                downloadedLeases = await leaseManager.GetAllLeasesAsync().ConfigureAwait(false);
            }
            catch (Exception e)
            {
                ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, "Exception during downloading leases", e.Message);
                this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, "N/A", e, EventProcessorHostActionStrings.DownloadingLeases);

                // Avoid tight spin if getallleases call keeps failing.
                // The 'continue' still passes through the finally block below, so the regular
                // renew-interval wait is applied as well.
                await Task.Delay(1000).ConfigureAwait(false);
                continue;
            }

            // First things first, renew owned leases.
            foreach (var lease in downloadedLeases)
            {
                var subjectLease = lease;

                try
                {
                    allLeases[subjectLease.PartitionId] = subjectLease;

                    // Ask for the expiry state exactly once per lease so that both ownership
                    // branches below act on the same answer. Expired leases are neither renewed
                    // nor counted as owned by others; they are handled by the acquire step.
                    if (await subjectLease.IsExpired().ConfigureAwait(false))
                    {
                        continue;
                    }

                    if (subjectLease.Owner != this.host.HostName)
                    {
                        leasesOwnedByOthers[subjectLease.PartitionId] = subjectLease;
                        continue;
                    }

                    ourLeaseCount++;

                    // Get lease from partition since we need the token at this point.
                    if (!this.partitionPumps.TryGetValue(subjectLease.PartitionId, out var capturedPump))
                    {
                        // No pump yet for this partition: nothing to renew here. The pump update
                        // step at the end of the pass will see the lease as ours.
                        continue;
                    }

                    var capturedLease = capturedPump.Lease;
                    ProcessorEventSource.Log.PartitionPumpInfo(this.host.HostName, capturedLease.PartitionId, "Trying to renew lease.");

                    renewLeaseTasks.Add(leaseManager.RenewLeaseAsync(capturedLease).ContinueWith(renewResult =>
                    {
                        if (renewResult.IsFaulted)
                        {
                            // Might have failed due to intermittent error or lease-lost.
                            // Just log here, expired leases will be picked by same or another host anyway.
                            ProcessorEventSource.Log.PartitionPumpError(
                                this.host.HostName,
                                capturedLease.PartitionId,
                                "Failed to renew lease.",
                                renewResult.Exception?.Message);

                            this.host.EventProcessorOptions.NotifyOfException(
                                this.host.HostName,
                                capturedLease.PartitionId,
                                renewResult.Exception,
                                EventProcessorHostActionStrings.RenewingLease);

                            // Nullify the owner on the lease in case this host lost it.
                            // This helps to remove pump earlier reducing duplicate receives.
                            if (renewResult.Exception?.GetBaseException() is LeaseLostException)
                            {
                                allLeases[capturedLease.PartitionId].Owner = null;
                            }
                        }
                    }, cancellationToken));
                }
                catch (Exception e)
                {
                    ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, "Failure during checking lease.", e.ToString());
                    this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, "N/A", e, EventProcessorHostActionStrings.CheckingLeases);
                }
            }

            // Wait until we are done with renewing our own leases here.
            // Renew failures are logged and notified inside the continuations, so the only thing
            // expected to surface from this await is cancellation of those continuations.
            await Task.WhenAll(renewLeaseTasks).ConfigureAwait(false);
            ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, "Lease renewal is finished.");

            // Check any expired leases that we can grab here.
            ourLeaseCount += await this.AcquireExpiredLeasesAsync(allLeases, leasesOwnedByOthers, ourLeaseCount, cancellationToken).ConfigureAwait(false);
            ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, "Expired lease check is finished.");

            // Grab more leases if available and needed for load balancing
            if (leasesOwnedByOthers.Count > 0)
            {
                Lease stealThisLease = WhichLeaseToSteal(leasesOwnedByOthers.Values, ourLeaseCount);

                // Don't attempt to steal the lease if current host has a pump for this partition id
                // This is possible when current pump is in failed state due to lease moved to some other host.
                if (stealThisLease != null && !this.partitionPumps.ContainsKey(stealThisLease.PartitionId))
                {
                    try
                    {
                        // Get fresh content of lease subject to acquire.
                        var downloadedLease = await leaseManager.GetLeaseAsync(stealThisLease.PartitionId).ConfigureAwait(false);
                        allLeases[stealThisLease.PartitionId] = downloadedLease;

                        // Don't attempt to steal if lease is already expired.
                        // Expired leases are picked up by other hosts quickly.
                        // Don't attempt to steal if owner has changed from the calculation time to refresh time.
                        if (!await downloadedLease.IsExpired().ConfigureAwait(false)
                            && downloadedLease.Owner == stealThisLease.Owner)
                        {
                            ProcessorEventSource.Log.PartitionPumpStealLeaseStart(this.host.HostName, downloadedLease.PartitionId);

                            if (await leaseManager.AcquireLeaseAsync(downloadedLease).ConfigureAwait(false))
                            {
                                // Succeeded in stealing lease
                                ProcessorEventSource.Log.PartitionPumpStealLeaseStop(this.host.HostName, downloadedLease.PartitionId);
                                ourLeaseCount++;
                            }
                            else
                            {
                                // Acquisition failed. Make sure we don't leave the lease as owned.
                                allLeases[stealThisLease.PartitionId].Owner = null;

                                ProcessorEventSource.Log.EventProcessorHostWarning(this.host.HostName,
                                    "Failed to steal lease for partition " + downloadedLease.PartitionId, null);
                            }
                        }
                    }
                    catch (Exception e)
                    {
                        ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName,
                            "Exception during stealing lease for partition " + stealThisLease.PartitionId, e.ToString());
                        this.host.EventProcessorOptions.NotifyOfException(this.host.HostName,
                            stealThisLease.PartitionId, e, EventProcessorHostActionStrings.StealingLease);

                        // Acquisition failed. Make sure we don't leave the lease as owned.
                        allLeases[stealThisLease.PartitionId].Owner = null;
                    }
                }
            }

            // Update pump with new state of leases on owned partitions in parallel.
            var createRemovePumpTasks = new List<Task>();
            foreach (string partitionId in allLeases.Keys)
            {
                // Per-iteration copies that the lambdas below capture.
                var subjectPartitionId = partitionId;
                Lease updatedLease = allLeases[subjectPartitionId];
                ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, $"Lease on partition {updatedLease.PartitionId} owned by {updatedLease.Owner}");

                if (updatedLease.Owner == this.host.HostName)
                {
                    createRemovePumpTasks.Add(Task.Run(async () =>
                    {
                        try
                        {
                            await this.CheckAndAddPumpAsync(subjectPartitionId, updatedLease).ConfigureAwait(false);
                        }
                        catch (Exception e)
                        {
                            ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, $"Exception during add pump on partition {subjectPartitionId}", e.Message);
                            this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, subjectPartitionId, e, EventProcessorHostActionStrings.PartitionPumpManagement);
                        }
                    }, cancellationToken));
                }
                else if (this.partitionPumps.ContainsKey(partitionId))
                {
                    // We still run a pump for a partition whose lease is no longer ours.
                    createRemovePumpTasks.Add(Task.Run(async () =>
                    {
                        try
                        {
                            await this.TryRemovePumpAsync(subjectPartitionId, CloseReason.LeaseLost).ConfigureAwait(false);
                        }
                        catch (Exception e)
                        {
                            ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, $"Exception during remove pump on partition {subjectPartitionId}", e.Message);
                            this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, subjectPartitionId, e, EventProcessorHostActionStrings.PartitionPumpManagement);
                        }
                    }, cancellationToken));
                }
            }

            await Task.WhenAll(createRemovePumpTasks).ConfigureAwait(false);
            ProcessorEventSource.Log.EventProcessorHostInfo(this.host.HostName, "Pump update is finished.");
        }
        catch (Exception e)
        {
            // TaskCanceledException is expected during host unregister.
            if (e is TaskCanceledException)
            {
                continue;
            }

            // Loop should not exit unless signalled via cancellation token. Log any failures and continue.
            ProcessorEventSource.Log.EventProcessorHostError(this.host.HostName, "Exception from partition manager main loop, continuing", e.Message);
            this.host.EventProcessorOptions.NotifyOfException(this.host.HostName, "N/A", e, EventProcessorHostActionStrings.PartitionPumpManagement);
        }
        finally
        {
            // Consider reducing the wait time with last lease-walkthrough's time taken.
            // This delay observes the token, so cancelling while waiting throws out of the loop.
            var elapsedTime = loopStopwatch.Elapsed;
            if (leaseManager.LeaseRenewInterval > elapsedTime)
            {
                await Task.Delay(leaseManager.LeaseRenewInterval.Subtract(elapsedTime), cancellationToken).ConfigureAwait(false);
            }
        }
    }
}