private async Task AssetAvailableAsync()

in dotnet/src/Azure.Iot.Operations.Connector/TelemetryConnectorWorker.cs [253:322]


        private async Task AssetAvailableAsync(AssetEndpointProfile assetEndpointProfile, Asset asset, string assetName, CancellationToken cancellationToken = default)
        {
            _assets.TryAdd(assetName, asset);

            if (asset.DatasetsDictionary == null)
            {
                _logger.LogInformation($"Asset with name {assetName} has no datasets to sample");
            }
            else
            {
                foreach (string datasetName in asset.DatasetsDictionary!.Keys)
                {
                    Dataset dataset = asset.DatasetsDictionary![datasetName];

                    // This may register a message schema that has already been uploaded, but the schema registry service is idempotent
                    var datasetMessageSchema = await _messageSchemaProviderFactory.GetMessageSchemaAsync(assetEndpointProfile, asset, datasetName, dataset);
                    if (datasetMessageSchema != null)
                    {
                        _logger.LogInformation($"Registering message schema for dataset with name {datasetName} on asset with name {assetName}");
                        await using SchemaRegistryClient schemaRegistryClient = new(_applicationContext, _mqttClient);
                        await schemaRegistryClient.PutAsync(
                            datasetMessageSchema.SchemaContent,
                            datasetMessageSchema.SchemaFormat,
                            datasetMessageSchema.SchemaType,
                            datasetMessageSchema.Version ?? "1.0.0",
                            datasetMessageSchema.Tags,
                            null,
                            cancellationToken);
                    }
                    else
                    {
                        _logger.LogInformation($"No message schema will be registered for dataset with name {datasetName} on asset with name {assetName}");
                    }
                }
            }

            if (asset.EventsDictionary == null)
            {
                _logger.LogInformation($"Asset with name {assetName} has no events to listen for");
            }
            else
            {
                foreach (string eventName in asset.EventsDictionary!.Keys)
                {
                    Event assetEvent = asset.EventsDictionary[eventName];

                    // This may register a message schema that has already been uploaded, but the schema registry service is idempotent
                    var eventMessageSchema = await _messageSchemaProviderFactory.GetMessageSchemaAsync(assetEndpointProfile, asset, eventName, assetEvent);
                    if (eventMessageSchema != null)
                    {
                        _logger.LogInformation($"Registering message schema for event with name {eventName} on asset with name {assetName}");
                        await using SchemaRegistryClient schemaRegistryClient = new(_applicationContext, _mqttClient);
                        await schemaRegistryClient.PutAsync(
                            eventMessageSchema.SchemaContent,
                            eventMessageSchema.SchemaFormat,
                            eventMessageSchema.SchemaType,
                            eventMessageSchema.Version ?? "1.0.0",
                            eventMessageSchema.Tags,
                            null,
                            cancellationToken);
                    }
                    else
                    {
                        _logger.LogInformation($"No message schema will be registered for event with name {eventName} on asset with name {assetName}");
                    }
                }
            }

            OnAssetAvailable?.Invoke(this, new(assetName, asset, assetEndpointProfile));
        }