in src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaClient.Subscription.cs [219:404]
internal async Task<bool> TrySyncAsync(CancellationToken ct = default)
{
    //
    // Reconciles the root subscriptions currently in the session with the
    // registrations tracked in _s2r. Under the subscription lock it runs
    // three phases: (1) close subscriptions whose template no longer has
    // an entry in _s2r, (2) create and sync subscriptions for templates
    // that have an entry but no subscription yet, (3) re-sync existing
    // subscriptions that have at least one dirty registration. The
    // smallest delay produced by phases 2 and 3 is handed to
    // RescheduleSynchronization.
    //
    // Returns false when there is no session, when the session is not
    // ready for subscriptions, or when an exception escapes the outer try
    // (for example while fetching server capabilities or waiting for the
    // lock). Returns true otherwise, including when individual
    // subscriptions failed, since those failures are logged and folded
    // into the reschedule delay instead.
    //
    var session = _session;
    if (session == null)
    {
        return false;
    }

    var sw = Stopwatch.StartNew();
    // Counters are bumped with Interlocked because the per-subscription
    // tasks are awaited together through Task.WhenAll and may overlap.
    var removals = 0;
    var additions = 0;
    var updates = 0;

    // Snapshot of the root subscriptions in the session, keyed by template.
    // Taken before the readiness check and before the lock is acquired.
    var existing = session.SubscriptionHandles
        .Where(s => s.Value.IsRoot)
        .ToDictionary(k => k.Value.Template, k => k.Value);

    _logger.LogDebug("{Client}: Perform synchronization of subscriptions (total: {Total})",
        this, session.SubscriptionHandles.Count);

    if (!await EnsureSessionIsReadyForSubscriptionsAsync(session, ct).ConfigureAwait(false))
    {
        return false;
    }

    try
    {
        // Get the maximum number of monitored items per subscription as
        // well as the operation limits; both are passed to every SyncAsync.
        var caps = await session.GetServerCapabilitiesAsync(
            NamespaceFormat.Uri, ct).ConfigureAwait(false);
        var maxMonitoredItems = caps.MaxMonitoredItemsPerSubscription;
        var limits = caps.OperationLimits;

        // Smallest delay seen so far. Starts at TimeSpan.MaxValue so that
        // any concrete value wins the comparison.
        // NOTE(review): presumably MaxValue means "no re-sync needed" to
        // RescheduleSynchronization - confirm against that method.
        var delay = TimeSpan.MaxValue;

        //
        // Take the subscription lock here! - we hold it all the way until we
        // have updated all subscription states. The subscriptions will access
        // the client again to obtain the monitored items from the subscribers
        // and we do not want any subscribers to be touched or removed while
        // we process the current registrations. Since the call to get the items
        // is frequent, we do not want to generate a copy every time but let
        // the subscriptions access the items directly.
        //
        // The wait sits outside the inner try so the finally below only
        // releases a lock that was actually acquired.
        await _subscriptionLock.WaitAsync(ct).ConfigureAwait(false);
        try
        {
            // Copy of the template -> registrations map used by all phases.
            var s2r = _s2r.ToDictionary(kv => kv.Key, kv => kv.Value.ToList());

            // Close and remove items that have no subscribers
            await Task.WhenAll(existing.Keys
                .Except(s2r.Keys)
                .Select(k => existing[k])
                .Select(async close =>
                {
                    try
                    {
                        lock (_cache)
                        {
                            _cache.Remove(close.Template);
                        }
                        if (_s2r.TryRemove(close.Template, out var r))
                        {
                            Debug.Assert(r.Count == 0,
                                $"count of registrations {r.Count} > 0");
                        }

                        // Removes the item from the session and dispose
                        await close.DisposeAsync().ConfigureAwait(false);
                        Interlocked.Increment(ref removals);
                        Debug.Assert(close.IsClosed);
                        Debug.Assert(close.Session == null);
                    }
                    // Cancellation while closing is ignored on purpose;
                    // the remaining phases still run.
                    catch (OperationCanceledException) { }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "{Client}: Failed to close " +
                            "subscription {Subscription} in session.",
                            this, close);
                    }
                })).ConfigureAwait(false);

            // Add new subscription for items with subscribers
            var delays = await Task.WhenAll(s2r.Keys
                .Except(existing.Keys)
                .Select(async add =>
                {
                    try
                    {
                        //
                        // Create a new subscription with the subscription
                        // configuration template that as of yet has no
                        // representation and add it to the session.
                        //
#pragma warning disable CA2000 // Dispose objects before losing scope
                        var subscription = new OpcUaSubscription(this,
                            add, _subscriptionOptions, _loggerFactory,
                            new OpcUaClientTagList(_connection, _metrics),
                            null, _timeProvider);
#pragma warning restore CA2000 // Dispose objects before losing scope

                        // Add the subscription to the session
                        session.AddSubscription(subscription);

                        // Sync the subscription which will get it to go live.
                        var syncDelay = await subscription.SyncAsync(maxMonitoredItems,
                            limits, ct).ConfigureAwait(false);
                        Interlocked.Increment(ref additions);
                        Debug.Assert(session == subscription.Session);

                        // Only a successful sync clears the dirty flags.
                        s2r[add].ForEach(r => r.Dirty = false);
                        return syncDelay;
                    }
                    catch (OperationCanceledException)
                    {
                        // Cancelled: contribute no delay of our own.
                        return TimeSpan.MaxValue;
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "{Client}: Failed to add " +
                            "subscription {Subscription} in session.",
                            this, add);
                        // Failed: contribute a one minute delay.
                        return TimeSpan.FromMinutes(1);
                    }
                })).ConfigureAwait(false);
            delay = delays.DefaultIfEmpty(TimeSpan.MaxValue).Min();

            // Update any items where subscriber signalled the item was updated
            delays = await Task.WhenAll(s2r.Keys.Intersect(existing.Keys)
                .Where(u => s2r[u].Any(b => b.Dirty))
                .Select(async update =>
                {
                    try
                    {
                        var subscription = existing[update];
                        var syncDelay = await subscription.SyncAsync(maxMonitoredItems,
                            limits, ct).ConfigureAwait(false);
                        Interlocked.Increment(ref updates);
                        Debug.Assert(session == subscription.Session);

                        // Only a successful sync clears the dirty flags.
                        s2r[update].ForEach(r => r.Dirty = false);
                        return syncDelay;
                    }
                    catch (OperationCanceledException)
                    {
                        // Cancelled: contribute no delay of our own.
                        return TimeSpan.MaxValue;
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "{Client}: Failed to update " +
                            "subscription {Subscription} in session.",
                            this, update);
                        // Failed: contribute a one minute delay.
                        return TimeSpan.FromMinutes(1);
                    }
                })).ConfigureAwait(false);

            // Reschedule with the smaller of the add and update delays.
            var delay2 = delays.DefaultIfEmpty(TimeSpan.MaxValue).Min();
            RescheduleSynchronization(delay < delay2 ? delay : delay2);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "{Client}: Error trying to sync subscriptions.", this);

            // Reschedule after at most one minute, or sooner if the add
            // phase already produced a shorter delay before the failure.
            var delay2 = TimeSpan.FromMinutes(1);
            RescheduleSynchronization(delay < delay2 ? delay : delay2);
        }
        finally
        {
            _subscriptionLock.Release();
        }

        // Finish - these run after the subscription lock has been released.
        session.UpdateOperationTimeout(false);
        UpdatePublishRequestCounts();

        // Only log a summary when something actually changed.
        if (updates + removals + additions != 0)
        {
            _logger.LogInformation("{Client}: Removed {Removals}, added {Additions}, " +
                "and updated {Updates} subscriptions (total: {Total}) took {Duration} ms.",
                this, removals, additions, updates, session.SubscriptionHandles.Count,
                sw.ElapsedMilliseconds);
        }
        return true;
    }
    catch (Exception ex)
    {
        // Reached when fetching the server capabilities, waiting for the
        // lock, or the finishing steps above throw. Only the exception
        // message is logged here.
        _logger.LogError("{Client}: Error trying to sync subscriptions: {Error}",
            this, ex.Message);
        return false;
    }
}