src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaClient.Subscription.cs (444 lines of code) (raw):
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
// ------------------------------------------------------------
namespace Azure.IIoT.OpcUa.Publisher.Stack.Services
{
using Azure.IIoT.OpcUa.Publisher.Stack.Models;
using Azure.IIoT.OpcUa.Publisher.Models;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Opc.Ua;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Opc.Ua.Client;
internal sealed partial class OpcUaClient
{
/// <summary>
/// Registers the subscriber for the subscription described by the
/// subscription template and hands back the registration handle. A
/// subscriber that is already registered with a different template
/// is moved over to the new template.
/// </summary>
/// <param name="subscription">Subscription template to register with</param>
/// <param name="subscriber">Subscriber that owns the registration</param>
/// <param name="ct">Cancels the wait for the subscription lock</param>
/// <returns>The new registration</returns>
internal async ValueTask<ISubscription> RegisterAsync(
    SubscriptionModel subscription, ISubscriber subscriber,
    CancellationToken ct = default)
{
    // Keep the client referenced while the registration is being set up
    AddRef();
    try
    {
        await _subscriptionLock.WaitAsync(ct).ConfigureAwait(false);
        AddRef();
        try
        {
            // Subscription that was re-targeted in place, if any
            OpcUaSubscription? toSync = null;

            //
            // A subscriber that is registered with another template is
            // unregistered from it first. Afterwards the old subscription
            // is either updated in place or left to be cleaned up.
            //
            if (_registrations.TryGetValue(subscriber, out var previous) &&
                previous.Subscription != subscription)
            {
                previous.RemoveAndReleaseNoLockInternal();
                Debug.Assert(!_registrations.ContainsKey(subscriber));

                //
                // Only when nobody else is registered with the desired
                // template the previously used subscription is updated
                // (safely) to the new template configuration. This is
                // essentially the original behavior before 2.9.12.
                //
                var templateInUse =
                    _s2r.TryGetValue(subscription, out var others) &&
                    others.Count != 0;
                if (!templateInUse &&
                    TryGetSubscription(previous.Subscription, out var current))
                {
                    current.Update(subscription);
                    toSync = current;
                }
            }

            ISubscription handle = new Registration(this, subscription, subscriber);
            TriggerSubscriptionSynchronization(toSync);
            return handle;
        }
        finally
        {
            Release();
            _subscriptionLock.Release();
        }
    }
    finally
    {
        Release();
    }
}
/// <summary>
/// Returns the owners of all registrations made for the subscription
/// template so that their monitored items can be created in the
/// subscription or subscriptions. Must be called while the
/// subscription lock is held, which happens during synchronization.
/// </summary>
/// <param name="template">Subscription template to look up</param>
/// <returns>The subscribers, or an empty sequence if the template
/// is not known</returns>
internal IEnumerable<ISubscriber> GetSubscribers(SubscriptionModel template)
{
    Debug.Assert(_subscriptionLock.CurrentCount == 0, "Must be locked");

    if (!_s2r.TryGetValue(template, out var registrations))
    {
        return [];
    }
    // Lazily projected, the list is read while the caller holds the lock
    return registrations.Select(registration => registration.Owner);
}
/// <summary>
/// Ask the client to synchronize subscription state. If an open
/// subscription is passed a <see cref="ConnectionEvent.SubscriptionSyncOne"/>
/// event is triggered for it, in all other cases (no subscription or
/// a closed one) a <see cref="ConnectionEvent.SubscriptionSyncAll"/>
/// event is triggered.
/// </summary>
/// <param name="subscription">Subscription to synchronize or
/// null to synchronize all</param>
internal void TriggerSubscriptionSynchronization(
    OpcUaSubscription? subscription = null)
{
    if (subscription is null || subscription.IsClosed)
    {
        TriggerConnectionEvent(ConnectionEvent.SubscriptionSyncAll);
        return;
    }

    TriggerConnectionEvent(ConnectionEvent.SubscriptionSyncOne,
        subscription);
}
/// <summary>
/// Called by a subscription when it is newly created. It is done this
/// way because the stack uses clone to clone subscriptions just like it
/// does with sessions and monitored items, and this lets us hook both
/// the create and the clone operations. Root subscriptions are entered
/// into the template lookup cache, anything else is ignored.
/// </summary>
/// <param name="subscription">The subscription that was created</param>
internal void OnSubscriptionCreated(OpcUaSubscription subscription)
{
    lock (_cache)
    {
        if (!subscription.IsRoot)
        {
            return;
        }
        _cache.AddOrUpdate(subscription.Template, subscription);
    }
}
/// <summary>
/// Find the root subscription that was created for the subscription
/// template. The cache is consulted first, then the subscriptions of
/// the current session are searched and a hit is stored in the cache.
/// </summary>
/// <param name="template">Template to find the subscription for</param>
/// <param name="subscription">The subscription if found</param>
/// <returns>true if a subscription was found</returns>
private bool TryGetSubscription(SubscriptionModel template,
    [NotNullWhen(true)] out OpcUaSubscription? subscription)
{
    lock (_cache)
    {
        // Fast path - a cached entry counts only if it is open and root
        if (_cache.TryGetValue(template, out subscription))
        {
            if (!subscription.IsClosed && subscription.IsRoot)
            {
                return true;
            }
        }

        // Slow path - scan the root subscriptions of the session (if any)
        subscription = _session?.SubscriptionHandles.Values
            .FirstOrDefault(candidate =>
                candidate.IsRoot && candidate.Template == template);
        if (subscription == null)
        {
            return false;
        }

        _cache.AddOrUpdate(template, subscription);
        return true;
    }
}
/// <summary>
/// Synchronizes a single subscription. Access to the subscription to
/// sync state must go through the subscription lock, so this wraps the
/// sync call on the subscription with it. Nothing happens when there
/// is no session. Errors during synchronization are logged and another
/// synchronization is scheduled a minute later.
/// </summary>
/// <param name="subscription">The subscription to synchronize</param>
/// <param name="ct">Cancellation token</param>
/// <returns></returns>
internal async Task SyncAsync(OpcUaSubscription subscription,
    CancellationToken ct = default)
{
    if (_session is not { } session)
    {
        return;
    }

    await _subscriptionLock.WaitAsync(ct).ConfigureAwait(false);
    try
    {
        // Server limits determine how the monitored items are applied
        var capabilities = await session.GetServerCapabilitiesAsync(
            NamespaceFormat.Uri, ct).ConfigureAwait(false);
        var maxMonitoredItems = capabilities.MaxMonitoredItemsPerSubscription;
        var operationLimits = capabilities.OperationLimits;

        var retryAfter = await subscription.SyncAsync(maxMonitoredItems,
            operationLimits, ct).ConfigureAwait(false);
        RescheduleSynchronization(retryAfter);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex,
            "{Client}: Error trying to sync subscription {Subscription}",
            this, subscription);
        RescheduleSynchronization(TimeSpan.FromMinutes(1));
    }
    finally
    {
        _subscriptionLock.Release();
    }
}
/// <summary>
/// Called by the management thread to synchronize the subscriptions
/// within a session as a result of the trigger call or when a session
/// is reconnected/recreated. Root subscriptions whose template has no
/// registrations left are closed, templates that have registrations
/// but no subscription yet get a subscription created, and existing
/// subscriptions with at least one dirty registration are synchronized.
/// </summary>
/// <param name="ct">Cancellation token</param>
/// <returns>false if there is no session, the session could not be
/// readied for subscriptions, or synchronization failed before the
/// subscription lock was taken; true otherwise</returns>
internal async Task<bool> TrySyncAsync(CancellationToken ct = default)
{
    var session = _session;
    if (session == null)
    {
        return false;
    }

    var sw = Stopwatch.StartNew();
    var removals = 0;
    var additions = 0;
    var updates = 0;

    // Root subscriptions that currently exist in the session by template
    var existing = session.SubscriptionHandles
        .Where(s => s.Value.IsRoot)
        .ToDictionary(k => k.Value.Template, k => k.Value);

    _logger.LogDebug("{Client}: Perform synchronization of subscriptions (total: {Total})",
        this, session.SubscriptionHandles.Count);

    if (!await EnsureSessionIsReadyForSubscriptionsAsync(session, ct).ConfigureAwait(false))
    {
        return false;
    }

    try
    {
        // Get the max item per subscription as well as the operation limits
        var caps = await session.GetServerCapabilitiesAsync(
            NamespaceFormat.Uri, ct).ConfigureAwait(false);
        var maxMonitoredItems = caps.MaxMonitoredItemsPerSubscription;
        var limits = caps.OperationLimits;
        var delay = TimeSpan.MaxValue;

        //
        // Take the subscription lock here! - we hold it all the way until we
        // have updated all subscription states. The subscriptions will access
        // the client again to obtain the monitored items from the subscribers
        // and we do not want any subscribers to be touched or removed while
        // we process the current registrations. Since the call to get the items
        // is frequent, we do not want to generate a copy every time but let
        // the subscriptions access the items directly.
        //
        await _subscriptionLock.WaitAsync(ct).ConfigureAwait(false);
        try
        {
            //
            // Snapshot only the templates that still have registrations.
            // Removing a registration leaves an empty list behind in the
            // lookup table. If these empty entries were part of the
            // snapshot, a subscription that lost its last subscriber would
            // never be closed, and a template without any subscriber could
            // get a subscription created for it.
            //
            var s2r = _s2r
                .Where(kv => kv.Value.Count != 0)
                .ToDictionary(kv => kv.Key, kv => kv.Value.ToList());

            // Close and remove items that have no subscribers
            await Task.WhenAll(existing.Keys
                .Except(s2r.Keys)
                .Select(k => existing[k])
                .Select(async close =>
                {
                    try
                    {
                        lock (_cache)
                        {
                            _cache.Remove(close.Template);
                        }
                        // Drop the (empty) registration list of the template
                        if (_s2r.TryRemove(close.Template, out var r))
                        {
                            Debug.Assert(r.Count == 0,
                                $"count of registrations {r.Count} > 0");
                        }

                        // Removes the item from the session and dispose
                        await close.DisposeAsync().ConfigureAwait(false);
                        Interlocked.Increment(ref removals);
                        Debug.Assert(close.IsClosed);
                        Debug.Assert(close.Session == null);
                    }
                    catch (OperationCanceledException) { }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "{Client}: Failed to close " +
                            "subscription {Subscription} in session.",
                            this, close);
                    }
                })).ConfigureAwait(false);

            // Add new subscription for items with subscribers
            var delays = await Task.WhenAll(s2r.Keys
                .Except(existing.Keys)
                .Select(async add =>
                {
                    try
                    {
                        //
                        // Create a new subscription with the subscription
                        // configuration template that as of yet has no
                        // representation and add it to the session.
                        //
#pragma warning disable CA2000 // Dispose objects before losing scope
                        var subscription = new OpcUaSubscription(this,
                            add, _subscriptionOptions, _loggerFactory,
                            new OpcUaClientTagList(_connection, _metrics),
                            null, _timeProvider);
#pragma warning restore CA2000 // Dispose objects before losing scope

                        // Add the subscription to the session
                        session.AddSubscription(subscription);

                        // Sync the subscription which will get it to go live.
                        var next = await subscription.SyncAsync(maxMonitoredItems,
                            limits, ct).ConfigureAwait(false);
                        Interlocked.Increment(ref additions);
                        Debug.Assert(session == subscription.Session);

                        s2r[add].ForEach(r => r.Dirty = false);
                        return next;
                    }
                    catch (OperationCanceledException)
                    {
                        // Cancelled - do not ask for another synchronization
                        return TimeSpan.MaxValue;
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "{Client}: Failed to add " +
                            "subscription {Subscription} in session.",
                            this, add);
                        return TimeSpan.FromMinutes(1);
                    }
                })).ConfigureAwait(false);

            delay = delays.DefaultIfEmpty(TimeSpan.MaxValue).Min();

            // Update any items where subscriber signalled the item was updated
            delays = await Task.WhenAll(s2r.Keys.Intersect(existing.Keys)
                .Where(u => s2r[u].Any(b => b.Dirty))
                .Select(async update =>
                {
                    try
                    {
                        var subscription = existing[update];
                        var next = await subscription.SyncAsync(maxMonitoredItems,
                            limits, ct).ConfigureAwait(false);
                        Interlocked.Increment(ref updates);
                        Debug.Assert(session == subscription.Session);
                        s2r[update].ForEach(r => r.Dirty = false);
                        return next;
                    }
                    catch (OperationCanceledException)
                    {
                        // Cancelled - do not ask for another synchronization
                        return TimeSpan.MaxValue;
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "{Client}: Failed to update " +
                            "subscription {Subscription} in session.",
                            this, update);
                        return TimeSpan.FromMinutes(1);
                    }
                })).ConfigureAwait(false);

            // Schedule the next synchronization at the earliest requested time
            var delay2 = delays.DefaultIfEmpty(TimeSpan.MaxValue).Min();
            RescheduleSynchronization(delay < delay2 ? delay : delay2);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "{Client}: Error trying to sync subscriptions.", this);
            var delay2 = TimeSpan.FromMinutes(1);
            RescheduleSynchronization(delay < delay2 ? delay : delay2);
        }
        finally
        {
            _subscriptionLock.Release();
        }

        // Finish
        session.UpdateOperationTimeout(false);
        UpdatePublishRequestCounts();

        if (updates + removals + additions != 0)
        {
            _logger.LogInformation("{Client}: Removed {Removals}, added {Additions}, " +
                "and updated {Updates} subscriptions (total: {Total}) took {Duration} ms.",
                this, removals, additions, updates, session.SubscriptionHandles.Count,
                sw.ElapsedMilliseconds);
        }
        return true;
    }
    catch (Exception ex)
    {
        _logger.LogError("{Client}: Error trying to sync subscriptions: {Error}",
            this, ex.Message);
        return false;
    }
}
/// <summary>
/// Readies the session for subscriptions. The namespace tables are
/// fetched again (changes are logged) and, unless complex type loading
/// is disabled or the type system is already loaded, the complex type
/// system is loaded as well.
/// </summary>
/// <param name="session">Session to prepare</param>
/// <param name="ct">Cancellation token</param>
/// <returns>false if fetching the namespace tables failed with a
/// service result exception, true otherwise</returns>
private async Task<bool> EnsureSessionIsReadyForSubscriptionsAsync(OpcUaSession session,
    CancellationToken ct)
{
    try
    {
        // Snapshot before and after the reload to report what changed
        var before = session.NamespaceUris.ToArray();
        await session.FetchNamespaceTablesAsync(ct).ConfigureAwait(false);
        var after = session.NamespaceUris.ToArray();
        LogNamespaceTableChanges(before, after);
    }
    catch (ServiceResultException sre) // anything else is not expected
    {
        _logger.LogWarning(sre, "{Client}: Failed to fetch namespace table...", this);
        return false;
    }

    if (DisableComplexTypeLoading || session.IsTypeSystemLoaded)
    {
        return true;
    }

    // Ensure type system is loaded
    await session.GetComplexTypeSystemAsync(ct).ConfigureAwait(false);
    return true;
}
/// <summary>
/// Schedules a resynchronization of all subscriptions after the given
/// delay. Must be called while holding the subscription lock. A delay
/// of <see cref="TimeSpan.MaxValue"/> or a negative delay is ignored,
/// and the timer is only changed if the resulting point in time is not
/// later than the one currently recorded.
/// </summary>
/// <param name="delay">Time to wait until the next synchronization</param>
private void RescheduleSynchronization(TimeSpan delay)
{
    Debug.Assert(_subscriptionLock.CurrentCount == 0, "Must be locked");

    // A negative delay covers Timeout.Infinite too
    var nothingToSchedule = delay == TimeSpan.MaxValue;
    if (nothingToSchedule || delay < TimeSpan.Zero)
    {
        return;
    }

    var candidate = _timeProvider.GetUtcNow() + delay;
    if (candidate > _nextSync)
    {
        return;
    }

    _logger.LogInformation("Reschedule synchronization to {Time} in {Delay} ms",
        candidate, delay.TotalMilliseconds);
    _nextSync = candidate;
    _resyncTimer.Change(delay, Timeout.InfiniteTimeSpan);
}
/// <summary>
/// Subscription registration. Ties a subscriber (owner) to a
/// subscription template inside the client and holds a reference on
/// the client until it is removed. Removal happens either through
/// <see cref="DisposeAsync"/> or through
/// <see cref="RemoveAndReleaseNoLockInternal"/> and is performed at
/// most once, no matter how often or in which combination these are
/// called.
/// </summary>
private sealed record Registration : ISubscription, ISubscriptionDiagnostics
{
    /// <summary>
    /// The subscription configuration
    /// </summary>
    public SubscriptionModel Subscription { get; }

    /// <summary>
    /// Monitored items on the subscriber
    /// </summary>
    internal ISubscriber Owner { get; }

    /// <summary>
    /// Mark the registration as dirty
    /// </summary>
    internal bool Dirty { get; set; }

    /// <inheritdoc/>
    public IOpcUaClientDiagnostics ClientDiagnostics => _outer;

    /// <inheritdoc/>
    public ISubscriptionDiagnostics Diagnostics => this;

    /// <inheritdoc/>
    public int GoodMonitoredItems
        => _outer.TryGetSubscription(Subscription, out var subscription)
            ? subscription.GetGoodMonitoredItems(Owner) : 0;

    /// <inheritdoc/>
    public int BadMonitoredItems
        => _outer.TryGetSubscription(Subscription, out var subscription)
            ? subscription.GetBadMonitoredItems(Owner) : 0;

    /// <inheritdoc/>
    public int LateMonitoredItems
        => _outer.TryGetSubscription(Subscription, out var subscription)
            ? subscription.GetLateMonitoredItems(Owner) : 0;

    /// <inheritdoc/>
    public int HeartbeatsEnabled
        => _outer.TryGetSubscription(Subscription, out var subscription)
            ? subscription.GetHeartbeatsEnabled(Owner) : 0;

    /// <inheritdoc/>
    public int ConditionsEnabled
        => _outer.TryGetSubscription(Subscription, out var subscription)
            ? subscription.GetConditionsEnabled(Owner) : 0;

    /// <summary>
    /// Create subscription registration. Takes a reference on the
    /// client and enters the registration into the lookup tables of
    /// the client. The caller must hold the subscription lock.
    /// </summary>
    /// <param name="outer">Client the registration belongs to</param>
    /// <param name="subscription">Subscription template</param>
    /// <param name="owner">Subscriber owning the registration</param>
    /// <exception cref="ArgumentException">The owner is already
    /// registered</exception>
    public Registration(OpcUaClient outer,
        SubscriptionModel subscription, ISubscriber owner)
    {
        Subscription = subscription;
        Owner = owner;
        _outer = outer;

        _outer.AddRef();
        try
        {
            AddNoLockInternal();
        }
        catch
        {
            // Adding failed, do not leak the reference that was just taken
            _outer.Release();
            throw;
        }
    }

    /// <inheritdoc/>
    public async ValueTask DisposeAsync()
    {
        if (_outer._disposed)
        {
            //
            // Possibly the client has shut down before the owners of
            // the registration have disposed it. This is not an error.
            //
            return;
        }

        await _outer._subscriptionLock.WaitAsync().ConfigureAwait(false);

        //
        // The registration could already have been removed, either by an
        // earlier dispose or because the owner was registered with another
        // subscription in the meantime. In that case nothing is removed
        // and the client reference is not released a second time.
        //
        var remove = !_removed;
        try
        {
            if (remove)
            {
                RemoveNoLockInternal();
                _outer.TriggerSubscriptionSynchronization(null);
            }
        }
        finally
        {
            _outer._subscriptionLock.Release();
            if (remove)
            {
                _outer.Release();
            }
        }
    }

    /// <inheritdoc/>
    public OpcUaSubscriptionNotification? CreateKeepAlive()
    {
        if (!_outer.TryGetSubscription(Subscription, out var subscription))
        {
            return null;
        }
        return subscription.CreateKeepAlive();
    }

    /// <inheritdoc/>
    public void NotifyMonitoredItemsChanged()
    {
        Dirty = true;
        // Syncs just the subscription if found and open, otherwise all
        _outer.TryGetSubscription(Subscription, out var subscription);
        _outer.TriggerSubscriptionSynchronization(subscription);
    }

    /// <inheritdoc/>
    public async ValueTask<PublishedDataSetMetaDataModel> CollectMetaDataAsync(
        ISubscriber owner, DataSetFieldContentFlags? fieldMask,
        DataSetMetaDataModel dataSetMetaData, uint minorVersion,
        CancellationToken ct = default)
    {
        if (!_outer.TryGetSubscription(Subscription, out var subscription))
        {
            throw new ServiceResultException(StatusCodes.BadNoSubscription,
                "Subscription not found");
        }
        return await subscription.CollectMetaDataAsync(owner, fieldMask,
            dataSetMetaData, minorVersion, ct).ConfigureAwait(false);
    }

    /// <summary>
    /// Remove registration and release reference but not under
    /// lock (like user of the registration handle) and without
    /// triggering an update. The caller must hold the subscription
    /// lock. Does nothing if the registration was already removed.
    /// </summary>
    internal void RemoveAndReleaseNoLockInternal()
    {
        if (_removed)
        {
            return;
        }
        RemoveNoLockInternal();
        _outer.Release();
    }

    /// <summary>
    /// Enter the registration into the owner and template lookup
    /// tables of the client.
    /// </summary>
    private void AddNoLockInternal()
    {
        // Throws for an owner that is already registered
        _outer._registrations.Add(Owner, this);
        _outer._s2r.AddOrUpdate(Subscription, _
            => [this],
            (_, c) =>
            {
                c.Add(this);
                return c;
            });
    }

    /// <summary>
    /// Take the registration out of the owner and template lookup
    /// tables of the client and remember that this happened. The
    /// (possibly empty) list of the template is left in place.
    /// </summary>
    private void RemoveNoLockInternal()
    {
        // Flag first so that removal is never attempted a second time
        _removed = true;
        _outer._s2r.AddOrUpdate(Subscription, _ =>
        {
            Debug.Fail("Unexpected");
            return [];
        }, (_, c) =>
        {
            c.Remove(this);
            return c;
        });
        _outer._registrations.Remove(Owner);
    }

    private readonly OpcUaClient _outer;
    // Read and written only while the subscription lock is held
    private bool _removed;
}
/// <summary>
/// Point in time recorded by the last accepted call to
/// RescheduleSynchronization
/// </summary>
private DateTimeOffset _nextSync;
#pragma warning disable CA2213 // Disposable fields should be disposed
/// <summary>
/// Lock (single entry semaphore) taken while registrations are
/// changed and while subscriptions are synchronized
/// </summary>
private readonly SemaphoreSlim _subscriptionLock = new(1, 1);
/// <summary>
/// Timer that RescheduleSynchronization re-arms with the delay
/// until the next synchronization
/// </summary>
private readonly ITimer _resyncTimer;
#pragma warning restore CA2213 // Disposable fields should be disposed
/// <summary>
/// Lookup of the registration by the subscriber that owns it
/// </summary>
private readonly Dictionary<ISubscriber, Registration> _registrations = [];
/// <summary>
/// Lookup of all registrations made for a subscription template
/// </summary>
private readonly ConcurrentDictionary<SubscriptionModel, List<Registration>> _s2r = new();
/// <summary>
/// Lookup of the root subscription by subscription template. It
/// is accessed under lock (_cache).
/// </summary>
private readonly Dictionary<SubscriptionModel, OpcUaSubscription> _cache = [];
/// <summary>
/// Options passed to newly created subscriptions
/// </summary>
private readonly IOptions<OpcUaSubscriptionOptions> _subscriptionOptions;
}
}