in src/Azure.IIoT.OpcUa.Publisher/src/Storage/PublishedNodesConverter.cs [320:696]
public IEnumerable<WriterGroupModel> ToWriterGroups(IEnumerable<PublishedNodesEntryModel> entries)
{
    //
    // Converts published nodes entries into writer group models.
    //
    // Steps:
    //  1. Unless publishing interval grouping is disabled, every entry is split into
    //     one entry per distinct (normalized) node publishing interval.
    //  2. Entries are grouped by their unique writer group id, the first entry of a
    //     group provides the group level settings.
    //  3. Inside a group entries are grouped by unique data set writer id, their nodes
    //     are merged, de-duplicated and batched into data set writers.
    //
    // Returns an empty result if entries is null or if the conversion throws (the
    // exception is logged and swallowed).
    //
    if (entries is null)
    {
        return [];
    }
    var sw = Stopwatch.StartNew();
    try
    {
        if (!_noPublishingIntervalGrouping)
        {
            //
            // Split all entries by the publishing interval in the nodes using the entry
            // publishing interval as default. To prevent entries with no nodes to be
            // removed here a dummy entry is added to the list of nodes and removed again
            // from the group entries selected.
            //
            entries = entries
                .SelectMany(entry => GetNodeModels(entry, _scaleTestCount)
                    .DefaultIfEmpty(kDummyEntry)
                    .GroupBy(n => n.GetNormalizedPublishingInterval(
                        entry.GetNormalizedDataSetPublishingInterval()))
                    .Select(g => entry with
                    {
                        // Set the publishing interval for this entry at the top
                        DataSetPublishingIntervalTimespan = g.Key,
                        DataSetPublishingInterval = null,
                        OpcNodes = g
                            .Where(n => n != kDummyEntry)
                            .Select(n => n with
                            {
                                // Unset all node specific settings.
                                OpcPublishingIntervalTimespan = null,
                                OpcPublishingInterval = null
                            })
                            .ToList()
                    }));
        }
        // Otherwise the entries are used exactly as provided (they do not pass
        // through GetNodeModels) and their node list can therefore still be null.

        return entries
            //
            // Group all entries by writer group identifier
            //
            .Select(entry => (
                Entry: entry,
                UniqueGroupId: entry.GetUniqueWriterGroupId()
            ))
            .GroupBy(entry => entry.UniqueGroupId)
            .Select(g => (g.Key, Entries: g.ToList()))
            //
            // In each group select the writers using the unique data set writer id
            // which uses the publishing interval.
            //
            .Select(group => (
                Id: group.Key,
                // First entry of the group provides the writer group settings
                Header: group.Entries[0].Entry,
                Writers: group.Entries
                    .Select(entry => (
                        entry.Entry,
                        UniqueWriterId: entry.Entry.GetUniqueDataSetWriterId()
                    ))
                    .GroupBy(e => e.UniqueWriterId)
                    .Select(w => (w.Key, Writers: w.Select(e => e.Entry).ToList()))
                    .ToList()
            ))
            // Now bring it all together into a group with writers and settings
            .Select(group => new WriterGroupModel
            {
                Id = group.Id,
                MessageType = group.Header.MessageEncoding,
                Transport = group.Header.WriterGroupTransport,
                Publishing = new PublishingQueueSettingsModel
                {
                    RequestedDeliveryGuarantee = group.Header.WriterGroupQualityOfService,
                    QueueName = group.Header.WriterGroupQueueName,
                    Retain = group.Header.WriterGroupMessageRetention,
                    Ttl = group.Header.WriterGroupMessageTtlTimepan
                },
                HeaderLayoutUri = group.Header.MessagingMode?.ToString(),
                Name = group.Header.DataSetWriterGroup,
                NotificationPublishThreshold = group.Header.BatchSize,
                PublishQueuePartitions = group.Header.WriterGroupPartitions,
                PublishingInterval = group.Header.GetNormalizedBatchTriggerInterval(),
                DataSetWriters = group.Writers
                    .Select(w => (
                        WriterId: w.Key,
                        // First entry of the writer provides the writer settings
                        Header: w.Writers[0],
                        WriterBatches: w.Writers
                            //
                            // An entry without a node list must not fail the whole
                            // conversion: a null inner sequence would make SelectMany
                            // throw, which the catch below turns into an empty result
                            // for *all* entries. Treat it as an entry with no nodes.
                            //
                            .SelectMany(entry => entry.OpcNodes
                                ?? Enumerable.Empty<OpcNodeModel>())
                            .Distinct(OpcNodeModelEx.Comparer)
                            .Batch(_maxNodesPerDataSet)
                        // Future: batch in service so it is centralized
                    ))
                    .SelectMany(b => b.WriterBatches // Do we need to materialize here?
                        // Writers without nodes still produce one (empty) writer
                        .DefaultIfEmpty(kDummyEntry.YieldReturn())
                        .Select(n => n.ToList())
                        .Select((nodes, index) => new DataSetWriterModel
                        {
                            // Batch index makes the writer id unique per batch
                            Id = b.WriterId + "_" + index,
                            DataSetWriterName = b.Header.DataSetWriterId,
                            MetaDataUpdateTime = b.Header.GetNormalizedMetaDataUpdateTime(),
                            KeyFrameCount = b.Header.DataSetKeyFrameCount,
                            Publishing = new PublishingQueueSettingsModel
                            {
                                QueueName = b.Header.QueueName,
                                RequestedDeliveryGuarantee = b.Header.QualityOfService,
                                Retain = b.Header.MessageRetention,
                                Ttl = b.Header.MessageTtlTimespan
                            },
                            MetaData = new PublishingQueueSettingsModel
                            {
                                QueueName = b.Header.MetaDataQueueName,
                                RequestedDeliveryGuarantee = null,
                                Retain = true,
                                Ttl = null
                            },
                            DataSet = new PublishedDataSetModel
                            {
                                Name = b.Header.DataSetName,
                                DataSetMetaData = new DataSetMetaDataModel
                                {
                                    DataSetClassId = b.Header.DataSetClassId,
                                    Description = b.Header.DataSetDescription,
                                    Name = b.Header.DataSetName
                                },
                                ExtensionFields = b.Header.DataSetExtensionFields?
                                    .Select(ef => new ExtensionFieldModel
                                    {
                                        DataSetFieldName = ef.Key,
                                        Value = ef.Value
                                    })
                                    .ToList(),
                                SendKeepAlive = b.Header.SendKeepAliveDataSetMessages,
                                Routing = b.Header.DataSetRouting,
                                DataSetSource = new PublishedDataSetSourceModel
                                {
                                    Connection = b.Header.ToConnectionModel(ToCredential),
                                    SubscriptionSettings = new PublishedDataSetSettingsModel
                                    {
                                        MaxKeepAliveCount = b.Header.MaxKeepAliveCount,
                                        RepublishAfterTransfer = b.Header.RepublishAfterTransfer,
                                        MonitoredItemWatchdogTimeout = b.Header.OpcNodeWatchdogTimespan,
                                        MonitoredItemWatchdogCondition = b.Header.OpcNodeWatchdogCondition,
                                        WatchdogBehavior = b.Header.DataSetWriterWatchdogBehavior,
                                        Priority = b.Header.Priority,
                                        ResolveDisplayName = b.Header.DataSetFetchDisplayNames,
                                        DefaultHeartbeatBehavior = b.Header.DefaultHeartbeatBehavior,
                                        DefaultHeartbeatInterval = b.Header.GetNormalizedDefaultHeartbeatInterval(),
                                        DefaultSamplingInterval = b.Header.GetNormalizedDataSetSamplingInterval(),
                                        PublishingInterval = b.Header.GetNormalizedDataSetPublishingInterval(),
                                        MaxNotificationsPerPublish = null,
                                        EnableImmediatePublishing = null,
                                        EnableSequentialPublishing = null,
                                        LifeTimeCount = null,
                                        UseDeferredAcknoledgements = null
                                        // ...
                                    },
                                    // The dummy entry only keeps empty writers alive, drop it
                                    PublishedVariables = ToPublishedDataItems(nodes.Where(n => n != kDummyEntry), false),
                                    PublishedEvents = ToPublishedEventItems(nodes.Where(n => n != kDummyEntry), false)
                                }
                            },
                            MessageSettings = null,
                            DataSetFieldContentMask = null
                        }))
                    .ToList(),
                KeepAliveTime = null,
                MaxNetworkMessageSize = null,
                MessageSettings = null,
                Priority = null,
                PublishQueueSize = null,
                SecurityGroupId = null,
                SecurityKeyServices = null,
                SecurityMode = null,
                LocaleIds = null
            })
            .ToList(); // Convert here or else we dont print conversion correctly
    }
    catch (Exception ex)
    {
        _logger.LogError(ex, "failed to convert the published nodes.");
        return [];
    }
    finally
    {
        // Runs on success and on failure, i.e. the duration is also logged after an error
        _logger.LogInformation("Converted published nodes entry models to jobs in {Elapsed}",
            sw.Elapsed);
        sw.Stop();
    }

    //
    // Produces the node models of an entry. Nodes without an id are logged and skipped.
    // With a scale test count greater than 1 every node is emitted that many times, each
    // copy with an index suffixed display name (if the node has one) and without
    // triggered nodes. If the entry itself carries a non blank node id, one more node
    // with just that id and the entry publishing interval is emitted at the end.
    //
    IEnumerable<OpcNodeModel> GetNodeModels(PublishedNodesEntryModel item, int scaleTestCount)
    {
        if (item.OpcNodes != null)
        {
            foreach (var node in item.OpcNodes)
            {
                if (!node.TryGetId(out var id))
                {
                    _logger.LogError("No node id was configured in the opc node entry - skipping...");
                    continue;
                }

                // Single definition of the copied settings used by both paths below
                var template = new OpcNodeModel
                {
                    Id = id,
                    DisplayName = node.DisplayName,
                    DataSetClassFieldId = node.DataSetClassFieldId,
                    DataSetFieldId = node.DataSetFieldId,
                    ExpandedNodeId = node.ExpandedNodeId,
                    // The publishing interval item wins over dataset over global default
                    OpcPublishingIntervalTimespan = node.GetNormalizedPublishingInterval()
                        ?? item.GetNormalizedDataSetPublishingInterval(),
                    OpcSamplingIntervalTimespan = node.GetNormalizedSamplingInterval(),
                    HeartbeatIntervalTimespan = node.GetNormalizedHeartbeatInterval(),
                    QueueSize = node.QueueSize,
                    DiscardNew = node.DiscardNew,
                    BrowsePath = node.BrowsePath,
                    AttributeId = node.AttributeId,
                    FetchDisplayName = node.FetchDisplayName,
                    IndexRange = node.IndexRange,
                    RegisterNode = node.RegisterNode,
                    UseCyclicRead = node.UseCyclicRead,
                    CyclicReadMaxAgeTimespan = node.GetNormalizedCyclicReadMaxAge(),
                    SkipFirst = node.SkipFirst,
                    DataChangeTrigger = node.DataChangeTrigger,
                    DeadbandType = node.DeadbandType,
                    DeadbandValue = node.DeadbandValue,
                    EventFilter = node.EventFilter,
                    HeartbeatBehavior = node.HeartbeatBehavior,
                    ConditionHandling = node.ConditionHandling,
                    TriggeredNodes = node.TriggeredNodes,
                    QualityOfService = node.QualityOfService,
                    Topic = node.Topic,
                    ModelChangeHandling = node.ModelChangeHandling
                };

                if (scaleTestCount <= 1)
                {
                    yield return template;
                    continue;
                }

                for (var i = 0; i < scaleTestCount; i++)
                {
                    // Every scale test copy is a new instance of the template
                    yield return template with
                    {
                        DisplayName = !string.IsNullOrEmpty(node.DisplayName) ?
                            $"{node.DisplayName}_{i}" : null,
                        TriggeredNodes = null
                    };
                }
            }
        }
        if (!string.IsNullOrWhiteSpace(item.NodeId?.Identifier))
        {
            yield return new OpcNodeModel
            {
                Id = item.NodeId.Identifier,
                OpcPublishingIntervalTimespan = item.GetNormalizedDataSetPublishingInterval()
            };
        }
    }

    //
    // Node level queue settings, null if the node configures neither topic nor
    // quality of service.
    //
    static PublishingQueueSettingsModel? ToNodePublishing(OpcNodeModel node)
    {
        if (node.Topic == null && node.QualityOfService == null)
        {
            return null;
        }
        return new PublishingQueueSettingsModel
        {
            QueueName = node.Topic,
            RequestedDeliveryGuarantee = node.QualityOfService,
            Retain = null,
            Ttl = null
        };
    }

    //
    // Converts the triggered nodes of a node. Triggered nodes are converted with
    // skipTriggering set so that only one level of triggering is produced.
    //
    static PublishedDataSetTriggerModel? ToNodeTriggering(OpcNodeModel node, bool skipTriggering)
    {
        if (skipTriggering || node.TriggeredNodes == null)
        {
            return null;
        }
        return new PublishedDataSetTriggerModel
        {
            PublishedVariables = ToPublishedDataItems(node.TriggeredNodes, true),
            PublishedEvents = ToPublishedEventItems(node.TriggeredNodes, true)
        };
    }

    //
    // Converts all nodes that have neither an event filter nor model change handling
    // configured into published variables.
    //
    static PublishedDataItemsModel ToPublishedDataItems(IEnumerable<OpcNodeModel> opcNodes, bool skipTriggering)
    {
        return new PublishedDataItemsModel
        {
            PublishedData = opcNodes
                .Where(node => node.EventFilter == null && node.ModelChangeHandling == null)
                .Select(node => new PublishedDataSetVariableModel
                {
                    Id = node.DataSetFieldId,
                    PublishedVariableNodeId = node.Id,
                    DataSetClassFieldId = node.DataSetClassFieldId,
                    // At this point in time the next values are ensured to be filled in with
                    // the appropriate value: configured or default
                    PublishedVariableDisplayName = node.DisplayName,
                    SamplingIntervalHint = node.OpcSamplingIntervalTimespan,
                    HeartbeatInterval = node.HeartbeatIntervalTimespan,
                    HeartbeatBehavior = node.HeartbeatBehavior,
                    ServerQueueSize = node.QueueSize,
                    DiscardNew = node.DiscardNew,
                    SamplingUsingCyclicRead = node.UseCyclicRead,
                    CyclicReadMaxAge = node.CyclicReadMaxAgeTimespan,
                    Attribute = node.AttributeId,
                    IndexRange = node.IndexRange,
                    RegisterNodeForSampling = node.RegisterNode,
                    BrowsePath = node.BrowsePath,
                    ReadDisplayNameFromNode = node.FetchDisplayName,
                    MonitoringMode = null,
                    SubstituteValue = null,
                    SkipFirst = node.SkipFirst,
                    DataChangeTrigger = node.DataChangeTrigger,
                    DeadbandValue = node.DeadbandValue,
                    DeadbandType = node.DeadbandType,
                    Publishing = ToNodePublishing(node),
                    Triggering = ToNodeTriggering(node, skipTriggering)
                })
                .ToList()
        };
    }

    //
    // Converts all nodes that have an event filter or model change handling configured
    // into published events.
    //
    static PublishedEventItemsModel ToPublishedEventItems(IEnumerable<OpcNodeModel> opcNodes, bool skipTriggering)
    {
        return new PublishedEventItemsModel
        {
            PublishedData = opcNodes
                .Where(node => node.EventFilter != null || node.ModelChangeHandling != null)
                .Select(node => new PublishedDataSetEventModel
                {
                    Id = node.DataSetFieldId,
                    EventNotifier = node.Id,
                    QueueSize = node.QueueSize,
                    DiscardNew = node.DiscardNew,
                    PublishedEventName = node.DisplayName,
                    ReadEventNameFromNode = node.FetchDisplayName,
                    BrowsePath = node.BrowsePath,
                    MonitoringMode = null,
                    TypeDefinitionId = node.EventFilter?.TypeDefinitionId,
                    SelectedFields = node.EventFilter?.SelectClauses?.Select(s => s.Clone()).ToList(),
                    // NOTE(review): the Clone() calls below are made on values that can be
                    // null here (e.g. ModelChangeHandling of a pure event node) - presumably
                    // Clone is a null tolerant extension method; confirm.
                    Filter = node.EventFilter?.WhereClause.Clone(),
                    ConditionHandling = node.ConditionHandling.Clone(),
                    ModelChangeHandling = node.ModelChangeHandling.Clone(),
                    Publishing = ToNodePublishing(node),
                    Triggering = ToNodeTriggering(node, skipTriggering)
                })
                .ToList()
        };
    }
}