internal async Task<bool> TrySyncAsync(CancellationToken ct = default)

in src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaClient.Subscription.cs [219:404]


        /// <summary>
        /// Try to synchronize the root subscriptions of the current session with
        /// the registrations tracked in <c>_s2r</c>. Subscriptions whose template
        /// has no entry in <c>_s2r</c> are disposed, templates without a
        /// subscription get a new one created and synced, and existing
        /// subscriptions with at least one dirty registration are re-synced.
        /// The smallest delay returned by the sync calls (or one minute after a
        /// failure) is passed to <c>RescheduleSynchronization</c>.
        /// </summary>
        /// <param name="ct">Token used to cancel the synchronization.</param>
        /// <returns>
        /// <c>false</c> if there is no session, if the session is not ready for
        /// subscriptions, or if an exception escapes to the outer handler
        /// (e.g. while obtaining capabilities or waiting for the lock).
        /// <c>true</c> otherwise - including when individual subscription
        /// operations failed, since those are logged and rescheduled.
        /// </returns>
        internal async Task<bool> TrySyncAsync(CancellationToken ct = default)
        {
            var session = _session;
            if (session == null)
            {
                return false;
            }
            var sw = Stopwatch.StartNew();
            var removals = 0;
            var additions = 0;
            var updates = 0;

            // Snapshot of the root subscriptions currently in the session,
            // keyed by their template.
            var existing = session.SubscriptionHandles
                .Where(s => s.Value.IsRoot)
                .ToDictionary(k => k.Value.Template, k => k.Value);

            _logger.LogDebug("{Client}: Perform synchronization of subscriptions (total: {Total})",
                this, session.SubscriptionHandles.Count);
            if (!await EnsureSessionIsReadyForSubscriptionsAsync(session, ct).ConfigureAwait(false))
            {
                return false;
            }
            try
            {
                //
                // Get the maximum number of monitored items per subscription
                // as well as the operation limits from the server capabilities.
                // Both are handed to every subscription sync below.
                //
                var caps = await session.GetServerCapabilitiesAsync(
                    NamespaceFormat.Uri, ct).ConfigureAwait(false);
                var maxMonitoredItems = caps.MaxMonitoredItemsPerSubscription;
                var limits = caps.OperationLimits;

                // Smallest delay reported by the add phase. Stays at MaxValue
                // if nothing was added or the add phase was never reached.
                var delay = TimeSpan.MaxValue;

                //
                // Take the subscription lock here! - we hold it all the way until we
                // have updated all subscription states. The subscriptions will access
                // the client again to obtain the monitored items from the subscribers
                // and we do not want any subscribers to be touched or removed while
                // we process the current registrations. Since the call to get the items
                // is frequent, we do not want to generate a copy every time but let
                // the subscriptions access the items directly.
                //
                await _subscriptionLock.WaitAsync(ct).ConfigureAwait(false);
                try
                {
                    // Copy of the template to registrations table to work on
                    var s2r = _s2r.ToDictionary(kv => kv.Key, kv => kv.Value.ToList());

                    // Close and remove items that have no subscribers
                    await Task.WhenAll(existing.Keys
                        .Except(s2r.Keys)
                        .Select(k => existing[k])
                        .Select(async close =>
                        {
                            try
                            {
                                lock (_cache)
                                {
                                    _cache.Remove(close.Template);
                                }
                                if (_s2r.TryRemove(close.Template, out var r))
                                {
                                    Debug.Assert(r.Count == 0,
                                        $"count of registrations {r.Count} > 0");
                                }

                                // Removes the item from the session and dispose
                                await close.DisposeAsync().ConfigureAwait(false);

                                Interlocked.Increment(ref removals);
                                Debug.Assert(close.IsClosed);
                                Debug.Assert(close.Session == null);
                            }
                            // Cancellation while closing is not treated as error
                            catch (OperationCanceledException) { }
                            catch (Exception ex)
                            {
                                _logger.LogError(ex, "{Client}: Failed to close " +
                                    "subscription {Subscription} in session.",
                                    this, close);
                            }
                        })).ConfigureAwait(false);

                    // Add new subscription for items with subscribers
                    var addDelays = await Task.WhenAll(s2r.Keys
                        .Except(existing.Keys)
                        .Select(async add =>
                        {
                            try
                            {
                                //
                                // Create a new subscription with the subscription
                                // configuration template that as of yet has no
                                // representation and add it to the session.
                                //
#pragma warning disable CA2000 // Dispose objects before losing scope
                                var subscription = new OpcUaSubscription(this,
                                    add, _subscriptionOptions, _loggerFactory,
                                    new OpcUaClientTagList(_connection, _metrics),
                                    null, _timeProvider);
#pragma warning restore CA2000 // Dispose objects before losing scope

                                // Add the subscription to the session
                                session.AddSubscription(subscription);

                                // Sync the subscription which will get it to go live.
                                var syncDelay = await subscription.SyncAsync(
                                    maxMonitoredItems, limits, ct).ConfigureAwait(false);
                                Interlocked.Increment(ref additions);
                                Debug.Assert(session == subscription.Session);

                                // Only a successful sync clears the dirty flags
                                s2r[add].ForEach(r => r.Dirty = false);
                                return syncDelay;
                            }
                            catch (OperationCanceledException)
                            {
                                // Cancelled - do not influence the reschedule delay
                                return TimeSpan.MaxValue;
                            }
                            catch (Exception ex)
                            {
                                _logger.LogError(ex, "{Client}: Failed to add " +
                                    "subscription {Subscription} in session.",
                                    this, add);
                                return TimeSpan.FromMinutes(1);
                            }
                        })).ConfigureAwait(false);

                    delay = addDelays.DefaultIfEmpty(TimeSpan.MaxValue).Min();

                    // Update any items where subscriber signalled the item was updated
                    var updateDelays = await Task.WhenAll(s2r.Keys.Intersect(existing.Keys)
                        .Where(u => s2r[u].Any(b => b.Dirty))
                        .Select(async update =>
                        {
                            try
                            {
                                var subscription = existing[update];
                                var syncDelay = await subscription.SyncAsync(
                                    maxMonitoredItems, limits, ct).ConfigureAwait(false);
                                Interlocked.Increment(ref updates);
                                Debug.Assert(session == subscription.Session);

                                // Only a successful sync clears the dirty flags
                                s2r[update].ForEach(r => r.Dirty = false);
                                return syncDelay;
                            }
                            catch (OperationCanceledException)
                            {
                                // Cancelled - do not influence the reschedule delay
                                return TimeSpan.MaxValue;
                            }
                            catch (Exception ex)
                            {
                                _logger.LogError(ex, "{Client}: Failed to update " +
                                    "subscription {Subscription} in session.",
                                    this, update);
                                return TimeSpan.FromMinutes(1);
                            }
                        })).ConfigureAwait(false);

                    // Reschedule using the smaller of the add and update delays
                    var delay2 = updateDelays.DefaultIfEmpty(TimeSpan.MaxValue).Min();
                    RescheduleSynchronization(delay < delay2 ? delay : delay2);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "{Client}: Error trying to sync subscriptions.", this);

                    // Reschedule after at most one minute, or earlier if the
                    // add phase already produced a smaller delay.
                    var delay2 = TimeSpan.FromMinutes(1);
                    RescheduleSynchronization(delay < delay2 ? delay : delay2);
                }
                finally
                {
                    _subscriptionLock.Release();
                }

                // Finish
                session.UpdateOperationTimeout(false);
                UpdatePublishRequestCounts();

                if (updates + removals + additions != 0)
                {
                    _logger.LogInformation("{Client}: Removed {Removals}, added {Additions}, " +
                        "and updated {Updates} subscriptions (total: {Total}) took {Duration} ms.",
                        this, removals, additions, updates, session.SubscriptionHandles.Count,
                        sw.ElapsedMilliseconds);
                }
                return true;
            }
            catch (Exception ex)
            {
                // Only the message is logged here (no exception object attached)
                _logger.LogError("{Client}: Error trying to sync subscriptions: {Error}",
                    this, ex.Message);
                return false;
            }
        }