in src/Azure.IIoT.OpcUa.Publisher/src/Services/DataSetWriter.cs [118:307]
public static IEnumerable<DataSetWriter> GetDataSetWriters(WriterGroupDataSource group,
    DataSetWriterModel dataSetWriter)
{
    // Expands one data set writer configuration into the concrete writers to run.
    //
    // Published variables and published events are each grouped by their resolved
    // (metadata, publishing) queue settings. With DataSetRoutingMode.None one writer
    // is produced per settings group and it keeps the configured writer id; with any
    // other routing mode one writer is produced per item and the id is suffixed with
    // the item id (or the item's hash code when the item has no id).
    //
    // Throws ArgumentException when the writer has no data set source.
    //
    // This method is intentionally NOT an iterator: validation and setup run right
    // away when it is called, and only the enumeration itself is deferred to the
    // local iterator below. If the yield statements lived in this method body the
    // argument check would not run until the caller starts enumerating the result.
    var options = group._options.Value;
    if (dataSetWriter?.DataSet?.DataSetSource == null)
    {
        throw new ArgumentException("DataSet source missing", nameof(dataSetWriter));
    }
    var dataset = dataSetWriter.DataSet;
    var source = dataset.DataSetSource;
    var routing = dataset.Routing ?? options.DefaultDataSetRouting
        ?? DataSetRoutingMode.None;
    var dataSetClassId = dataset.DataSetMetaData?.DataSetClassId
        ?? Guid.Empty;
    var escWriterName = TopicFilter.Escape(
        dataSetWriter.DataSetWriterName ?? Constants.DefaultDataSetWriterName);
    var escWriterGroup = TopicFilter.Escape(
        group._writerGroup.Name ?? Constants.DefaultWriterGroupName);
    // Variables handed to the topic builder for every item of this writer.
    var variables = new Dictionary<string, string>
    {
        [PublisherConfig.DataSetWriterIdVariableName] = dataSetWriter.Id,
        [PublisherConfig.DataSetWriterVariableName] = escWriterName,
        [PublisherConfig.DataSetWriterNameVariableName] = escWriterName,
        [PublisherConfig.DataSetClassIdVariableName] = dataSetClassId.ToString(),
        [PublisherConfig.WriterGroupIdVariableName] = group.Id,
        [PublisherConfig.DataSetWriterGroupVariableName] = escWriterGroup,
        [PublisherConfig.WriterGroupVariableName] = escWriterGroup
        // ...
    };
    return EnumerateWriters();

    // Lazily produces the writers, data (variable) writers first, then event writers.
    IEnumerable<DataSetWriter> EnumerateWriters()
    {
        // No auto routing - group variables and events by publish settings
        var data = source.PublishedVariables?.PublishedData?
            .GroupBy(d => Resolve(options, group._writerGroup, dataSetWriter,
                d.Publishing, d.Id, routing, variables));
        if (data != null)
        {
            if (routing == DataSetRoutingMode.None)
            {
                foreach (var items in data)
                {
                    yield return CreateDataSetWriter(dataSetWriter.Id,
                        items.Key, items.ToList());
                }
            }
            else
            {
                // One writer per item, visited in group order.
                foreach (var (p, item) in data.SelectMany(d => d.Select(i => (d.Key, i))))
                {
                    var id = ItemWriterId(dataSetWriter.Id, item.Id, item);
                    yield return CreateDataSetWriter(id, p, new[] { item });
                }
            }
        }
        var evts = source.PublishedEvents?.PublishedData?
            .GroupBy(d => Resolve(options, group._writerGroup, dataSetWriter,
                d.Publishing, d.Id, routing, variables));
        if (evts != null)
        {
            if (routing == DataSetRoutingMode.None)
            {
                foreach (var items in evts)
                {
                    yield return CreateEventWriter(dataSetWriter.Id,
                        items.Key, items.ToList());
                }
            }
            else
            {
                // One writer per item, visited in group order.
                foreach (var (p, item) in evts.SelectMany(d => d.Select(i => (d.Key, i))))
                {
                    var id = ItemWriterId(dataSetWriter.Id, item.Id, item);
                    yield return CreateEventWriter(id, p, new[] { item });
                }
            }
        }
    }

    // Id of a per-item writer: "<writer id>_<item id>", using the item's hash code
    // (invariant culture) in place of the item id when the item has none.
    static string ItemWriterId(string writerId, string? itemId, object item)
    {
        var suffix = itemId
            ?? item.GetHashCode().ToString(CultureInfo.InvariantCulture);
        return $"{writerId}_{suffix}";
    }

    // Writer that publishes only the given variables (no events).
    DataSetWriter CreateDataSetWriter(string id,
        (PublishingQueueSettingsModel?, PublishingQueueSettingsModel?) publishSettings,
        IReadOnlyList<PublishedDataSetVariableModel> data)
    {
        return CreateWriter(id, publishSettings,
            new PublishedDataItemsModel
            {
                PublishedData = data
            },
            null);
    }

    // Writer that publishes only the given events (no variables).
    DataSetWriter CreateEventWriter(string id,
        (PublishingQueueSettingsModel?, PublishingQueueSettingsModel?) publishSettings,
        IReadOnlyList<PublishedDataSetEventModel> data)
    {
        return CreateWriter(id, publishSettings,
            null,
            new PublishedEventItemsModel
            {
                PublishedData = data
            });
    }

    // Shared factory: copies the configured writer, replacing its id, its metadata
    // and publishing queue settings, and the published items of its source. Data set
    // metadata, connection and subscription settings are cloned for each writer.
    DataSetWriter CreateWriter(string id,
        (PublishingQueueSettingsModel? MetaData,
            PublishingQueueSettingsModel? Publishing) publishSettings,
        PublishedDataItemsModel? publishedVariables,
        PublishedEventItemsModel? publishedEvents)
    {
        return new DataSetWriter(group, routing, dataSetWriter with
        {
            Id = id,
            MetaData = publishSettings.MetaData,
            Publishing = publishSettings.Publishing,
            DataSet = dataset with
            {
                DataSetMetaData = dataset.DataSetMetaData.Clone(),
                DataSetSource = source with
                {
                    Connection = source.Connection.Clone(),
                    SubscriptionSettings = source.SubscriptionSettings.Clone(),
                    PublishedEvents = publishedEvents,
                    PublishedVariables = publishedVariables
                }
            }
        });
    }

    // Resolve the publish queue settings with the data set writer provided settings.
    // Returns the metadata settings first and the telemetry (publishing) settings
    // second. Each telemetry setting is taken from the item, then the data set
    // writer, then the writer group, whichever is set first.
    static (PublishingQueueSettingsModel? MetaData,
        PublishingQueueSettingsModel? Publishing) Resolve(
        PublisherOptions options, WriterGroupModel group, DataSetWriterModel dataSetWriter,
        PublishingQueueSettingsModel? settings, string? fieldId,
        DataSetRoutingMode routing, Dictionary<string, string> variables)
    {
        var builder = new TopicBuilder(options, group.MessageType,
            new TopicTemplatesOptions
            {
                Telemetry = settings?.QueueName
                    ?? dataSetWriter.Publishing?.QueueName
                    ?? group.Publishing?.QueueName,
                DataSetMetaData = dataSetWriter.MetaData?.QueueName
            },
            // Add the escaped field id to the variables without touching the
            // shared dictionary.
            variables
                .Append(KeyValuePair
                    .Create(PublisherConfig.DataSetFieldIdVariableName,
                        TopicFilter.Escape(fieldId ?? string.Empty))));
        var telemetryTopic = builder.TelemetryTopic;
        var metadataTopic = builder.DataSetMetaDataTopic;
        // Metadata goes to the telemetry topic when no metadata topic was built
        // or when any routing mode other than None is active.
        if (string.IsNullOrWhiteSpace(metadataTopic) || routing != DataSetRoutingMode.None)
        {
            metadataTopic = telemetryTopic;
        }
        var publishing = new PublishingQueueSettingsModel
        {
            QueueName = telemetryTopic,
            Ttl = settings?.Ttl
                ?? dataSetWriter.Publishing?.Ttl
                ?? group.Publishing?.Ttl,
            RequestedDeliveryGuarantee = settings?.RequestedDeliveryGuarantee
                ?? dataSetWriter.Publishing?.RequestedDeliveryGuarantee
                ?? group.Publishing?.RequestedDeliveryGuarantee,
            Retain = settings?.Retain
                ?? dataSetWriter.Publishing?.Retain
                ?? group.Publishing?.Retain
        };
        // Metadata settings come from the writer's MetaData when set and otherwise
        // fall back to the resolved telemetry settings.
        var metadata = new PublishingQueueSettingsModel
        {
            QueueName = metadataTopic,
            Ttl =
                dataSetWriter.MetaData?.Ttl
                    ?? publishing.Ttl,
            RequestedDeliveryGuarantee =
                dataSetWriter.MetaData?.RequestedDeliveryGuarantee
                    ?? publishing.RequestedDeliveryGuarantee,
            Retain =
                dataSetWriter.MetaData?.Retain
                    ?? publishing.Retain
        };
        return (metadata, publishing);
    }
}