in src/Azure.IIoT.OpcUa.Publisher.Service/src/Services/ApplicationRegistry.cs [548:803]
public async Task ProcessDiscoveryEventsAsync(string siteId, string discovererId,
    DiscoveryResultModel result, IReadOnlyList<DiscoveryEventModel> events)
{
    //
    // Reconciles the applications reported by a discovery run with the application
    // twins already registered for the given site or discoverer:
    //  - registered applications that were not reported are disabled (skipped
    //    when the result is flagged as register only),
    //  - reported applications that are not yet registered are added together
    //    with their endpoints,
    //  - applications that are both registered and reported are patched.
    // Failures of individual applications are logged and counted as unchanged so
    // that one bad entry does not abort the rest of the run.
    //
    // Throws ArgumentNullException when siteId or discovererId is null or empty,
    // or when result or events is null.
    //
    if (string.IsNullOrEmpty(siteId))
    {
        throw new ArgumentNullException(nameof(siteId));
    }
    if (string.IsNullOrEmpty(discovererId))
    {
        throw new ArgumentNullException(nameof(discovererId));
    }
    ArgumentNullException.ThrowIfNull(result);
    // Fail fast before querying the hub instead of hitting a null reference later.
    ArgumentNullException.ThrowIfNull(events);

    var context = result.Context.Validate(_timeProvider);
    var registerOnly = result.RegisterOnly ?? false;

    //
    // Get all applications registered for this discoverer or for the site the
    // applications were found in. The OR covers the case where the site of a
    // discoverer was changed after a discovery run (same discoverer that
    // registered, but now a different site is reported).
    //
    // NOTE(review): siteId and discovererId are interpolated into the query text
    // without escaping, so a value containing a quote changes the query. Confirm
    // that callers only pass trusted identifiers, or escape them here.
    //
    var sql = "SELECT * FROM devices WHERE " +
        $"tags.{nameof(EntityRegistration.DeviceType)} = '{Constants.EntityTypeApplication}' AND " +
        $"(tags.{nameof(ApplicationRegistration.SiteId)} = '{siteId}' OR" +
        $" tags.{nameof(ApplicationRegistration.DiscovererId)} = '{discovererId}')";
    var twins = await _iothub.QueryAllDeviceTwinsAsync(sql).ConfigureAwait(false);

    // Twins that cannot be converted to an application registration are dropped.
    var existing = twins
        .Select(t => t.ToApplicationRegistration()?.ToServiceModel()!)
        .Where(s => s != null)
        .ToList();

    var found = events.Where(ev => ev.Application != null).Select(ev =>
    {
        //
        // Ensure we set the site id and discoverer id in the found applications
        // to a consistent value. This works around the case where the reported
        // events do not contain what we were asked to process with.
        //
        ev.Application!.SiteId = siteId;
        ev.Application.DiscovererId = discovererId;
        return ev.Application;
    }).ToList();

    // Create endpoints lookup table per found application id
    var endpoints = events.Where(ev => ev.Application != null)
        .GroupBy(k => k.Application!.ApplicationId).ToDictionary(
            group => group.Key,
            group => group
                .Where(ev => ev.Registration != null)
                .Select(ev =>
                {
                    //
                    // Ensure the site id and discoverer id in the found endpoints
                    // are also set to a consistent value, same as for the
                    // applications above.
                    //
                    ev.Registration!.SiteId = siteId;
                    ev.Registration.DiscovererId = discovererId;
                    return new EndpointInfoModel
                    {
                        ApplicationId = group.Key,
                        Registration = ev.Registration
                    };
                })
                .ToList());

    //
    // Merge found with existing applications. For disabled applications this will
    // take ownership regardless of discoverer, unfound applications are only
    // disabled and existing ones are patched only if they were previously reported
    // by the same discoverer. New ones are simply added.
    //
    //   remove   = existing but not found
    //   add      = found but not existing
    //   unchange = existing and also found (holds the existing instances)
    //
    var remove = new HashSet<ApplicationInfoModel>(existing,
        ApplicationInfoModelEx.LogicalEquality);
    var add = new HashSet<ApplicationInfoModel>(found,
        ApplicationInfoModelEx.LogicalEquality);
    var unchange = new HashSet<ApplicationInfoModel>(existing,
        ApplicationInfoModelEx.LogicalEquality);
    // Must run before add is reduced to the brand new applications below.
    unchange.IntersectWith(add);
    remove.ExceptWith(found);
    add.ExceptWith(existing);

    var added = 0;
    var updated = 0;
    var unchanged = 0;
    var removed = 0;

    if (!registerOnly)
    {
        // Disable applications that were not reported anymore
        foreach (var removal in remove)
        {
            try
            {
                // Only touch applications the discoverer owns.
                if (removal.DiscovererId != discovererId)
                {
                    // Skip the ones owned by other discoverers
                    unchanged++;
                    continue;
                }

                var wasUpdated = false;
                // Disable if not already disabled
                var app = await UpdateApplicationAsync(removal.ApplicationId,
                    (application, disabled) =>
                    {
                        if (!(disabled ?? false))
                        {
                            // Disable application
                            application.NotSeenSince = _timeProvider.GetUtcNow();
                            application.Updated = context;
                            removed++;
                            wasUpdated = true;
                            return (true, true);
                        }
                        // Already disabled, nothing to patch
                        unchanged++;
                        return (null, null);
                    }, default).ConfigureAwait(false)
                    ?? throw new InvalidOperationException("Failed to update application.");

                if (wasUpdated && _applicationEvents != null)
                {
                    await _applicationEvents.OnApplicationUpdatedAsync(context,
                        app).ConfigureAwait(false);
                }
                // Invoked whether or not the application was just disabled.
                await HandleApplicationDisabledAsync(context, app).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                unchanged++;
                _logger.LogError(ex, "Exception during application disabling.");
            }
        }
    }

    // ... add brand new applications
    foreach (var addition in add)
    {
        try
        {
            // Work on a copy so the reported model itself is not modified here.
            var application = addition.Clone(_timeProvider);
            application.SiteId = siteId;
            application.ApplicationId = ApplicationInfoModelEx.CreateApplicationId(application);
            application.Created = context;
            application.NotSeenSince = null;
            application.DiscovererId = discovererId;

            var app = await AddOrUpdateApplicationAsync(application, false,
                false, default).ConfigureAwait(false)
                ?? throw new InvalidOperationException("Failed to add or update application.");

            // Notify addition!
            if (_applicationEvents != null)
            {
                await _applicationEvents.OnApplicationNewAsync(context,
                    app).ConfigureAwait(false);
            }
            await HandleApplicationEnabledAsync(context, app).ConfigureAwait(false);

            // Now - add all new endpoints
            if (endpoints.TryGetValue(app.ApplicationId, out var epFound))
            {
                await AddEndpointsAsync(epFound, result.Context, registerOnly,
                    discovererId, null, false).ConfigureAwait(false);
            }
            added++;
        }
        catch (ResourceConflictException)
        {
            // Already registered - hand over to the update pass below instead.
            unchange.Add(addition);
        }
        catch (Exception ex)
        {
            unchanged++;
            _logger.LogError(ex, "Exception adding application from discovery.");
        }
    }

    // Update applications and endpoints ...
    foreach (var update in unchange)
    {
        try
        {
            var wasDisabled = false;
            var wasUpdated = false;
            // Patch the registration unless another discoverer actively owns it
            var app = await UpdateApplicationAsync(update.ApplicationId,
                (application, disabled) =>
                {
                    //
                    // Check whether another discoverer owns this application
                    // (discoverer ids are not the same) and it is not disabled
                    // before updating it.
                    //
                    if (update.DiscovererId != discovererId && !(disabled ?? false))
                    {
                        // TODO: Decide whether we merge newly found endpoints...
                        unchanged++;
                        return (null, null);
                    }
                    // Remember whether this patch re-enables the application.
                    wasDisabled = (disabled ?? false) && application.NotSeenSince != null;
                    wasUpdated = true;
                    application.Update(update);
                    application.DiscovererId = discovererId;
                    application.SiteId = siteId;
                    application.NotSeenSince = null;
                    application.Updated = context;
                    updated++;
                    return (true, false);
                }, default).ConfigureAwait(false)
                ?? throw new InvalidOperationException("Failed to update application.");

            if (wasDisabled)
            {
                await HandleApplicationEnabledAsync(context, app).ConfigureAwait(false);
            }

            // If this is our discoverer's application we update all endpoints also.
            if (wasUpdated && endpoints.TryGetValue(app.ApplicationId, out var epFound))
            {
                // TODO: Handle case where we take ownership of all endpoints
                await AddEndpointsAsync(epFound, result.Context,
                    registerOnly, discovererId,
                    app.ApplicationId, false).ConfigureAwait(false);
                // The update event is only raised when endpoints were found as well.
                if (_applicationEvents != null)
                {
                    await _applicationEvents.OnApplicationUpdatedAsync(context,
                        app).ConfigureAwait(false);
                }
            }
        }
        catch (Exception ex)
        {
            unchanged++;
            _logger.LogError(ex, "Exception during update.");
        }
    }

    _logger.LogInformation("... processed discovery results from {DiscovererId}: " +
        "{Added} applications added, {Updated} updated, {Removed} disabled, and " +
        "{Unchanged} unchanged.", discovererId, added, updated, removed, unchanged);

    // NOTE(review): removed is logged above but not recorded in a metric here -
    // confirm whether that is intended.
    kAppsAdded.Add(added);
    kAppsUpdated.Add(updated);
    kAppsUnchanged.Add(unchanged);
}