public async Task ProcessDiscoveryEventsAsync()

in src/Azure.IIoT.OpcUa.Publisher.Service/src/Services/ApplicationRegistry.cs [548:803]


        public async Task ProcessDiscoveryEventsAsync(string siteId, string discovererId,
            DiscoveryResultModel result, IReadOnlyList<DiscoveryEventModel> events)
        {
            if (string.IsNullOrEmpty(siteId))
            {
                throw new ArgumentNullException(nameof(siteId));
            }
            if (string.IsNullOrEmpty(discovererId))
            {
                throw new ArgumentNullException(nameof(discovererId));
            }
            ArgumentNullException.ThrowIfNull(result);
            var context = result.Context.Validate(_timeProvider);

            //
            // Get all applications for this discoverer or the site the application
            // was found in.  There should only be one site in the found application set
            // or none, otherwise, throw.  The OR covers where site of a discoverer was
            // changed after a discovery run (same discoverer that registered, but now
            // different site reported).
            //
            var sql = "SELECT * FROM devices WHERE " +
                $"tags.{nameof(EntityRegistration.DeviceType)} = '{Constants.EntityTypeApplication}' AND " +
                $"(tags.{nameof(ApplicationRegistration.SiteId)} = '{siteId}' OR" +
                $" tags.{nameof(ApplicationRegistration.DiscovererId)} = '{discovererId}')";

            var twins = await _iothub.QueryAllDeviceTwinsAsync(sql).ConfigureAwait(false);
            var existing = twins
                .Select(t => t.ToApplicationRegistration()?.ToServiceModel()!)
                .Where(s => s != null)
                .ToList();

            var found = events.Where(ev => ev.Application != null).Select(ev =>
            {
                //
                // Ensure we set the site id and discoverer id in the found applications
                // to a consistent value.  This works around where the reported events
                // do not contain what we were asked to process with.
                //
                ev.Application!.SiteId = siteId;
                ev.Application.DiscovererId = discovererId;
                return ev.Application;
            }).ToList();

            // Create endpoints lookup table per found application id
            var endpoints = events.Where(ev => ev.Application != null)
                .GroupBy(k => k.Application!.ApplicationId).ToDictionary(
                group => group.Key,
                group => group
                    .Where(ev => ev.Registration != null)
                    .Select(ev =>
                    {
                        //
                        // Ensure the site id and discoverer id in the found endpoints
                        // also set to a consistent value, same as applications earlier.
                        //
                        ev.Registration!.SiteId = siteId;
                        ev.Registration.DiscovererId = discovererId;
                        return new EndpointInfoModel
                        {
                            ApplicationId = group.Key,
                            Registration = ev.Registration
                        };
                    })
                    .ToList());

            //
            // Merge found with existing applications. For disabled applications this will
            // take ownership regardless of discoverer, unfound applications are only disabled
            // and existing ones are patched only if they were previously reported by the same
            // discoverer.  New ones are simply added.
            //
            var remove = new HashSet<ApplicationInfoModel>(existing,
                ApplicationInfoModelEx.LogicalEquality);
            var add = new HashSet<ApplicationInfoModel>(found,
                ApplicationInfoModelEx.LogicalEquality);
            var unchange = new HashSet<ApplicationInfoModel>(existing,
                ApplicationInfoModelEx.LogicalEquality);
            var change = new HashSet<ApplicationInfoModel>(found,
                ApplicationInfoModelEx.LogicalEquality);

            unchange.IntersectWith(add);
            change.IntersectWith(remove);
            remove.ExceptWith(found);
            add.ExceptWith(existing);

            var added = 0;
            var updated = 0;
            var unchanged = 0;
            var removed = 0;

            if (!(result.RegisterOnly ?? false))
            {
                // Remove applications
                foreach (var removal in remove)
                {
                    try
                    {
                        // Only touch applications the discoverer owns.
                        if (removal.DiscovererId == discovererId)
                        {
                            var wasUpdated = false;

                            // Disable if not already disabled
                            var app = await UpdateApplicationAsync(removal.ApplicationId,
                                (application, disabled) =>
                                {
                                    // Disable application
                                    if (!(disabled ?? false))
                                    {
                                        application.NotSeenSince = _timeProvider.GetUtcNow();
                                        application.Updated = context;
                                        removed++;
                                        wasUpdated = true;
                                        return (true, true);
                                    }
                                    unchanged++;
                                    return (null, null);
                                }, default).ConfigureAwait(false)
                                    ?? throw new InvalidOperationException("Failed to update application.");
                            if (wasUpdated && _applicationEvents != null)
                            {
                                await _applicationEvents.OnApplicationUpdatedAsync(context,
                                    app).ConfigureAwait(false);
                            }

                            await HandleApplicationDisabledAsync(context, app).ConfigureAwait(false);
                        }
                        else
                        {
                            // Skip the ones owned by other discoverers
                            unchanged++;
                        }
                    }
                    catch (Exception ex)
                    {
                        unchanged++;
                        _logger.LogError(ex, "Exception during application disabling.");
                    }
                }
            }

            // ... add brand new applications
            foreach (var addition in add)
            {
                try
                {
                    var application = addition.Clone(_timeProvider);
                    application.SiteId = siteId;
                    application.ApplicationId = ApplicationInfoModelEx.CreateApplicationId(application);
                    application.Created = context;
                    application.NotSeenSince = null;
                    application.DiscovererId = discovererId;

                    var app = await AddOrUpdateApplicationAsync(application, false,
                        false, default).ConfigureAwait(false)
                        ?? throw new InvalidOperationException("Failed to add or update application.");
                    // Notify addition!
                    if (_applicationEvents != null)
                    {
                        await _applicationEvents.OnApplicationNewAsync(context,
                            app).ConfigureAwait(false);
                    }

                    await HandleApplicationEnabledAsync(context, app).ConfigureAwait(false);

                    // Now - add all new endpoints
                    if (endpoints.TryGetValue(app.ApplicationId, out var epFound))
                    {
                        await AddEndpointsAsync(epFound, result.Context, result.RegisterOnly ?? false,
                            discovererId, null, false).ConfigureAwait(false);
                    }

                    added++;
                }
                catch (ResourceConflictException)
                {
                    unchange.Add(addition); // Update the existing one
                }
                catch (Exception ex)
                {
                    unchanged++;
                    _logger.LogError(ex, "Exception adding application from discovery.");
                }
            }

            // Update applications and endpoints ...
            foreach (var update in unchange)
            {
                try
                {
                    var wasDisabled = false;
                    var wasUpdated = false;

                    // Disable if not already disabled
                    var app = await UpdateApplicationAsync(update.ApplicationId,
                        (application, disabled) =>
                        {
                            //
                            // Check whether another discoverer owns this application (discoverer
                            // id are not the same) and it is not disabled before updating it it.
                            //
                            if (update.DiscovererId != discovererId && !(disabled ?? false))
                            {
                                // TODO: Decide whether we merge newly found endpoints...
                                unchanged++;
                                return (null, null);
                            }

                            wasDisabled = (disabled ?? false) && application.NotSeenSince != null;
                            wasUpdated = true;

                            application.Update(update);
                            application.DiscovererId = discovererId;
                            application.SiteId = siteId;
                            application.NotSeenSince = null;
                            application.Updated = context;
                            updated++;
                            return (true, false);
                        }, default).ConfigureAwait(false)
                        ?? throw new InvalidOperationException("Failed to update application.");
                    if (wasDisabled)
                    {
                        await HandleApplicationEnabledAsync(context, app).ConfigureAwait(false);
                    }

                    // If this is our discoverer's application we update all endpoints also.
                    if (wasUpdated && endpoints.TryGetValue(app.ApplicationId, out var epFound))
                    {
                        // TODO: Handle case where we take ownership of all endpoints
                        await AddEndpointsAsync(epFound, result.Context,
                            result.RegisterOnly ?? false, discovererId,
                            app.ApplicationId, false).ConfigureAwait(false);

                        if (_applicationEvents != null)
                        {
                            await _applicationEvents.OnApplicationUpdatedAsync(context,
                                app).ConfigureAwait(false);
                        }
                    }
                }
                catch (Exception ex)
                {
                    unchanged++;
                    _logger.LogError(ex, "Exception during update.");
                }
            }

            var log = added != 0 || removed != 0 || updated != 0;
            _logger.LogInformation("... processed discovery results from {DiscovererId}: " +
                "{Added} applications added, {Updated} updated, {Removed} disabled, and " +
                "{Unchanged} unchanged.", discovererId, added, updated, removed, unchanged);
            kAppsAdded.Add(added);
            kAppsUpdated.Add(updated);
            kAppsUnchanged.Add(unchanged);
        }