in src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaSubscription.cs [970:1488]
private async ValueTask<TimeSpan> SynchronizeMonitoredItemsAsync(
Partition partition, OperationLimitsModel operationLimits, CancellationToken ct)
{
if (Session is not OpcUaSession session)
{
throw ServiceResultException.Create(StatusCodes.BadSessionIdInvalid,
"Session not connected.");
}
// Get the items assigned to this subscription.
#pragma warning disable CA2000 // Dispose objects before losing scope
var desired = OpcUaMonitoredItem
.Create(_client, partition.Items, _loggerFactory, _timeProvider)
.ToHashSet();
#pragma warning restore CA2000 // Dispose objects before losing scope
var previouslyMonitored = CurrentlyMonitored.ToHashSet();
var remove = previouslyMonitored.Except(desired).ToHashSet();
var add = desired.Except(previouslyMonitored).ToHashSet();
var same = previouslyMonitored.ToHashSet();
var errorsDuringSync = 0;
same.IntersectWith(desired);
//
// Resolve the browse paths for all nodes first.
//
// We shortcut this only through the added items since the identity (hash)
// of the monitored item is dependent on the node and browse path, so any
// update of either results in a newly added monitored item and the old
// one removed.
//
var allResolvers = add
.Select(a => a.Resolve)
.Where(a => a != null);
foreach (var resolvers in allResolvers.Batch(
operationLimits.GetMaxNodesPerTranslatePathsToNodeIds()))
{
var response = await session.Services.TranslateBrowsePathsToNodeIdsAsync(
new RequestHeader(), new BrowsePathCollection(resolvers
.Select(a => new BrowsePath
{
StartingNode = a!.Value.NodeId.ToNodeId(
session.MessageContext),
RelativePath = a.Value.Path.ToRelativePath(
session.MessageContext)
})), ct).ConfigureAwait(false);
var results = response.Validate(response.Results, s => s.StatusCode,
response.DiagnosticInfos, resolvers);
if (results.ErrorInfo != null)
{
// Could not do anything...
_logger.LogWarning(
"Failed to resolve browse path in {Subscription} due to {ErrorInfo}...",
this, results.ErrorInfo);
throw ServiceResultException.Create(results.ErrorInfo.StatusCode,
results.ErrorInfo.ErrorMessage ?? "Failed to resolve browse paths");
}
foreach (var result in results)
{
var resolvedId = NodeId.Null;
if (result.ErrorInfo == null && result.Result.Targets.Count == 1)
{
resolvedId = result.Result.Targets[0].TargetId.ToNodeId(
session.MessageContext.NamespaceUris);
result.Request!.Value.Update(resolvedId, session.MessageContext);
}
else
{
_logger.LogWarning("Failed to resolve browse path for {NodeId} " +
"in {Subscription} due to '{ServiceResult}'",
result.Request!.Value.NodeId, this, result.ErrorInfo);
errorsDuringSync++;
}
}
}
//
// If retrieving paths for all the items from the root folder was configured do so
// now. All items that fail here should be retried later.
//
if (ResolveBrowsePathFromRoot)
{
var allGetPaths = add
.Select(a => a.GetPath)
.Where(a => a != null);
var pathsRetrieved = 0;
foreach (var getPathsBatch in allGetPaths.Batch(10000))
{
var getPath = getPathsBatch.ToList();
var paths = await session.GetBrowsePathsFromRootAsync(new RequestHeader(),
getPath.Select(n => n!.Value.NodeId.ToNodeId(session.MessageContext)),
ct).ConfigureAwait(false);
for (var index = 0; index < paths.Count; index++)
{
getPath[index]!.Value.Update(paths[index].Path, session.MessageContext);
if (paths[index].ErrorInfo != null)
{
_logger.LogWarning("Failed to get root path for {NodeId} " +
"in {Subscription} due to '{ServiceResult}'",
getPath[index]!.Value.NodeId, this, paths[index].ErrorInfo);
}
else
{
pathsRetrieved++;
}
}
}
if (pathsRetrieved > 0)
{
_logger.LogInformation(
"Retrieved {Count} paths for items in subscription {Subscription}.",
pathsRetrieved, this);
}
}
//
// Register nodes for reading if needed. This is needed anytime the session
// changes as the registration is only valid in the context of the session
//
var allRegistrations = add.Concat(same)
.Select(a => a.Register)
.Where(a => a != null);
foreach (var registrations in allRegistrations.Batch(
operationLimits.GetMaxNodesPerRegisterNodes()))
{
var response = await session.Services.RegisterNodesAsync(
new RequestHeader(), new NodeIdCollection(registrations
.Select(a => a!.Value.NodeId.ToNodeId(session.MessageContext))),
ct).ConfigureAwait(false);
foreach (var (First, Second) in response.RegisteredNodeIds.Zip(registrations))
{
Debug.Assert(Second != null);
if (!NodeId.IsNull(First))
{
Second.Value.Update(First, session.MessageContext);
}
}
}
var metadataChanged = new HashSet<ISubscriber>();
var applyChanges = false;
var updated = 0;
foreach (var toUpdate in same)
{
if (!desired.TryGetValue(toUpdate, out var theDesiredUpdate))
{
errorsDuringSync++;
continue;
}
desired.Remove(theDesiredUpdate);
Debug.Assert(toUpdate.GetType() == theDesiredUpdate.GetType());
Debug.Assert(toUpdate.Subscription == this);
try
{
if (toUpdate.MergeWith(theDesiredUpdate, session, out var metadata))
{
_logger.LogDebug(
"Trying to update monitored item '{Item}' in {Subscription}...",
toUpdate, this);
if (toUpdate.FinalizeMergeWith != null && metadata)
{
await toUpdate.FinalizeMergeWith(session, ct).ConfigureAwait(false);
}
updated++;
applyChanges = true;
}
if (metadata)
{
metadataChanged.Add(toUpdate.Owner);
}
}
catch (Exception ex)
{
_logger.LogWarning(ex,
"Failed to update monitored item '{Item}' in {Subscription}...",
toUpdate, this);
errorsDuringSync++;
}
finally
{
theDesiredUpdate.Dispose();
}
}
var removed = 0;
foreach (var toRemove in remove)
{
Debug.Assert(toRemove.Subscription == this);
try
{
if (toRemove.RemoveFrom(this, out var metadata))
{
_logger.LogDebug(
"Trying to remove monitored item '{Item}' from {Subscription}...",
toRemove, this);
removed++;
applyChanges = true;
}
if (metadata)
{
metadataChanged.Add(toRemove.Owner);
}
}
catch (Exception ex)
{
_logger.LogWarning(ex,
"Failed to remove monitored item '{Item}' from {Subscription}...",
toRemove, this);
errorsDuringSync++;
}
}
var added = 0;
foreach (var toAdd in add)
{
desired.Remove(toAdd);
Debug.Assert(toAdd.Subscription == null);
try
{
if (toAdd.AddTo(this, session, out var metadata))
{
_logger.LogDebug(
"Adding monitored item '{Item}' to {Subscription}...",
toAdd, this);
if (toAdd.FinalizeAddTo != null && metadata)
{
await toAdd.FinalizeAddTo(session, ct).ConfigureAwait(false);
}
added++;
applyChanges = true;
}
if (metadata)
{
metadataChanged.Add(toAdd.Owner);
}
}
catch (Exception ex)
{
_logger.LogWarning(ex,
"Failed to add monitored item '{Item}' to {Subscription}...",
toAdd, this);
errorsDuringSync++;
}
}
Debug.Assert(desired.Count == 0, "We should have processed all desired updates.");
if (applyChanges)
{
await ApplyChangesAsync(ct).ConfigureAwait(false);
if (MonitoredItemCount == 0 && !EnableImmediatePublishing)
{
await SetPublishingModeAsync(false, ct).ConfigureAwait(false);
_logger.LogInformation(
"Disabled empty Subscription {Subscription} in session {Session}.",
this, session);
ResetMonitoredItemWatchdogTimer(false);
}
}
// Perform second pass over all monitored items and complete.
applyChanges = false;
var badMonitoredItems = 0;
var desiredMonitoredItems = same;
desiredMonitoredItems.UnionWith(add);
//
// Resolve display names for all nodes that still require a name
// other than the node id string.
//
// Note that we use the desired set here and update the display
// name after AddTo/MergeWith as it only effects the messages
// and metadata emitted and not the item as it is set up in the
// subscription (like what we do when resolving nodes). This
// supports the scenario where the user sets a desired display
// name of null to force reading the display name from the node
// and updating the existing display name (previously set) and
// at the same time is quite effective to only update what is
// necessary.
//
var allDisplayNameUpdates = desiredMonitoredItems
.Select(a => (a.Owner, a.GetDisplayName))
.Where(a => a.GetDisplayName.HasValue)
.ToList();
if (allDisplayNameUpdates.Count > 0)
{
foreach (var displayNameUpdates in allDisplayNameUpdates.Batch(
operationLimits.GetMaxNodesPerRead()))
{
var response = await session.Services.ReadAsync(new RequestHeader(),
0, Opc.Ua.TimestampsToReturn.Neither, new ReadValueIdCollection(
displayNameUpdates.Select(a => new ReadValueId
{
NodeId = a.GetDisplayName!.Value.NodeId.ToNodeId(session.MessageContext),
AttributeId = (uint)NodeAttribute.DisplayName
})), ct).ConfigureAwait(false);
var results = response.Validate(response.Results,
s => s.StatusCode, response.DiagnosticInfos, displayNameUpdates);
if (results.ErrorInfo != null)
{
_logger.LogWarning(
"Failed to resolve display name in {Subscription} due to {ErrorInfo}...",
this, results.ErrorInfo);
// We will retry later.
errorsDuringSync++;
}
else
{
foreach (var result in results)
{
string? displayName = null;
if (result.Result.Value is not null)
{
displayName =
(result.Result.Value as LocalizedText)?.ToString();
metadataChanged.Add(result.Request.Owner);
}
else
{
_logger.LogWarning("Failed to read display name for {NodeId} " +
"in {Subscription} due to '{ServiceResult}'",
result.Request.GetDisplayName!.Value.NodeId, this,
result.ErrorInfo);
}
result.Request.GetDisplayName!.Value.Update(
displayName ?? string.Empty);
}
}
}
}
_logger.LogDebug(
"Completing {Count} same/added and {Removed} removed items in subscription {Subscription}...",
desiredMonitoredItems.Count, remove.Count, this);
foreach (var monitoredItem in desiredMonitoredItems.Concat(remove))
{
if (!monitoredItem.TryCompleteChanges(this, ref applyChanges))
{
// Apply more changes in future passes
badMonitoredItems++;
}
}
Debug.Assert(remove.All(m => !m.AttachedToSubscription),
"All removed items should be detached now");
var set = desiredMonitoredItems.Where(m => m.Valid).ToList();
_logger.LogDebug(
"Completed {Count} valid and {Invalid} invalid items in subscription {Subscription}...",
set.Count, desiredMonitoredItems.Count - set.Count, this);
var finalize = set
.Where(i => i.FinalizeCompleteChanges != null)
.Select(i => i.FinalizeCompleteChanges!(ct))
.ToArray();
if (finalize.Length > 0)
{
await Task.WhenAll(finalize).ConfigureAwait(false);
}
if (applyChanges)
{
// Apply any additional changes
await ApplyChangesAsync(ct).ConfigureAwait(false);
}
Debug.Assert(set.Select(m => m.ClientHandle).Distinct().Count() == set.Count,
"Client handles are not distinct or one of the items is null");
_logger.LogDebug(
"Setting monitoring mode on {Count} items in subscription {Subscription}...",
set.Count, this);
//
// Finally change the monitoring mode as required. Batch the requests
// on the update of monitored item state from monitored items. On AddTo
// the monitoring mode was already configured. This is for updates as
// they are not applied through ApplyChanges
//
foreach (var change in set.GroupBy(i => i.GetMonitoringModeChange()))
{
if (change.Key == null)
{
// Not a valid item
continue;
}
foreach (var itemsBatch in change.Batch(
operationLimits.GetMaxMonitoredItemsPerCall()))
{
var itemsToChange = itemsBatch.Cast<MonitoredItem>().ToList();
_logger.LogInformation(
"Set monitoring to {Value} for {Count} items in subscription {Subscription}.",
change.Key.Value, itemsToChange.Count, this);
var results = await SetMonitoringModeAsync(change.Key.Value,
itemsToChange, ct).ConfigureAwait(false);
if (results != null)
{
var erroneousResultsCount = results
.Count(r => r != null && StatusCode.IsNotGood(r.StatusCode));
// Check the number of erroneous results and log.
if (erroneousResultsCount > 0)
{
_logger.LogWarning(
"Failed to set monitoring for {Count} items in subscription {Subscription}.",
erroneousResultsCount, this);
for (var i = 0; i < results.Count && i < itemsToChange.Count; ++i)
{
if (StatusCode.IsNotGood(results[i].StatusCode))
{
_logger.LogWarning("Set monitoring for item '{Item}' in "
+ "subscription {Subscription} failed with '{Status}'.",
itemsToChange[i].StartNodeId, this, results[i].StatusCode);
}
}
// Retry later
errorsDuringSync++;
}
}
}
}
finalize = set
.Where(i => i.FinalizeMonitoringModeChange != null)
.Select(i => i.FinalizeMonitoringModeChange!(ct))
.ToArray();
if (finalize.Length > 0)
{
await Task.WhenAll(finalize).ConfigureAwait(false);
}
// Cleanup all items that are not in the currently monitoring list
var dispose = previouslyMonitored
.Except(set)
.ToList();
dispose.ForEach(m => m.Dispose());
// Notify semantic change now that we have update the monitored items
foreach (var owner in metadataChanged)
{
owner.OnMonitoredItemSemanticsChanged();
}
// Refresh condition
if (set.OfType<OpcUaMonitoredItem.Condition>().Any())
{
_logger.LogInformation(
"Issuing ConditionRefresh on subscription {Subscription}", this);
try
{
await ConditionRefreshAsync(ct).ConfigureAwait(false);
_logger.LogInformation("ConditionRefresh on subscription " +
"{Subscription} has completed.", this);
}
catch (Exception e)
{
_logger.LogInformation("ConditionRefresh on subscription " +
"{Subscription} failed with an exception '{Message}'",
this, e.Message);
errorsDuringSync++;
}
}
set.ForEach(item => item.LogRevisedSamplingRateAndQueueSize());
var goodMonitoredItems =
Math.Max(set.Count - badMonitoredItems, 0);
var reportingItems = set
.Count(r => r.Status?.MonitoringMode == Opc.Ua.MonitoringMode.Reporting);
var disabledItems = set
.Count(r => r.Status?.MonitoringMode == Opc.Ua.MonitoringMode.Disabled);
var samplingItems = set
.Count(r => r.Status?.MonitoringMode == Opc.Ua.MonitoringMode.Sampling);
var notAppliedItems = set
.Count(r => r.Status?.MonitoringMode != r.MonitoringMode);
var heartbeatItems = set
.Count(r => r is OpcUaMonitoredItem.Heartbeat);
var conditionItems = set
.Count(r => r is OpcUaMonitoredItem.Condition);
var heartbeatsEnabled = set
.Count(r => r is OpcUaMonitoredItem.Heartbeat h && h.TimerEnabled);
var conditionsEnabled = set
.Count(r => r is OpcUaMonitoredItem.Condition h && h.TimerEnabled);
ReportMonitoredItemChanges(set.Count, goodMonitoredItems, badMonitoredItems,
errorsDuringSync, notAppliedItems, reportingItems, disabledItems, heartbeatItems,
heartbeatsEnabled, conditionItems, conditionsEnabled, samplingItems,
dispose.Count);
// Set up subscription management trigger
if (badMonitoredItems != 0 || errorsDuringSync != 0)
{
// There were items that could not be added to subscription
return Delay(_options.Value.InvalidMonitoredItemRetryDelayDuration,
TimeSpan.FromMinutes(5));
}
else if (desiredMonitoredItems.Count != set.Count)
{
// There were items !Valid but desired.
return Delay(_options.Value.BadMonitoredItemRetryDelayDuration,
TimeSpan.FromMinutes(30));
}
else
{
// Nothing to do
return Delay(_options.Value.SubscriptionManagementIntervalDuration,
Timeout.InfiniteTimeSpan);
}
}