in src/Azure.IIoT.OpcUa.Publisher/src/Storage/PublishedNodesConverter.cs [88:313]
public IEnumerable<PublishedNodesEntryModel> ToPublishedNodes(uint version, DateTimeOffset lastChanged,
IEnumerable<WriterGroupModel> items, bool preferTimeSpan = true)
{
if (items == null)
{
return [];
}
var sw = Stopwatch.StartNew();
try
{
var publishedNodesEntries = items
.Where(group => group?.DataSetWriters?.Count > 0)
.SelectMany(group => group.DataSetWriters!
.Where(writer =>
writer.DataSet?.DataSetSource?.PublishedVariables?.PublishedData != null
|| writer.DataSet?.DataSetSource?.PublishedEvents?.PublishedData != null)
.Select(writer => (WriterGroup: group, Writer: writer)))
.Select(item => AddConnectionModel(item.Writer.DataSet?.DataSetSource?.Connection,
new PublishedNodesEntryModel
{
NodeId = null,
Version = version,
LastChangeDateTime = lastChanged,
DataSetClassId = item.Writer.DataSet?.DataSetMetaData?.DataSetClassId ?? Guid.Empty,
DataSetDescription = item.Writer.DataSet?.DataSetMetaData?.Description,
DataSetKeyFrameCount = item.Writer.KeyFrameCount,
MessagingMode = item.WriterGroup.HeaderLayoutUri == null ? null :
Enum.Parse<MessagingMode>(item.WriterGroup.HeaderLayoutUri), // TODO: Make safe
MessageEncoding = item.WriterGroup.MessageType,
WriterGroupTransport = item.WriterGroup.Transport,
WriterGroupQualityOfService = item.WriterGroup.Publishing?.RequestedDeliveryGuarantee,
WriterGroupMessageTtlTimepan = item.WriterGroup.Publishing?.Ttl,
WriterGroupMessageRetention = item.WriterGroup.Publishing?.Retain,
WriterGroupPartitions = item.WriterGroup.PublishQueuePartitions,
WriterGroupQueueName = item.WriterGroup.Publishing?.QueueName,
SendKeepAliveDataSetMessages = item.Writer.DataSet?.SendKeepAlive ?? false,
DataSetExtensionFields = item.Writer.DataSet?.ExtensionFields?.ToDictionary(
e => e.DataSetFieldName, e => e.Value),
MetaDataUpdateTimeTimespan = item.Writer.MetaDataUpdateTime,
QueueName = item.Writer.Publishing?.QueueName,
QualityOfService = item.Writer.Publishing?.RequestedDeliveryGuarantee,
MessageTtlTimespan = item.Writer.Publishing?.Ttl,
MessageRetention = item.Writer.Publishing?.Retain,
MetaDataQueueName = item.Writer.MetaData?.QueueName,
MetaDataUpdateTime = null,
BatchTriggerIntervalTimespan = item.WriterGroup.PublishingInterval,
BatchTriggerInterval = null,
DataSetSamplingInterval = null,
DataSetSamplingIntervalTimespan =
item.Writer.DataSet?.DataSetSource?.SubscriptionSettings?.DefaultSamplingInterval,
DefaultHeartbeatInterval = null,
DefaultHeartbeatIntervalTimespan =
item.Writer.DataSet?.DataSetSource?.SubscriptionSettings?.DefaultHeartbeatInterval,
DefaultHeartbeatBehavior =
item.Writer.DataSet?.DataSetSource?.SubscriptionSettings?.DefaultHeartbeatBehavior,
Priority =
item.Writer.DataSet?.DataSetSource?.SubscriptionSettings?.Priority,
MaxKeepAliveCount =
item.Writer.DataSet?.DataSetSource?.SubscriptionSettings?.MaxKeepAliveCount,
DataSetFetchDisplayNames =
item.Writer.DataSet?.DataSetSource?.SubscriptionSettings?.ResolveDisplayName,
RepublishAfterTransfer =
item.Writer.DataSet?.DataSetSource?.SubscriptionSettings?.RepublishAfterTransfer,
OpcNodeWatchdogTimespan =
item.Writer.DataSet?.DataSetSource?.SubscriptionSettings?.MonitoredItemWatchdogTimeout,
OpcNodeWatchdogCondition =
item.Writer.DataSet?.DataSetSource?.SubscriptionSettings?.MonitoredItemWatchdogCondition,
DataSetWriterWatchdogBehavior =
item.Writer.DataSet?.DataSetSource?.SubscriptionSettings?.WatchdogBehavior,
BatchSize = item.WriterGroup.NotificationPublishThreshold,
DataSetName = item.Writer.DataSet?.Name,
DataSetWriterGroup =
item.WriterGroup.Name == Constants.DefaultWriterGroupName ? null : item.WriterGroup.Name,
DataSetWriterId = item.Writer.DataSetWriterName,
DataSetRouting = item.Writer.DataSet?.Routing,
DataSetPublishingInterval = null,
DataSetPublishingIntervalTimespan = null,
OpcNodes = ToOpcNodes(item.Writer.DataSet?.DataSetSource?.SubscriptionSettings,
item.Writer.DataSet?.DataSetSource?.PublishedVariables,
item.Writer.DataSet?.DataSetSource?.PublishedEvents, preferTimeSpan, false)?
.ToList() ?? [],
// ...
// Added by Add connection information
OpcAuthenticationMode = OpcAuthenticationMode.Anonymous,
OpcAuthenticationUsername = null,
OpcAuthenticationPassword = null,
EndpointUrl = string.Empty,
UseSecurity = false,
UseReverseConnect = null,
DisableSubscriptionTransfer = null,
DumpConnectionDiagnostics = null,
EndpointSecurityPolicy = null,
EndpointSecurityMode = null,
EncryptedAuthPassword = null,
EncryptedAuthUsername = null
}));
// Coalesce into unique nodes entry data set groups
// TODO: We should start with the grouping earlier
return publishedNodesEntries
.GroupBy(item => item,
new FuncCompare<PublishedNodesEntryModel>((x, y) => x!.HasSameDataSet(y!)))
.Select(group =>
{
group.Key.OpcNodes = group
.Where(g => g.OpcNodes != null)
.SelectMany(g => g.OpcNodes!)
.ToList();
return group.Key;
});
}
catch (Exception ex)
{
_logger.LogError(ex, "failed to convert the published nodes.");
return [];
}
finally
{
_logger.LogInformation("Converted published nodes entry models to jobs in {Elapsed}",
sw.Elapsed);
sw.Stop();
}
static IEnumerable<OpcNodeModel>? ToOpcNodes(PublishedDataSetSettingsModel? subscriptionSettings,
PublishedDataItemsModel? publishedVariables, PublishedEventItemsModel? publishedEvents, bool preferTimeSpan,
bool skipTriggeringNodes)
{
if (publishedVariables == null && publishedEvents == null)
{
return null;
}
return (publishedVariables?.PublishedData ?? Enumerable.Empty<PublishedDataSetVariableModel>())
.Select(variable => new OpcNodeModel
{
DeadbandType = variable.DeadbandType,
DeadbandValue = variable.DeadbandValue,
DataSetClassFieldId = variable.DataSetClassFieldId,
Id = variable.PublishedVariableNodeId,
DisplayName = variable.PublishedVariableDisplayName,
DataSetFieldId = variable.Id,
AttributeId = variable.Attribute,
IndexRange = variable.IndexRange,
RegisterNode = variable.RegisterNodeForSampling,
FetchDisplayName = variable.ReadDisplayNameFromNode,
BrowsePath = variable.BrowsePath,
UseCyclicRead = variable.SamplingUsingCyclicRead,
CyclicReadMaxAge = preferTimeSpan ? null : (int?)variable.CyclicReadMaxAge?.TotalMilliseconds,
CyclicReadMaxAgeTimespan = !preferTimeSpan ? null : variable.CyclicReadMaxAge,
DiscardNew = variable.DiscardNew,
QueueSize = variable.ServerQueueSize,
DataChangeTrigger = variable.DataChangeTrigger,
HeartbeatBehavior = variable.HeartbeatBehavior,
HeartbeatInterval = preferTimeSpan ? null : (int?)variable.HeartbeatInterval?.TotalSeconds,
HeartbeatIntervalTimespan = !preferTimeSpan ? null : variable.HeartbeatInterval,
OpcSamplingInterval = preferTimeSpan ? null : (int?)variable.SamplingIntervalHint?.TotalMilliseconds,
OpcSamplingIntervalTimespan = !preferTimeSpan ? null : variable.SamplingIntervalHint,
OpcPublishingInterval = preferTimeSpan ? null : (int?)
subscriptionSettings?.PublishingInterval?.TotalMilliseconds,
OpcPublishingIntervalTimespan = !preferTimeSpan ? null :
subscriptionSettings?.PublishingInterval,
SkipFirst = variable.SkipFirst,
TriggeredNodes = skipTriggeringNodes ? null : ToOpcNodes(subscriptionSettings,
variable.Triggering?.PublishedVariables,
variable.Triggering?.PublishedEvents, preferTimeSpan, true)?.ToList(),
Topic = variable.Publishing?.QueueName,
QualityOfService = variable.Publishing?.RequestedDeliveryGuarantee,
// MonitoringMode = variable.MonitoringMode,
// ...
ExpandedNodeId = null,
ConditionHandling = null,
ModelChangeHandling = null,
EventFilter = null
})
.Concat((publishedEvents?.PublishedData ?? Enumerable.Empty<PublishedDataSetEventModel>())
.Select(evt => new OpcNodeModel
{
Id = evt.EventNotifier,
EventFilter = new EventFilterModel
{
TypeDefinitionId = evt.TypeDefinitionId,
SelectClauses = evt.SelectedFields?.Select(s => s.Clone()).ToList(),
WhereClause = evt.Filter.Clone()
},
ConditionHandling = evt.ConditionHandling.Clone(),
ModelChangeHandling = evt.ModelChangeHandling.Clone(),
DataSetFieldId = evt.Id,
DisplayName = evt.PublishedEventName,
FetchDisplayName = evt.ReadEventNameFromNode,
BrowsePath = evt.BrowsePath,
DiscardNew = evt.DiscardNew,
QueueSize = evt.QueueSize,
TriggeredNodes = skipTriggeringNodes ? null : ToOpcNodes(subscriptionSettings,
evt.Triggering?.PublishedVariables,
evt.Triggering?.PublishedEvents, preferTimeSpan, true)?.ToList(),
Topic = evt.Publishing?.QueueName,
QualityOfService = evt.Publishing?.RequestedDeliveryGuarantee,
// MonitoringMode = evt.MonitoringMode,
// ...
DeadbandType = null,
DataChangeTrigger = null,
DataSetClassFieldId = Guid.Empty,
DeadbandValue = null,
ExpandedNodeId = null,
HeartbeatInterval = null,
HeartbeatBehavior = null,
HeartbeatIntervalTimespan = null,
OpcSamplingInterval = null,
OpcSamplingIntervalTimespan = null,
CyclicReadMaxAgeTimespan = null,
CyclicReadMaxAge = null,
AttributeId = null,
RegisterNode = null,
UseCyclicRead = null,
IndexRange = null,
OpcPublishingInterval = preferTimeSpan ? null : (int?)
subscriptionSettings?.PublishingInterval?.TotalMilliseconds,
OpcPublishingIntervalTimespan = !preferTimeSpan ? null :
subscriptionSettings?.PublishingInterval,
SkipFirst = null
}));
}
}