in dotnet/src/Azure.Iot.Operations.Connector/TelemetryConnectorWorker.cs [73:240]
/// <summary>
/// Main loop of the connector worker. Connects to the MQTT broker using the file-mounted connection
/// settings, then repeatedly: waits for the asset endpoint profile to be discovered, optionally campaigns
/// to be leader (when a leader election configuration is present), and observes assets until the worker
/// is cancelled, the asset endpoint profile changes, or this pod is no longer the leader. Each pass ends
/// by stopping all observation and sampling before the next pass starts over.
/// </summary>
/// <param name="cancellationToken">Signals that the worker should stop.</param>
/// <returns>A task that completes once the worker has stopped.</returns>
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
    string candidateName = Guid.NewGuid().ToString();
    bool isLeader = false;

    // Create MQTT client from credentials provided by the operator
    MqttConnectionSettings mqttConnectionSettings = MqttConnectionSettings.FromFileMount();
    _logger.LogInformation("Connecting to MQTT broker with hostname {hostname} and port {port}", mqttConnectionSettings.HostName, mqttConnectionSettings.TcpPort);
    await _mqttClient.ConnectAsync(mqttConnectionSettings, cancellationToken);
    _logger.LogInformation("Successfully connected to MQTT broker");

    bool doingLeaderElection = false;

    // Signals for the current pass of the loop below. They are replaced at the start of every pass; the
    // event handler reads whichever instances are current when an event arrives. Continuations are forced
    // to run asynchronously so this worker never resumes inline on the thread that raised the event.
    TaskCompletionSource aepDeletedOrUpdatedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
    TaskCompletionSource<AssetEndpointProfile> aepCreatedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);

    // The asset handler is attached at most once (on the first pass that reaches asset monitoring).
    // Attaching it on every pass would leave stale copies behind and handle every asset event repeatedly.
    bool assetChangedHandlerAttached = false;

    try
    {
        // Subscribed once for the lifetime of this method rather than once per pass, so handlers do not
        // accumulate each time monitoring restarts.
        _assetMonitor.AssetEndpointProfileChanged += (sender, args) =>
        {
            // Each connector should have one AEP deployed to the pod. It shouldn't ever be deleted, but it may be updated.
            if (args.ChangeType == ChangeType.Created)
            {
                if (args.AssetEndpointProfile == null)
                {
                    // shouldn't ever happen
                    _logger.LogError("Received notification that asset endpoint profile was created, but no asset endpoint profile was provided");
                }
                else
                {
                    aepCreatedTcs.TrySetResult(args.AssetEndpointProfile);
                }
            }
            else
            {
                aepDeletedOrUpdatedTcs.TrySetResult();
            }
        };

        while (!cancellationToken.IsCancellationRequested)
        {
            // Track what this pass actually started so the cleanup only undoes that.
            bool observingAssetEndpointProfile = false;
            bool observingAssets = false;
            LeaderElectionClient? leaderElectionClient = null;

            isLeader = false;
            aepDeletedOrUpdatedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
            aepCreatedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);

            try
            {
                _assetMonitor.ObserveAssetEndpointProfile(null, cancellationToken);
                observingAssetEndpointProfile = true;

                _logger.LogInformation("Waiting for asset endpoint profile to be discovered");
                AssetEndpointProfile = await aepCreatedTcs.Task.WaitAsync(cancellationToken);
                _logger.LogInformation("Successfully discovered the asset endpoint profile");

                if (_leaderElectionConfiguration != null)
                {
                    doingLeaderElection = true;
                    string leadershipPositionId = _leaderElectionConfiguration.LeadershipPositionId;
                    _logger.LogInformation("Leadership position Id {leadershipPositionId} was configured, so this pod will perform leader election", leadershipPositionId);

                    // The client is configured to renew the leadership position automatically, so it has to
                    // stay alive for as long as this pod samples assets. It is therefore disposed in the
                    // finally block of this pass instead of with a using declaration scoped to this if-block,
                    // which would dispose it immediately after the campaign.
                    leaderElectionClient = new LeaderElectionClient(_applicationContext, _mqttClient, leadershipPositionId, candidateName);
                    leaderElectionClient.AutomaticRenewalOptions = new LeaderElectionAutomaticRenewalOptions()
                    {
                        AutomaticRenewal = true,
                        ElectionTerm = _leaderElectionConfiguration.LeadershipPositionTermLength,
                        RenewalPeriod = _leaderElectionConfiguration.LeadershipPositionRenewalRate
                    };

                    leaderElectionClient.LeadershipChangeEventReceivedAsync += (sender, args) =>
                    {
                        isLeader = args.NewLeader != null && args.NewLeader.GetString().Equals(candidateName);
                        if (isLeader)
                        {
                            _logger.LogInformation("Received notification that this pod is the leader");
                        }

                        return Task.CompletedTask;
                    };

                    _logger.LogInformation("This pod is waiting to be elected leader.");

                    // NOTE(review): the campaign is not given the cancellation token, so shutdown may be
                    // delayed while campaigning — confirm whether CampaignAsync offers a cancellable overload.
                    await leaderElectionClient.CampaignAsync(_leaderElectionConfiguration.LeadershipPositionTermLength);

                    // The campaign completing is what makes this pod the leader. Do not depend on a
                    // leadership-change notification having already arrived, otherwise the wait loop
                    // below would be skipped and the pod would immediately report that it lost leadership.
                    isLeader = true;
                    _logger.LogInformation("This pod was elected leader.");
                }

                if (!assetChangedHandlerAttached)
                {
                    _assetMonitor.AssetChanged += (sender, args) =>
                    {
                        _logger.LogInformation("Received a notification an asset with name {assetName} has been {changeType}.", args.AssetName, args.ChangeType.ToString().ToLowerInvariant());

                        if (args.ChangeType == ChangeType.Deleted)
                        {
                            AssetUnavailable(args.AssetName, false);
                        }
                        else if (args.ChangeType == ChangeType.Created)
                        {
                            _ = AssetAvailableAsync(AssetEndpointProfile, args.Asset!, args.AssetName, cancellationToken);
                        }
                        else
                        {
                            // asset changes don't all necessitate re-creating the relevant dataset samplers, but there is no way to know
                            // at this level what changes are dataset-specific nor which of those changes require a new sampler. Because
                            // of that, this sample just assumes all asset changes require the factory requesting a new sampler.
                            AssetUnavailable(args.AssetName, true);
                            _ = AssetAvailableAsync(AssetEndpointProfile, args.Asset!, args.AssetName, cancellationToken);
                        }
                    };
                    assetChangedHandlerAttached = true;
                }

                _logger.LogInformation("Now monitoring for asset creation/deletion/updates");
                _assetMonitor.ObserveAssets(null, cancellationToken);
                observingAssets = true;

                // Wait until the worker is cancelled, the AEP changes, or this pod is no longer the leader
                Task aepChangedTask = aepDeletedOrUpdatedTcs.Task;
                while (!cancellationToken.IsCancellationRequested && (isLeader || !doingLeaderElection) && !aepChangedTask.IsCompleted)
                {
                    try
                    {
                        if (doingLeaderElection)
                        {
                            // Wake up once per election term so a loss of leadership is noticed. The delay
                            // observes the token so its timer is released promptly on shutdown.
                            await Task.WhenAny(
                                aepChangedTask,
                                Task.Delay(_leaderElectionConfiguration!.LeadershipPositionTermLength, cancellationToken)).WaitAsync(cancellationToken);
                        }
                        else
                        {
                            await aepChangedTask.WaitAsync(cancellationToken);
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        // expected outcome, allow the while loop to check status again
                    }
                }

                if (cancellationToken.IsCancellationRequested)
                {
                    _logger.LogInformation("This pod is shutting down. It will now stop monitoring and sampling assets.");
                }
                else if (aepChangedTask.IsCompleted)
                {
                    _logger.LogInformation("Received a notification that the asset endpoint profile has changed. This pod will now cancel current asset sampling and restart monitoring assets.");
                }
                else if (doingLeaderElection)
                {
                    _logger.LogInformation("This pod is no longer the leader. It will now stop monitoring and sampling assets.");
                }
                else
                {
                    // Shouldn't happen. The pod should either be cancelled, the AEP should have changed, or this pod should have lost its position as leader
                    _logger.LogInformation("This pod will now cancel current asset sampling and restart monitoring assets.");
                }
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // Cancellation while waiting is a normal shutdown, not a failure.
                _logger.LogInformation("This pod is shutting down. It will now stop monitoring and sampling assets.");
            }
            catch (Exception ex)
            {
                // NOTE(review): the loop retries immediately after a failure; consider a back-off if a
                // persistent error could make this spin.
                _logger.LogError(ex, "Encountered an error");
            }
            finally
            {
                // Runs on every exit path so a failed pass does not leave observers or samplers running
                // underneath the next pass.
                try
                {
                    if (observingAssets)
                    {
                        _assetMonitor.UnobserveAssets();
                    }

                    if (observingAssetEndpointProfile)
                    {
                        _assetMonitor.UnobserveAssetEndpointProfile();
                    }

                    foreach (string assetName in _assets.Keys)
                    {
                        AssetUnavailable(assetName, false);
                    }
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Encountered an error while stopping asset monitoring");
                }

                // Dispose the leader election client last, after sampling has been stopped.
                if (leaderElectionClient != null)
                {
                    try
                    {
                        await leaderElectionClient.DisposeAsync();
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "Encountered an error while disposing the leader election client");
                    }
                }
            }
        }
    }
    finally
    {
        _logger.LogInformation("Shutting down the connector");
    }
}