public static IEnumerable<DataSetWriter> GetDataSetWriters()

in src/Azure.IIoT.OpcUa.Publisher/src/Services/DataSetWriter.cs [118:307]


            /// <summary>
            /// Splits one data set writer configuration into the concrete writers
            /// to create for it inside the writer group. Published variables and
            /// published events always end up in separate writers. Without routing
            /// the items are grouped by their resolved queue settings and one writer
            /// is produced per group; with routing enabled every item gets a writer
            /// of its own.
            /// </summary>
            /// <param name="group">Writer group data source that provides the
            /// publisher options and the writer group model.</param>
            /// <param name="dataSetWriter">Writer configuration to split.</param>
            /// <returns>The writers, created lazily while the result is
            /// enumerated.</returns>
            /// <exception cref="ArgumentException">The writer has no data set
            /// source. Thrown when the method is called, not when the result
            /// is enumerated.</exception>
            public static IEnumerable<DataSetWriter> GetDataSetWriters(WriterGroupDataSource group,
                DataSetWriterModel dataSetWriter)
            {
                var options = group._options.Value;
                if (dataSetWriter?.DataSet?.DataSetSource is null)
                {
                    throw new ArgumentException("DataSet source missing", nameof(dataSetWriter));
                }

                var dataset = dataSetWriter.DataSet;
                var source = dataset.DataSetSource;
                var routing = dataset.Routing ?? options.DefaultDataSetRouting
                    ?? DataSetRoutingMode.None;

                var dataSetClassId = dataset.DataSetMetaData?.DataSetClassId
                    ?? Guid.Empty;
                var escWriterName = TopicFilter.Escape(
                    dataSetWriter.DataSetWriterName ?? Constants.DefaultDataSetWriterName);
                var escWriterGroup = TopicFilter.Escape(
                    group._writerGroup.Name ?? Constants.DefaultWriterGroupName);

                // Values available for substitution when the topics are built.
                var variables = new Dictionary<string, string>
                {
                    [PublisherConfig.DataSetWriterIdVariableName] = dataSetWriter.Id,
                    [PublisherConfig.DataSetWriterVariableName] = escWriterName,
                    [PublisherConfig.DataSetWriterNameVariableName] = escWriterName,
                    [PublisherConfig.DataSetClassIdVariableName] = dataSetClassId.ToString(),
                    [PublisherConfig.WriterGroupIdVariableName] = group.Id,
                    [PublisherConfig.DataSetWriterGroupVariableName] = escWriterGroup,
                    [PublisherConfig.WriterGroupVariableName] = escWriterGroup
                    // ...
                };

                // This method is deliberately not an iterator itself: the code
                // above therefore runs (and throws) at the call site. Only the
                // creation of the writers is deferred until enumeration.
                return EnumerateWriters();

                // Lazily yields the variable writers followed by the event writers.
                IEnumerable<DataSetWriter> EnumerateWriters()
                {
                    // No auto routing - group variables and events by publish settings
                    var data = source.PublishedVariables?.PublishedData?
                        .GroupBy(d => Resolve(options, group._writerGroup, dataSetWriter,
                            d.Publishing, d.Id, routing, variables));
                    if (data is not null)
                    {
                        foreach (var items in data)
                        {
                            if (routing == DataSetRoutingMode.None)
                            {
                                // One writer for all variables sharing the settings
                                yield return CreateDataSetWriter(dataSetWriter.Id,
                                    items.Key, items.ToList());
                                continue;
                            }
                            // Routing enabled - one writer per variable
                            foreach (var item in items)
                            {
                                yield return CreateDataSetWriter(CreateItemWriterId(item.Id, item),
                                    items.Key, new[] { item });
                            }
                        }
                    }

                    var evts = source.PublishedEvents?.PublishedData?
                        .GroupBy(d => Resolve(options, group._writerGroup, dataSetWriter,
                            d.Publishing, d.Id, routing, variables));
                    if (evts is not null)
                    {
                        foreach (var items in evts)
                        {
                            if (routing == DataSetRoutingMode.None)
                            {
                                // One writer for all events sharing the settings
                                yield return CreateEventWriter(dataSetWriter.Id,
                                    items.Key, items.ToList());
                                continue;
                            }
                            // Routing enabled - one writer per event
                            foreach (var item in items)
                            {
                                yield return CreateEventWriter(CreateItemWriterId(item.Id, item),
                                    items.Key, new[] { item });
                            }
                        }
                    }
                }

                // Id of a per item writer: the writer id followed by the item id or,
                // if the item has no id, the invariant formatted hash code of the item.
                string CreateItemWriterId(string? itemId, object item)
                {
                    return $"{dataSetWriter.Id}_{itemId
                        ?? item.GetHashCode().ToString(CultureInfo.InvariantCulture)}";
                }

                // Writer that publishes only the passed variables (no events).
                DataSetWriter CreateDataSetWriter(string id,
                    (PublishingQueueSettingsModel? MetaData, PublishingQueueSettingsModel? Publishing) publishSettings,
                    IReadOnlyList<PublishedDataSetVariableModel> data)
                {
                    return CreateWriter(id, publishSettings,
                        new PublishedDataItemsModel
                        {
                            PublishedData = data
                        }, null);
                }

                // Writer that publishes only the passed events (no variables).
                DataSetWriter CreateEventWriter(string id,
                    (PublishingQueueSettingsModel? MetaData, PublishingQueueSettingsModel? Publishing) publishSettings,
                    IReadOnlyList<PublishedDataSetEventModel> data)
                {
                    return CreateWriter(id, publishSettings, null,
                        new PublishedEventItemsModel
                        {
                            PublishedData = data
                        });
                }

                // Shared construction: copies the writer configuration with the new
                // id and the resolved queue settings, clones meta data, connection
                // and subscription settings, and swaps in the passed items.
                DataSetWriter CreateWriter(string id,
                    (PublishingQueueSettingsModel? MetaData, PublishingQueueSettingsModel? Publishing) publishSettings,
                    PublishedDataItemsModel? publishedVariables,
                    PublishedEventItemsModel? publishedEvents)
                {
                    return new DataSetWriter(group, routing, dataSetWriter with
                    {
                        Id = id,
                        MetaData = publishSettings.MetaData,
                        Publishing = publishSettings.Publishing,
                        DataSet = dataset with
                        {
                            DataSetMetaData = dataset.DataSetMetaData.Clone(),
                            DataSetSource = source with
                            {
                                Connection = source.Connection.Clone(),
                                SubscriptionSettings = source.SubscriptionSettings.Clone(),

                                PublishedEvents = publishedEvents,
                                PublishedVariables = publishedVariables
                            }
                        }
                    });
                }

                // Resolve the publish queue settings with the data set writer provided settings.
                // Item level settings win over the writer settings which win over the
                // writer group settings. The result is used as grouping key above.
                static (PublishingQueueSettingsModel? MetaData, PublishingQueueSettingsModel? Publishing) Resolve(
                    PublisherOptions options, WriterGroupModel group, DataSetWriterModel dataSetWriter,
                    PublishingQueueSettingsModel? settings, string? fieldId,
                    DataSetRoutingMode routing, Dictionary<string, string> variables)
                {
                    var builder = new TopicBuilder(options, group.MessageType,
                        new TopicTemplatesOptions
                        {
                            Telemetry = settings?.QueueName
                                ?? dataSetWriter.Publishing?.QueueName
                                ?? group.Publishing?.QueueName,
                            DataSetMetaData = dataSetWriter.MetaData?.QueueName
                        },
                        // The field id is only known per item and therefore appended here
                        variables
                            .Append(KeyValuePair
                                .Create(PublisherConfig.DataSetFieldIdVariableName,
                                    TopicFilter.Escape(fieldId ?? string.Empty))));

                    var telemetryTopic = builder.TelemetryTopic;
                    var metadataTopic = builder.DataSetMetaDataTopic;
                    // Meta data goes to the telemetry topic when routing is
                    // enabled or when no meta data topic could be built.
                    if (string.IsNullOrWhiteSpace(metadataTopic) || routing != DataSetRoutingMode.None)
                    {
                        metadataTopic = telemetryTopic;
                    }

                    var publishing = new PublishingQueueSettingsModel
                    {
                        QueueName = telemetryTopic,
                        Ttl = settings?.Ttl
                            ?? dataSetWriter.Publishing?.Ttl
                            ?? group.Publishing?.Ttl,
                        RequestedDeliveryGuarantee = settings?.RequestedDeliveryGuarantee
                            ?? dataSetWriter.Publishing?.RequestedDeliveryGuarantee
                            ?? group.Publishing?.RequestedDeliveryGuarantee,
                        Retain = settings?.Retain
                            ?? dataSetWriter.Publishing?.Retain
                            ?? group.Publishing?.Retain
                    };

                    // Meta data settings come from the writer and fall back to
                    // the telemetry settings resolved above.
                    var metadata = new PublishingQueueSettingsModel
                    {
                        QueueName = metadataTopic,
                        Ttl =
                               dataSetWriter.MetaData?.Ttl
                            ?? publishing.Ttl,
                        RequestedDeliveryGuarantee =
                               dataSetWriter.MetaData?.RequestedDeliveryGuarantee
                            ?? publishing.RequestedDeliveryGuarantee,
                        Retain =
                               dataSetWriter.MetaData?.Retain
                            ?? publishing.Retain
                    };
                    return (metadata, publishing);
                }
            }