public IEnumerable<WriterGroupModel> ToWriterGroups(IEnumerable<PublishedNodesEntryModel> entries)

in src/Azure.IIoT.OpcUa.Publisher/src/Storage/PublishedNodesConverter.cs [320:696]


        /// <summary>
        /// Convert published nodes entries into writer groups. Unless publishing
        /// interval grouping is disabled, the entries are first split by the
        /// publishing interval of their nodes. The resulting entries are grouped by
        /// their unique writer group id and, inside each group, by their unique data
        /// set writer id. The nodes of each writer are de-duplicated and batched
        /// using <c>_maxNodesPerDataSet</c>; every batch becomes one data set writer.
        /// </summary>
        /// <param name="entries">The entries to convert. A null value yields an
        /// empty result.</param>
        /// <returns>The converted writer groups. An empty collection is returned
        /// when <paramref name="entries"/> is null or when the conversion throws;
        /// the exception is logged and not propagated.</returns>
        public IEnumerable<WriterGroupModel> ToWriterGroups(IEnumerable<PublishedNodesEntryModel> entries)
        {
            if (entries == null)
            {
                return [];
            }
            var sw = Stopwatch.StartNew();
            try
            {
                if (!_noPublishingIntervalGrouping)
                {
                    //
                    // Split all entries by the publishing interval in the nodes using the entry publishing
                    // interval as default. To prevent entries with no nodes to be removed here a dummy
                    // entry is added to the list of nodes and removed again from the group entries selected.
                    //
                    entries = entries
                        .SelectMany(entry => GetNodeModels(entry, _scaleTestCount)
                            .DefaultIfEmpty(kDummyEntry)
                            .GroupBy(n => n.GetNormalizedPublishingInterval(
                                          entry.GetNormalizedDataSetPublishingInterval()))
                            .Select(g => entry with
                            {
                                // Set the publishing interval for this entry at the top
                                DataSetPublishingIntervalTimespan = g.Key,
                                DataSetPublishingInterval = null,
                                OpcNodes = g
                                    .Where(n => n != kDummyEntry)
                                    .Select(n => n with
                                    {
                                        // Unset all node specific settings.
                                        OpcPublishingIntervalTimespan = null,
                                        OpcPublishingInterval = null
                                    })
                                    .ToList()
                            }));
                }
                return entries
                    //
                    // Now we have entries with nodes that have no publishing interval, group all entries
                    // by group identifier
                    //
                    .Select(entry => (
                        Entry: entry,
                        UniqueGroupId: entry.GetUniqueWriterGroupId()
                     ))
                    .GroupBy(entry => entry.UniqueGroupId)
                    .Select(g => (g.Key, Entries: g.ToList()))
                    //
                    // In each group select the writers using the unique data set writer id which uses the
                    // publishing interval. The first entry of the group provides the group level settings.
                    //
                    .Select(group => (
                        Id: group.Key,
                        Header: group.Entries[0].Entry,
                        Writers: group.Entries
                            .Select(entry => (
                                entry.Entry,
                                UniqueWriterId: entry.Entry.GetUniqueDataSetWriterId()
                             ))
                            .GroupBy(e => e.UniqueWriterId)
                            .Select(w => (w.Key, Writers: w.Select(e => e.Entry).ToList()))
                            .ToList()
                    ))
                    // Now bring it all together into a group with writers and settings
                    .Select(group => new WriterGroupModel
                    {
                        Id = group.Id,
                        MessageType = group.Header.MessageEncoding,
                        Transport = group.Header.WriterGroupTransport,
                        Publishing = new PublishingQueueSettingsModel
                        {
                            RequestedDeliveryGuarantee = group.Header.WriterGroupQualityOfService,
                            QueueName = group.Header.WriterGroupQueueName,
                            Retain = group.Header.WriterGroupMessageRetention,
                            Ttl = group.Header.WriterGroupMessageTtlTimepan
                        },
                        HeaderLayoutUri = group.Header.MessagingMode?.ToString(),
                        Name = group.Header.DataSetWriterGroup,
                        NotificationPublishThreshold = group.Header.BatchSize,
                        PublishQueuePartitions = group.Header.WriterGroupPartitions,
                        PublishingInterval = group.Header.GetNormalizedBatchTriggerInterval(),
                        DataSetWriters = group.Writers
                            .Select(w => (
                                WriterId: w.Key,
                                // The first entry of the writer provides the writer level settings
                                Header: w.Writers[0],
                                WriterBatches: w.Writers
                                    //
                                    // When publishing interval grouping is disabled the entries are
                                    // used as provided and their node list can be null. Treat a null
                                    // list as empty so that one such entry does not fail the
                                    // conversion of all entries.
                                    //
                                    .SelectMany(e => e.OpcNodes ?? Enumerable.Empty<OpcNodeModel>())
                                    .Distinct(OpcNodeModelEx.Comparer)
                                    .Batch(_maxNodesPerDataSet)
                            // Future: batch in service so it is centralized
                            ))
                            .SelectMany(b => b.WriterBatches // Do we need to materialize here?
                                // A writer without nodes still produces one (empty) data set writer
                                .DefaultIfEmpty(kDummyEntry.YieldReturn())
                                .Select(n => n.ToList())
                                .Select((nodes, index) => new DataSetWriterModel
                                {
                                    // The batch index makes the id unique per batch of the writer
                                    Id = b.WriterId + "_" + index,
                                    DataSetWriterName = b.Header.DataSetWriterId,
                                    MetaDataUpdateTime = b.Header.GetNormalizedMetaDataUpdateTime(),
                                    KeyFrameCount = b.Header.DataSetKeyFrameCount,
                                    Publishing = new PublishingQueueSettingsModel
                                    {
                                        QueueName = b.Header.QueueName,
                                        RequestedDeliveryGuarantee = b.Header.QualityOfService,
                                        Retain = b.Header.MessageRetention,
                                        Ttl = b.Header.MessageTtlTimespan
                                    },
                                    MetaData = new PublishingQueueSettingsModel
                                    {
                                        QueueName = b.Header.MetaDataQueueName,
                                        RequestedDeliveryGuarantee = null,
                                        Retain = true,
                                        Ttl = null
                                    },
                                    DataSet = new PublishedDataSetModel
                                    {
                                        Name = b.Header.DataSetName,
                                        DataSetMetaData = new DataSetMetaDataModel
                                        {
                                            DataSetClassId = b.Header.DataSetClassId,
                                            Description = b.Header.DataSetDescription,
                                            Name = b.Header.DataSetName
                                        },
                                        ExtensionFields = b.Header.DataSetExtensionFields?
                                            .Select(ef => new ExtensionFieldModel
                                            {
                                                DataSetFieldName = ef.Key,
                                                Value = ef.Value
                                            })
                                            .ToList(),
                                        SendKeepAlive = b.Header.SendKeepAliveDataSetMessages,
                                        Routing = b.Header.DataSetRouting,
                                        DataSetSource = new PublishedDataSetSourceModel
                                        {
                                            Connection = b.Header.ToConnectionModel(ToCredential),
                                            SubscriptionSettings = new PublishedDataSetSettingsModel
                                            {
                                                MaxKeepAliveCount = b.Header.MaxKeepAliveCount,
                                                RepublishAfterTransfer = b.Header.RepublishAfterTransfer,
                                                MonitoredItemWatchdogTimeout = b.Header.OpcNodeWatchdogTimespan,
                                                MonitoredItemWatchdogCondition = b.Header.OpcNodeWatchdogCondition,
                                                WatchdogBehavior = b.Header.DataSetWriterWatchdogBehavior,
                                                Priority = b.Header.Priority,
                                                ResolveDisplayName = b.Header.DataSetFetchDisplayNames,
                                                DefaultHeartbeatBehavior = b.Header.DefaultHeartbeatBehavior,
                                                DefaultHeartbeatInterval = b.Header.GetNormalizedDefaultHeartbeatInterval(),
                                                DefaultSamplingInterval = b.Header.GetNormalizedDataSetSamplingInterval(),
                                                PublishingInterval = b.Header.GetNormalizedDataSetPublishingInterval(),
                                                MaxNotificationsPerPublish = null,
                                                EnableImmediatePublishing = null,
                                                EnableSequentialPublishing = null,
                                                LifeTimeCount = null,
                                                UseDeferredAcknoledgements = null
                                                // ...
                                            },
                                            // The dummy entry only keeps empty writers alive and is not published
                                            PublishedVariables = ToPublishedDataItems(nodes.Where(n => n != kDummyEntry), false),
                                            PublishedEvents = ToPublishedEventItems(nodes.Where(n => n != kDummyEntry), false)
                                        }
                                    },
                                    MessageSettings = null,
                                    DataSetFieldContentMask = null
                                }))
                                .ToList(),
                        KeepAliveTime = null,
                        MaxNetworkMessageSize = null,
                        MessageSettings = null,
                        Priority = null,
                        PublishQueueSize = null,
                        SecurityGroupId = null,
                        SecurityKeyServices = null,
                        SecurityMode = null,
                        LocaleIds = null
                    })
                    .ToList(); // Convert here or else we dont print conversion correctly
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "failed to convert the published nodes.");
                return [];
            }
            finally
            {
                // Stop before reading the elapsed time. This runs on success and on failure.
                sw.Stop();
                _logger.LogInformation("Converted published nodes entry models to jobs in {Elapsed}",
                    sw.Elapsed);
            }

            //
            // Enumerates the nodes of an entry as new node models. Nodes for which no id can be
            // obtained are logged and skipped. If scaleTestCount is greater than 1 every node is
            // emitted scaleTestCount times, with the index appended to a non-empty display name
            // and without triggered nodes. If the entry itself carries a non-blank node id
            // identifier, one additional node with just that id is emitted last.
            //
            IEnumerable<OpcNodeModel> GetNodeModels(PublishedNodesEntryModel item, int scaleTestCount)
            {
                if (item.OpcNodes != null)
                {
                    foreach (var node in item.OpcNodes)
                    {
                        if (!node.TryGetId(out var id))
                        {
                            _logger.LogError("No node id was configured in the opc node entry - skipping...");
                            continue;
                        }
                        if (scaleTestCount <= 1)
                        {
                            yield return new OpcNodeModel
                            {
                                Id = id,
                                DisplayName = node.DisplayName,
                                DataSetClassFieldId = node.DataSetClassFieldId,
                                DataSetFieldId = node.DataSetFieldId,
                                ExpandedNodeId = node.ExpandedNodeId,
                                // The publishing interval item wins over dataset over global default
                                OpcPublishingIntervalTimespan = node.GetNormalizedPublishingInterval()
                                    ?? item.GetNormalizedDataSetPublishingInterval(),
                                OpcSamplingIntervalTimespan = node.GetNormalizedSamplingInterval(),
                                HeartbeatIntervalTimespan = node.GetNormalizedHeartbeatInterval(),
                                QueueSize = node.QueueSize,
                                DiscardNew = node.DiscardNew,
                                BrowsePath = node.BrowsePath,
                                AttributeId = node.AttributeId,
                                FetchDisplayName = node.FetchDisplayName,
                                IndexRange = node.IndexRange,
                                RegisterNode = node.RegisterNode,
                                UseCyclicRead = node.UseCyclicRead,
                                CyclicReadMaxAgeTimespan = node.GetNormalizedCyclicReadMaxAge(),
                                SkipFirst = node.SkipFirst,
                                DataChangeTrigger = node.DataChangeTrigger,
                                DeadbandType = node.DeadbandType,
                                DeadbandValue = node.DeadbandValue,
                                EventFilter = node.EventFilter,
                                HeartbeatBehavior = node.HeartbeatBehavior,
                                ConditionHandling = node.ConditionHandling,
                                TriggeredNodes = node.TriggeredNodes,
                                QualityOfService = node.QualityOfService,
                                Topic = node.Topic,
                                ModelChangeHandling = node.ModelChangeHandling
                            };
                        }
                        else
                        {
                            for (var i = 0; i < scaleTestCount; i++)
                            {
                                yield return new OpcNodeModel
                                {
                                    Id = id,
                                    // Only a configured display name is made distinct per copy
                                    DisplayName = !string.IsNullOrEmpty(node.DisplayName) ?
                                        $"{node.DisplayName}_{i}" : null,
                                    DataSetFieldId = node.DataSetFieldId,
                                    DataSetClassFieldId = node.DataSetClassFieldId,
                                    ExpandedNodeId = node.ExpandedNodeId,
                                    HeartbeatIntervalTimespan = node.GetNormalizedHeartbeatInterval(),
                                    // The publishing interval item wins over dataset over global default
                                    OpcPublishingIntervalTimespan = node.GetNormalizedPublishingInterval()
                                        ?? item.GetNormalizedDataSetPublishingInterval(),
                                    OpcSamplingIntervalTimespan = node.GetNormalizedSamplingInterval(),
                                    QueueSize = node.QueueSize,
                                    SkipFirst = node.SkipFirst,
                                    DataChangeTrigger = node.DataChangeTrigger,
                                    BrowsePath = node.BrowsePath,
                                    AttributeId = node.AttributeId,
                                    FetchDisplayName = node.FetchDisplayName,
                                    IndexRange = node.IndexRange,
                                    RegisterNode = node.RegisterNode,
                                    UseCyclicRead = node.UseCyclicRead,
                                    CyclicReadMaxAgeTimespan = node.GetNormalizedCyclicReadMaxAge(),
                                    DeadbandType = node.DeadbandType,
                                    DeadbandValue = node.DeadbandValue,
                                    DiscardNew = node.DiscardNew,
                                    HeartbeatBehavior = node.HeartbeatBehavior,
                                    EventFilter = node.EventFilter,
                                    TriggeredNodes = null,
                                    ConditionHandling = node.ConditionHandling,
                                    QualityOfService = node.QualityOfService,
                                    Topic = node.Topic,
                                    ModelChangeHandling = node.ModelChangeHandling
                                };
                            }
                        }
                    }
                }

                // Node configured directly on the entry rather than in the node list
                if (!string.IsNullOrWhiteSpace(item.NodeId?.Identifier))
                {
                    yield return new OpcNodeModel
                    {
                        Id = item.NodeId.Identifier,
                        OpcPublishingIntervalTimespan = item.GetNormalizedDataSetPublishingInterval()
                    };
                }
            }

            //
            // Converts the nodes that have neither an event filter nor model change handling
            // into published variables. When skipTriggering is true the triggered nodes of a
            // node are not converted, which limits the nesting of triggered items to one level.
            //
            static PublishedDataItemsModel ToPublishedDataItems(IEnumerable<OpcNodeModel> opcNodes, bool skipTriggering)
            {
                return new PublishedDataItemsModel
                {
                    PublishedData = opcNodes.Where(node => node.EventFilter == null && node.ModelChangeHandling == null)
                    .Select(node => new PublishedDataSetVariableModel
                    {
                        Id = node.DataSetFieldId,
                        PublishedVariableNodeId = node.Id,
                        DataSetClassFieldId = node.DataSetClassFieldId,

                        // At this point in time the next values are ensured to be filled in with
                        // the appropriate value: configured or default
                        PublishedVariableDisplayName = node.DisplayName,
                        SamplingIntervalHint = node.OpcSamplingIntervalTimespan,
                        HeartbeatInterval = node.HeartbeatIntervalTimespan,
                        HeartbeatBehavior = node.HeartbeatBehavior,
                        ServerQueueSize = node.QueueSize,
                        DiscardNew = node.DiscardNew,
                        SamplingUsingCyclicRead = node.UseCyclicRead,
                        CyclicReadMaxAge = node.CyclicReadMaxAgeTimespan,
                        Attribute = node.AttributeId,
                        IndexRange = node.IndexRange,
                        RegisterNodeForSampling = node.RegisterNode,
                        BrowsePath = node.BrowsePath,
                        ReadDisplayNameFromNode = node.FetchDisplayName,
                        MonitoringMode = null,
                        SubstituteValue = null,
                        SkipFirst = node.SkipFirst,
                        DataChangeTrigger = node.DataChangeTrigger,
                        DeadbandValue = node.DeadbandValue,
                        DeadbandType = node.DeadbandType,
                        // Item level publishing settings are only created when something is configured
                        Publishing = node.Topic == null && node.QualityOfService == null
                            ? null : new PublishingQueueSettingsModel
                            {
                                QueueName = node.Topic,
                                RequestedDeliveryGuarantee = node.QualityOfService,
                                Retain = null,
                                Ttl = null
                            },
                        Triggering = skipTriggering || node.TriggeredNodes == null
                            ? null : new PublishedDataSetTriggerModel
                            {
                                PublishedVariables = ToPublishedDataItems(node.TriggeredNodes, true),
                                PublishedEvents = ToPublishedEventItems(node.TriggeredNodes, true)
                            }
                    })
                    .ToList()
                };
            }

            //
            // Converts the nodes that have an event filter or model change handling into
            // published events. When skipTriggering is true the triggered nodes of a node are
            // not converted, which limits the nesting of triggered items to one level.
            //
            static PublishedEventItemsModel ToPublishedEventItems(IEnumerable<OpcNodeModel> opcNodes, bool skipTriggering)
            {
                return new PublishedEventItemsModel
                {
                    PublishedData = opcNodes.Where(node => node.EventFilter != null || node.ModelChangeHandling != null)
                    .Select(node => new PublishedDataSetEventModel
                    {
                        Id = node.DataSetFieldId,
                        EventNotifier = node.Id,
                        QueueSize = node.QueueSize,
                        DiscardNew = node.DiscardNew,
                        PublishedEventName = node.DisplayName,
                        ReadEventNameFromNode = node.FetchDisplayName,
                        BrowsePath = node.BrowsePath,
                        MonitoringMode = null,
                        TypeDefinitionId = node.EventFilter?.TypeDefinitionId,
                        SelectedFields = node.EventFilter?.SelectClauses?.Select(s => s.Clone()).ToList(),
                        Filter = node.EventFilter?.WhereClause.Clone(),
                        // NOTE(review): Clone is called on possibly null values here - presumably
                        // these are null tolerant extension methods; confirm.
                        ConditionHandling = node.ConditionHandling.Clone(),
                        ModelChangeHandling = node.ModelChangeHandling.Clone(),
                        // Item level publishing settings are only created when something is configured
                        Publishing = node.Topic == null && node.QualityOfService == null
                            ? null : new PublishingQueueSettingsModel
                            {
                                QueueName = node.Topic,
                                RequestedDeliveryGuarantee = node.QualityOfService,
                                Retain = null,
                                Ttl = null
                            },
                        Triggering = skipTriggering || node.TriggeredNodes == null
                            ? null : new PublishedDataSetTriggerModel
                            {
                                PublishedVariables = ToPublishedDataItems(node.TriggeredNodes, true),
                                PublishedEvents = ToPublishedEventItems(node.TriggeredNodes, true)
                            }
                    }).ToList()
                };
            }
        }