dotnet/src/Azure.Iot.Operations.Connector/TelemetryConnectorWorker.cs (303 lines of code) (raw):

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using Azure.Iot.Operations.Connector.Exceptions;
using Azure.Iot.Operations.Protocol;
using Azure.Iot.Operations.Protocol.Connection;
using Azure.Iot.Operations.Protocol.Models;
using Azure.Iot.Operations.Services.Assets;
using Azure.Iot.Operations.Services.LeaderElection;
using Azure.Iot.Operations.Services.SchemaRegistry;
using Microsoft.Extensions.Logging;
using System.Collections.Concurrent;
using System.Text;
using System.Text.Json;

namespace Azure.Iot.Operations.Connector
{
    /// <summary>
    /// Base class for a connector worker that allows users to forward data sampled from datasets and to forward received events.
    /// </summary>
    /// <remarks>
    /// The worker connects to the MQTT broker, waits for the asset endpoint profile to be reported by the
    /// <see cref="IAssetMonitor"/>, optionally campaigns for leadership, and then raises
    /// <see cref="OnAssetAvailable"/> / <see cref="OnAssetUnavailable"/> as assets are created, updated or deleted.
    /// </remarks>
    public class TelemetryConnectorWorker : ConnectorBackgroundService
    {
        protected readonly ILogger<TelemetryConnectorWorker> _logger;
        private readonly IMqttClient _mqttClient;
        private readonly ApplicationContext _applicationContext;
        private readonly IAssetMonitor _assetMonitor;
        private readonly IMessageSchemaProvider _messageSchemaProviderFactory;

        // Assets that have been reported as available and not yet reported as unavailable, keyed by asset name.
        private readonly ConcurrentDictionary<string, Asset> _assets = new();
        private bool _isDisposed = false;

        /// <summary>
        /// Event handler for when an asset becomes available.
        /// </summary>
        public EventHandler<AssetAvailabileEventArgs>? OnAssetAvailable;

        /// <summary>
        /// Event handler for when an asset becomes unavailable.
        /// </summary>
        public EventHandler<AssetUnavailableEventArgs>? OnAssetUnavailable;

        /// <summary>
        /// The asset endpoint profile associated with this connector. This will be null until the asset endpoint profile is first discovered.
        /// </summary>
        public AssetEndpointProfile? AssetEndpointProfile { get; set; }

        // Null when no leader election configuration provider was supplied; in that case no leader election is performed.
        private readonly ConnectorLeaderElectionConfiguration? _leaderElectionConfiguration;

        /// <summary>
        /// Creates the worker from its collaborators.
        /// </summary>
        /// <param name="applicationContext">The application context passed to the leader election and schema registry clients.</param>
        /// <param name="logger">The logger used by this worker.</param>
        /// <param name="mqttClient">The MQTT client used to connect to the broker and publish telemetry.</param>
        /// <param name="messageSchemaProviderFactory">Provides the (optional) message schema for each dataset and event.</param>
        /// <param name="assetMonitor">Reports asset endpoint profile and asset changes.</param>
        /// <param name="leaderElectionConfigurationProvider">Optional provider of leader election settings. When null, this worker does not perform leader election.</param>
        public TelemetryConnectorWorker(
            ApplicationContext applicationContext,
            ILogger<TelemetryConnectorWorker> logger,
            IMqttClient mqttClient,
            IMessageSchemaProvider messageSchemaProviderFactory,
            IAssetMonitor assetMonitor,
            IConnectorLeaderElectionConfigurationProvider? leaderElectionConfigurationProvider = null)
        {
            _applicationContext = applicationContext;
            _logger = logger;
            _mqttClient = mqttClient;
            _messageSchemaProviderFactory = messageSchemaProviderFactory;
            _assetMonitor = assetMonitor;
            _leaderElectionConfiguration = leaderElectionConfigurationProvider?.GetLeaderElectionConfiguration();
        }

        ///<inheritdoc/>
        public override Task RunConnectorAsync(CancellationToken cancellationToken = default)
        {
            ObjectDisposedException.ThrowIf(_isDisposed, this);

            // This method is public to allow users to access the BackgroundService interface's ExecuteAsync method.
            return ExecuteAsync(cancellationToken);
        }

        /// <summary>
        /// Connects to the MQTT broker and then repeatedly runs monitoring cycles until <paramref name="cancellationToken"/> is cancelled.
        /// </summary>
        /// <remarks>
        /// One cycle: wait for the asset endpoint profile, campaign for leadership (if configured), observe assets, and wait
        /// until cancellation, loss of leadership, or an update/deletion of the asset endpoint profile. Every cycle ends by
        /// unobserving the monitor and reporting all known assets as unavailable, including when the cycle failed.
        /// </remarks>
        /// <param name="cancellationToken">Signals that the worker should stop.</param>
        protected override async Task ExecuteAsync(CancellationToken cancellationToken)
        {
            string candidateName = Guid.NewGuid().ToString();
            bool isLeader = false;

            // Create MQTT client from credentials provided by the operator
            MqttConnectionSettings mqttConnectionSettings = MqttConnectionSettings.FromFileMount();
            _logger.LogInformation("Connecting to MQTT broker with hostname {hostname} and port {port}", mqttConnectionSettings.HostName, mqttConnectionSettings.TcpPort);

            await _mqttClient.ConnectAsync(mqttConnectionSettings, cancellationToken);

            _logger.LogInformation("Successfully connected to MQTT broker");

            bool doingLeaderElection = false;

            // Per-cycle signals. They are declared outside of the loop (and replaced at the start of every cycle) so that
            // the monitor handlers below can be subscribed exactly once instead of once per cycle. Continuations run
            // asynchronously so that this worker does not resume on the thread that raised the notification.
            TaskCompletionSource aepDeletedOrUpdatedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
            TaskCompletionSource<AssetEndpointProfile> aepCreatedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);

            _assetMonitor.AssetEndpointProfileChanged += (sender, args) =>
            {
                // Each connector should have one AEP deployed to the pod. It shouldn't ever be deleted, but it may be updated.
                if (args.ChangeType == ChangeType.Created)
                {
                    if (args.AssetEndpointProfile == null)
                    {
                        // shouldn't ever happen
                        _logger.LogError("Received notification that asset endpoint profile was created, but no asset endpoint profile was provided");
                    }
                    else
                    {
                        aepCreatedTcs.TrySetResult(args.AssetEndpointProfile);
                    }
                }
                else
                {
                    aepDeletedOrUpdatedTcs.TrySetResult();
                }
            };

            _assetMonitor.AssetChanged += (sender, args) =>
            {
                _logger.LogInformation($"Received a notification an asset with name {args.AssetName} has been {args.ChangeType.ToString().ToLowerInvariant()}.");

                if (args.ChangeType == ChangeType.Deleted)
                {
                    AssetUnavailable(args.AssetName, false);
                    return;
                }

                if (args.Asset == null)
                {
                    // shouldn't ever happen
                    _logger.LogError($"Received notification that asset with name {args.AssetName} was created or updated, but no asset was provided");
                    return;
                }

                if (args.ChangeType != ChangeType.Created)
                {
                    // asset changes don't all necessitate re-creating the relevant dataset samplers, but there is no way to know
                    // at this level what changes are dataset-specific nor which of those changes require a new sampler. Because
                    // of that, this sample just assumes all asset changes require the factory requesting a new sampler.
                    AssetUnavailable(args.AssetName, true);
                }

                // Deliberately not awaited (this handler is synchronous); failures are logged by the wrapper instead of being lost.
                _ = AssetAvailableAndLogFailuresAsync(AssetEndpointProfile!, args.Asset, args.AssetName, cancellationToken);
            };

            try
            {
                while (!cancellationToken.IsCancellationRequested)
                {
                    // Kept alive for the whole cycle so that it is only disposed once this pod stops acting on its leadership.
                    LeaderElectionClient? leaderElectionClient = null;

                    try
                    {
                        aepDeletedOrUpdatedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
                        aepCreatedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);

                        _assetMonitor.ObserveAssetEndpointProfile(null, cancellationToken);

                        _logger.LogInformation("Waiting for asset endpoint profile to be discovered");
                        AssetEndpointProfile = await aepCreatedTcs.Task.WaitAsync(cancellationToken);

                        _logger.LogInformation("Successfully discovered the asset endpoint profile");

                        if (_leaderElectionConfiguration != null)
                        {
                            doingLeaderElection = true;
                            string leadershipPositionId = _leaderElectionConfiguration.LeadershipPositionId;

                            _logger.LogInformation($"Leadership position Id {leadershipPositionId} was configured, so this pod will perform leader election");

                            leaderElectionClient = new LeaderElectionClient(_applicationContext, _mqttClient, leadershipPositionId, candidateName);

                            leaderElectionClient.AutomaticRenewalOptions = new LeaderElectionAutomaticRenewalOptions()
                            {
                                AutomaticRenewal = true,
                                ElectionTerm = _leaderElectionConfiguration.LeadershipPositionTermLength,
                                RenewalPeriod = _leaderElectionConfiguration.LeadershipPositionRenewalRate
                            };

                            leaderElectionClient.LeadershipChangeEventReceivedAsync += (sender, args) =>
                            {
                                isLeader = args.NewLeader != null && args.NewLeader.GetString().Equals(candidateName);
                                if (isLeader)
                                {
                                    _logger.LogInformation("Received notification that this pod is the leader");
                                }

                                return Task.CompletedTask;
                            };

                            _logger.LogInformation("This pod is waiting to be elected leader.");
                            await leaderElectionClient.CampaignAsync(_leaderElectionConfiguration.LeadershipPositionTermLength);

                            // NOTE(review): this assumes CampaignAsync only returns once this candidate holds the position, as
                            // the log line below already states. Recording it here means the wait loop below does not depend
                            // on a leadership change notification having been delivered for our own election.
                            isLeader = true;

                            _logger.LogInformation("This pod was elected leader.");
                        }

                        _logger.LogInformation("Now monitoring for asset creation/deletion/updates");
                        _assetMonitor.ObserveAssets(null, cancellationToken);

                        // Wait until the worker is cancelled or it is no longer the leader
                        while (!cancellationToken.IsCancellationRequested && (isLeader || !doingLeaderElection) && !aepDeletedOrUpdatedTcs.Task.IsCompleted)
                        {
                            try
                            {
                                if (doingLeaderElection)
                                {
                                    // Wake up once per term so that a change of isLeader is noticed. The delay observes the
                                    // token so its timer does not outlive a cancelled worker.
                                    await Task.WhenAny(
                                        aepDeletedOrUpdatedTcs.Task,
                                        Task.Delay(_leaderElectionConfiguration!.LeadershipPositionTermLength, cancellationToken)).WaitAsync(cancellationToken);
                                }
                                else
                                {
                                    await aepDeletedOrUpdatedTcs.Task.WaitAsync(cancellationToken);
                                }
                            }
                            catch (OperationCanceledException)
                            {
                                // expected outcome, allow the while loop to check status again
                            }
                        }

                        if (cancellationToken.IsCancellationRequested)
                        {
                            _logger.LogInformation("This pod is shutting down. It will now stop monitoring and sampling assets.");
                        }
                        else if (aepDeletedOrUpdatedTcs.Task.IsCompleted)
                        {
                            _logger.LogInformation("Received a notification that the asset endpoint profile has changed. This pod will now cancel current asset sampling and restart monitoring assets.");
                        }
                        else if (doingLeaderElection)
                        {
                            _logger.LogInformation("This pod is no longer the leader. It will now stop monitoring and sampling assets.");
                        }
                        else
                        {
                            // Shouldn't happen. The pod should either be cancelled, the AEP should have changed, or this pod should have lost its position as leader
                            _logger.LogInformation("This pod will now cancel current asset sampling and restart monitoring assets.");
                        }
                    }
                    catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
                    {
                        // Cancellation while waiting for the asset endpoint profile is a normal shutdown, not an error.
                        _logger.LogInformation("This pod is shutting down. It will now stop monitoring and sampling assets.");
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError($"Encountered an error: {ex}");
                    }
                    finally
                    {
                        // Runs for failed cycles as well, so that no cycle leaves observers, assets or the election client behind.
                        await EndMonitoringCycleAsync(leaderElectionClient);
                    }
                }
            }
            finally
            {
                _logger.LogInformation("Shutting down the connector");
            }
        }

        /// <summary>
        /// Ends one monitoring cycle: stops observing the asset monitor, reports every known asset as unavailable, and disposes the cycle's leader election client.
        /// </summary>
        /// <remarks>
        /// Failures are logged rather than thrown because this is called from a finally block and must not end the worker loop.
        /// </remarks>
        /// <param name="leaderElectionClient">The leader election client created for this cycle, or null if none was created.</param>
        private async Task EndMonitoringCycleAsync(LeaderElectionClient? leaderElectionClient)
        {
            try
            {
                _assetMonitor.UnobserveAssets();
                _assetMonitor.UnobserveAssetEndpointProfile();

                foreach (string assetName in _assets.Keys)
                {
                    AssetUnavailable(assetName, false);
                }
            }
            catch (Exception ex)
            {
                _logger.LogError($"Encountered an error: {ex}");
            }

            if (leaderElectionClient != null)
            {
                try
                {
                    await leaderElectionClient.DisposeAsync();
                }
                catch (Exception ex)
                {
                    _logger.LogError($"Encountered an error: {ex}");
                }
            }
        }

        /// <summary>
        /// Forgets the asset and, unless it is about to be re-added, notifies <see cref="OnAssetUnavailable"/> subscribers.
        /// </summary>
        /// <param name="assetName">The name of the asset that is no longer available.</param>
        /// <param name="isRestarting">True when the asset was updated and will immediately be made available again.</param>
        private void AssetUnavailable(string assetName, bool isRestarting)
        {
            _assets.TryRemove(assetName, out _);

            // This method may be called either when an asset was updated or when it was deleted. If it was updated, then it will still be sampleable.
            if (!isRestarting)
            {
                OnAssetUnavailable?.Invoke(this, new(assetName));
            }
        }

        /// <summary>
        /// Runs <see cref="AssetAvailableAsync"/> and logs any failure, so that it can be started without being awaited.
        /// </summary>
        private async Task AssetAvailableAndLogFailuresAsync(AssetEndpointProfile assetEndpointProfile, Asset asset, string assetName, CancellationToken cancellationToken)
        {
            try
            {
                await AssetAvailableAsync(assetEndpointProfile, asset, assetName, cancellationToken);
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // The worker is shutting down; nothing to report.
            }
            catch (Exception ex)
            {
                _logger.LogError($"Failed to make asset with name {assetName} available: {ex}");
            }
        }

        /// <summary>
        /// Records the asset, registers the message schema (if any) of each of its datasets and events, and then notifies <see cref="OnAssetAvailable"/> subscribers.
        /// </summary>
        /// <param name="assetEndpointProfile">The asset endpoint profile that the asset belongs to.</param>
        /// <param name="asset">The asset that became available.</param>
        /// <param name="assetName">The name of the asset.</param>
        /// <param name="cancellationToken">Cancels the schema registration requests.</param>
        private async Task AssetAvailableAsync(AssetEndpointProfile assetEndpointProfile, Asset asset, string assetName, CancellationToken cancellationToken = default)
        {
            _assets.TryAdd(assetName, asset);

            if (asset.DatasetsDictionary == null)
            {
                _logger.LogInformation($"Asset with name {assetName} has no datasets to sample");
            }
            else
            {
                foreach (string datasetName in asset.DatasetsDictionary!.Keys)
                {
                    Dataset dataset = asset.DatasetsDictionary![datasetName];

                    // This may register a message schema that has already been uploaded, but the schema registry service is idempotent
                    var datasetMessageSchema = await _messageSchemaProviderFactory.GetMessageSchemaAsync(assetEndpointProfile, asset, datasetName, dataset);
                    if (datasetMessageSchema != null)
                    {
                        _logger.LogInformation($"Registering message schema for dataset with name {datasetName} on asset with name {assetName}");
                        await using SchemaRegistryClient schemaRegistryClient = new(_applicationContext, _mqttClient);
                        await schemaRegistryClient.PutAsync(
                            datasetMessageSchema.SchemaContent,
                            datasetMessageSchema.SchemaFormat,
                            datasetMessageSchema.SchemaType,
                            datasetMessageSchema.Version ?? "1.0.0",
                            datasetMessageSchema.Tags,
                            null,
                            cancellationToken);
                    }
                    else
                    {
                        _logger.LogInformation($"No message schema will be registered for dataset with name {datasetName} on asset with name {assetName}");
                    }
                }
            }

            if (asset.EventsDictionary == null)
            {
                _logger.LogInformation($"Asset with name {assetName} has no events to listen for");
            }
            else
            {
                foreach (string eventName in asset.EventsDictionary!.Keys)
                {
                    Event assetEvent = asset.EventsDictionary[eventName];

                    // This may register a message schema that has already been uploaded, but the schema registry service is idempotent
                    var eventMessageSchema = await _messageSchemaProviderFactory.GetMessageSchemaAsync(assetEndpointProfile, asset, eventName, assetEvent);
                    if (eventMessageSchema != null)
                    {
                        _logger.LogInformation($"Registering message schema for event with name {eventName} on asset with name {assetName}");
                        await using SchemaRegistryClient schemaRegistryClient = new(_applicationContext, _mqttClient);
                        await schemaRegistryClient.PutAsync(
                            eventMessageSchema.SchemaContent,
                            eventMessageSchema.SchemaFormat,
                            eventMessageSchema.SchemaType,
                            eventMessageSchema.Version ?? "1.0.0",
                            eventMessageSchema.Tags,
                            null,
                            cancellationToken);
                    }
                    else
                    {
                        _logger.LogInformation($"No message schema will be registered for event with name {eventName} on asset with name {assetName}");
                    }
                }
            }

            OnAssetAvailable?.Invoke(this, new(assetName, asset, assetEndpointProfile));
        }

        /// <summary>
        /// Publishes a payload sampled from a dataset to the dataset's MQTT topic, falling back to the asset's default topic.
        /// </summary>
        /// <param name="asset">The asset that the dataset belongs to.</param>
        /// <param name="dataset">The dataset that was sampled.</param>
        /// <param name="serializedPayload">The payload to publish. It is also written to the log decoded as UTF-8.</param>
        /// <param name="cancellationToken">Cancels the publish.</param>
        /// <exception cref="ObjectDisposedException">If this worker has been disposed.</exception>
        /// <exception cref="AssetConfigurationException">If neither the dataset nor the asset has a topic configured.</exception>
        public async Task ForwardSampledDatasetAsync(Asset asset, Dataset dataset, byte[] serializedPayload, CancellationToken cancellationToken = default)
        {
            ObjectDisposedException.ThrowIf(_isDisposed, this);

            _logger.LogInformation($"Received sampled payload from dataset with name {dataset.Name} in asset with name {asset.DisplayName}. Now publishing it to MQTT broker: {Encoding.UTF8.GetString(serializedPayload)}");

            Topic topic = dataset.Topic
                ?? asset.DefaultTopic
                ?? throw new AssetConfigurationException($"Dataset with name {dataset.Name} in asset with name {asset.DisplayName} has no configured MQTT topic to publish to. Data won't be forwarded for this dataset.");

            await PublishToBrokerAsync(topic, serializedPayload, cancellationToken);
        }

        /// <summary>
        /// Publishes a received event payload to the event's MQTT topic, falling back to the asset's default topic.
        /// </summary>
        /// <param name="asset">The asset that the event belongs to.</param>
        /// <param name="assetEvent">The event that was received.</param>
        /// <param name="serializedPayload">The payload to publish.</param>
        /// <param name="cancellationToken">Cancels the publish.</param>
        /// <exception cref="ObjectDisposedException">If this worker has been disposed.</exception>
        /// <exception cref="AssetConfigurationException">If neither the event nor the asset has a topic configured.</exception>
        public async Task ForwardReceivedEventAsync(Asset asset, Event assetEvent, byte[] serializedPayload, CancellationToken cancellationToken = default)
        {
            ObjectDisposedException.ThrowIf(_isDisposed, this);

            _logger.LogInformation($"Received event with name {assetEvent.Name} in asset with name {asset.DisplayName}. Now publishing it to MQTT broker.");

            Topic topic = assetEvent.Topic
                ?? asset.DefaultTopic
                ?? throw new AssetConfigurationException($"Event with name {assetEvent.Name} in asset with name {asset.DisplayName} has no configured MQTT topic to publish to. Data won't be forwarded for this event.");

            await PublishToBrokerAsync(topic, serializedPayload, cancellationToken);
        }

        /// <summary>
        /// Publishes the payload to the topic's path, honoring the topic's retain setting, and logs the PUBACK outcome.
        /// </summary>
        /// <param name="topic">The resolved topic to publish to.</param>
        /// <param name="serializedPayload">The payload to publish.</param>
        /// <param name="cancellationToken">Cancels the publish.</param>
        private async Task PublishToBrokerAsync(Topic topic, byte[] serializedPayload, CancellationToken cancellationToken)
        {
            var mqttMessage = new MqttApplicationMessage(topic.Path)
            {
                PayloadSegment = serializedPayload,
                Retain = topic.Retain == RetainHandling.Keep,
            };

            MqttClientPublishResult puback = await _mqttClient.PublishAsync(mqttMessage, cancellationToken);

            if (puback.ReasonCode == MqttClientPublishReasonCode.Success
                || puback.ReasonCode == MqttClientPublishReasonCode.NoMatchingSubscribers)
            {
                // NoMatchingSubscribers case is still successful in the sense that the PUBLISH packet was delivered to the broker successfully.
                // It does suggest that the broker has no one to send that PUBLISH packet to, though.
                _logger.LogInformation($"Message was accepted by the MQTT broker with PUBACK reason code: {puback.ReasonCode} and reason {puback.ReasonString}");
            }
            else
            {
                _logger.LogInformation($"Received unsuccessful PUBACK from MQTT broker: {puback.ReasonCode} with reason {puback.ReasonString}");
            }
        }

        /// <summary>
        /// Disposes the base service and marks this worker as disposed so that later calls to its public methods throw <see cref="ObjectDisposedException"/>.
        /// </summary>
        public override void Dispose()
        {
            base.Dispose();
            _isDisposed = true;
        }
    }
}