in dotnet/src/Azure.Iot.Operations.Connector/TelemetryConnectorWorker.cs [253:322]
private async Task AssetAvailableAsync(AssetEndpointProfile assetEndpointProfile, Asset asset, string assetName, CancellationToken cancellationToken = default)
{
_assets.TryAdd(assetName, asset);
if (asset.DatasetsDictionary == null)
{
_logger.LogInformation($"Asset with name {assetName} has no datasets to sample");
}
else
{
foreach (string datasetName in asset.DatasetsDictionary!.Keys)
{
Dataset dataset = asset.DatasetsDictionary![datasetName];
// This may register a message schema that has already been uploaded, but the schema registry service is idempotent
var datasetMessageSchema = await _messageSchemaProviderFactory.GetMessageSchemaAsync(assetEndpointProfile, asset, datasetName, dataset);
if (datasetMessageSchema != null)
{
_logger.LogInformation($"Registering message schema for dataset with name {datasetName} on asset with name {assetName}");
await using SchemaRegistryClient schemaRegistryClient = new(_applicationContext, _mqttClient);
await schemaRegistryClient.PutAsync(
datasetMessageSchema.SchemaContent,
datasetMessageSchema.SchemaFormat,
datasetMessageSchema.SchemaType,
datasetMessageSchema.Version ?? "1.0.0",
datasetMessageSchema.Tags,
null,
cancellationToken);
}
else
{
_logger.LogInformation($"No message schema will be registered for dataset with name {datasetName} on asset with name {assetName}");
}
}
}
if (asset.EventsDictionary == null)
{
_logger.LogInformation($"Asset with name {assetName} has no events to listen for");
}
else
{
foreach (string eventName in asset.EventsDictionary!.Keys)
{
Event assetEvent = asset.EventsDictionary[eventName];
// This may register a message schema that has already been uploaded, but the schema registry service is idempotent
var eventMessageSchema = await _messageSchemaProviderFactory.GetMessageSchemaAsync(assetEndpointProfile, asset, eventName, assetEvent);
if (eventMessageSchema != null)
{
_logger.LogInformation($"Registering message schema for event with name {eventName} on asset with name {assetName}");
await using SchemaRegistryClient schemaRegistryClient = new(_applicationContext, _mqttClient);
await schemaRegistryClient.PutAsync(
eventMessageSchema.SchemaContent,
eventMessageSchema.SchemaFormat,
eventMessageSchema.SchemaType,
eventMessageSchema.Version ?? "1.0.0",
eventMessageSchema.Tags,
null,
cancellationToken);
}
else
{
_logger.LogInformation($"No message schema will be registered for event with name {eventName} on asset with name {assetName}");
}
}
}
OnAssetAvailable?.Invoke(this, new(assetName, asset, assetEndpointProfile));
}