in src/Azure.IIoT.OpcUa.Publisher/src/Services/NetworkMessageEncoder.cs [206:562]
/// <summary>
/// A network message produced by GetNetworkMessages together with the
/// information that travels with it to the publishing stage.
/// </summary>
/// <param name="NotificationsPerMessage">Number of subscription notifications
/// attributed to the network message (0 for metadata messages).</param>
/// <param name="NetworkMessage">The network or metadata message to encode.</param>
/// <param name="Queue">Queue settings (topic, delivery guarantee, retain, ttl)
/// the message is published with.</param>
/// <param name="OnSentCallback">Callback that disposes the notifications that
/// were attributed to the message. Presumably invoked once the message has been
/// sent - confirm against the caller.</param>
/// <param name="Schema">Optional event schema of the writer group the message
/// was produced for.</param>
/// <param name="EncodingContext">Optional service message context taken from a
/// contributing notification.</param>
private record struct EncodedMessage(
    int NotificationsPerMessage,
    PubSubMessage NetworkMessage,
    PublishingQueueSettingsModel Queue,
    Action OnSentCallback,
    IEventSchema? Schema,
    IServiceMessageContext? EncodingContext = null);
/// <summary>
/// Produce network messages from the data set message model.
/// Notifications are grouped by target queue, then publisher id, then writer
/// group and schema, and finally data set class id. Inside the innermost group
/// every notification is converted into data set messages (or monitored item
/// messages when the writer group uses the samples payload) which are collected
/// into network messages. A network message is emitted when it has reached the
/// configured maximum number of data set messages and another data set message
/// arrives, when a metadata notification is encountered, or when the group ends.
/// </summary>
/// <param name="messages">The subscription notifications to convert. Only
/// notifications whose context is a <see cref="DataSetWriterContext"/> are
/// processed.</param>
/// <param name="isBatched">Passed on to network message creation. When false
/// and the writer group uses the monitored item (samples) payload, the single
/// data set message flag is added to the network message content mask.</param>
/// <returns>The produced messages in the order they were emitted.</returns>
private List<EncodedMessage> GetNetworkMessages(
    IEnumerable<OpcUaSubscriptionNotification> messages, bool isBatched)
{
    var standardsCompliant = _options.Value.UseStandardsCompliantEncoding ?? false;
    var result = new List<EncodedMessage>();

    // Builds the queue settings of a writer which act as outermost grouping key
    static PublishingQueueSettingsModel GetQueue(DataSetWriterContext context)
    {
        return new PublishingQueueSettingsModel
        {
            RequestedDeliveryGuarantee = context.Qos,
            Retain = context.Retain,
            Ttl = context.Ttl,
            QueueName = context.Topic
        };
    }

    // Group messages by topic and qos, then writer group and then by dataset class id
    foreach (var topics in messages
        .Select(m => (Notification: m, Context: (m.Context as DataSetWriterContext)!))
        .Where(m => m.Context != null)
        .GroupBy(m => GetQueue(m.Context)))
    {
        var queue = topics.Key;
        foreach (var publishers in topics.GroupBy(m => m.Context.PublisherId))
        {
            var publisherId = publishers.Key;
            foreach (var groups in publishers
                .GroupBy(m => (m.Context.WriterGroup, m.Context.Schema)))
            {
                var writerGroup = groups.Key.WriterGroup;
                var schema = groups.Key.Schema;
                if (writerGroup?.MessageSettings == null)
                {
                    // Must have a writer group
                    Drop(groups.Select(m => m.Notification));
                    continue;
                }
                var encoding = writerGroup.MessageType ?? MessageEncoding.Json;
                var networkMessageContentMask =
                    writerGroup.MessageSettings.NetworkMessageContentMask;
                var hasSamplesPayload =
                    (networkMessageContentMask & NetworkMessageContentFlags.MonitoredItemMessage) != 0;
                if (hasSamplesPayload && !isBatched)
                {
                    networkMessageContentMask |= NetworkMessageContentFlags.SingleDataSetMessage;
                }
                var namespaceFormat =
                    writerGroup.MessageSettings?.NamespaceFormat ??
                    _options.Value.DefaultNamespaceFormat ??
                    NamespaceFormat.Uri;
                foreach (var dataSetClass in groups
                    .GroupBy(m => m.Context.Writer?.DataSet?.DataSetMetaData?.DataSetClassId ?? Guid.Empty))
                {
                    var dataSetClassId = dataSetClass.Key;

                    // Network message under construction and the notifications attributed to it
                    BaseNetworkMessage? currentMessage = null;
                    var currentNotifications = new List<OpcUaSubscriptionNotification>();
                    // Context of the notification that added the latest data set message
                    IServiceMessageContext? currentContext = null;

                    foreach (var (Notification, Context) in dataSetClass)
                    {
                        if (Context.Writer == null ||
                            (hasSamplesPayload && !encoding.HasFlag(MessageEncoding.Json)))
                        {
                            // Must have a writer or if samples mode, must be json
                            Drop(Notification.YieldReturn());
                            continue;
                        }
                        var dataSetMessageContentMask =
                            Context.Writer.MessageSettings?.DataSetMessageContentMask;
                        var dataSetFieldContentMask =
                            Context.Writer.DataSetFieldContentMask;
                        if (Notification.MessageType != MessageType.Metadata)
                        {
                            Debug.Assert(Notification.Notifications != null);
                            if (Notification.MessageType == MessageType.KeepAlive)
                            {
                                Debug.Assert(Notification.Notifications.Count == 0);
                                if (hasSamplesPayload)
                                {
                                    // Keep alive messages are not sent in samples mode
                                    Drop(Notification.YieldReturn());
                                    continue;
                                }
                                // Create regular data set messages
                                if (!PubSubMessage.TryCreateDataSetMessage(encoding,
                                    GetDataSetWriterName(Notification, Context),
                                    Context.DataSetWriterId, dataSetMessageContentMask,
                                    MessageType.KeepAlive,
#if KA_WITH_EX_FIELDS
                                    new DataSet(Context.ExtensionFields, dataSetFieldContentMask),
#else
                                    new DataSet(),
#endif
                                    GetTimestamp(Notification), Context.NextWriterSequenceNumber(),
                                    standardsCompliant, Notification.EndpointUrl,
                                    Notification.ApplicationUri, Context.MetaData?.MetaData,
                                    out var dataSetMessage))
                                {
                                    Drop(Notification.YieldReturn());
                                    continue;
                                }
                                AddMessage(dataSetMessage);
                                LogNotification(Notification, false);
                                currentNotifications.Add(Notification);
                                continue;
                            }

                            // One queue per field, ordered by source timestamp. Each round of
                            // the loop below takes at most one value per field.
                            var notificationQueues = Notification.Notifications
                                .GroupBy(m => m.DataSetFieldName)
                                .Select(c => new Queue<MonitoredItemNotificationModel>(
                                    c
                                        .OrderBy(m => m.Value?.SourceTimestamp)
                                        .ToArray()))
                                .ToArray();
                            var notComplete = notificationQueues.Any(q => q.Count > 0);
                            if (!notComplete)
                            {
                                // Already completed so we cannot complete it as an encoded message.
                                Drop(Notification.YieldReturn());
                                continue;
                            }
                            while (notComplete)
                            {
                                var orderedNotifications = notificationQueues
                                    .Select(q => q.Count > 0 ? q.Dequeue() : null!)
                                    .Where(s => s?.DataSetFieldName != null)
                                    .ToList()
                                    ;
                                notComplete = notificationQueues.Any(q => q.Count > 0);
                                if (!hasSamplesPayload)
                                {
                                    // Create regular data set messages
                                    if (!PubSubMessage.TryCreateDataSetMessage(encoding,
                                        GetDataSetWriterName(Notification, Context), Context.DataSetWriterId,
                                        dataSetMessageContentMask, Notification.MessageType,
                                        new DataSet(orderedNotifications
                                            .Select(s => (s.DataSetFieldName!, s.Value))
                                            .Concat(Context.ExtensionFields)
                                            .ToList(), dataSetFieldContentMask),
                                        GetTimestamp(Notification), Context.NextWriterSequenceNumber(),
                                        standardsCompliant, Notification.EndpointUrl, Notification.ApplicationUri,
                                        Context.MetaData?.MetaData, out var dataSetMessage))
                                    {
                                        // NOTE(review): continue resumes the while loop, so the dropped
                                        // notification is still added to currentNotifications below - confirm.
                                        Drop(Notification.YieldReturn());
                                        continue;
                                    }
                                    AddMessage(dataSetMessage);
                                    LogNotification(Notification, false);
                                }
                                else
                                {
                                    // Add monitored item message payload to network message to handle backcompat.
                                    // Group by a composite key so that distinct (id, message id) pairs can
                                    // never end up in the same group.
                                    foreach (var itemNotifications in orderedNotifications
                                        .GroupBy(f => (f.Id, f.MessageId)))
                                    {
                                        var notificationsInGroup = itemNotifications.ToList();
                                        Debug.Assert(notificationsInGroup.Count != 0);
                                        //
                                        // Special monitored item handling for events and conditions. Collate all
                                        // values into a single key value data dictionary extension object value.
                                        // Regular notifications we send as single messages.
                                        //
                                        if (notificationsInGroup.Count > 1)
                                        {
                                            if (Notification.MessageType == MessageType.Event ||
                                                Notification.MessageType == MessageType.Condition)
                                            {
                                                Debug.Assert(notificationsInGroup
                                                    .Select(n => n.DataSetFieldName).Distinct().Count() == notificationsInGroup.Count,
                                                    "There should not be duplicates in fields in a group.");
                                                Debug.Assert(notificationsInGroup
                                                    .All(n => n.SequenceNumber == notificationsInGroup[0].SequenceNumber),
                                                    "All notifications in the group should have the same sequence number.");
                                                var eventNotification = notificationsInGroup[0] with
                                                {
                                                    Value = new DataValue
                                                    {
                                                        Value = new EncodeableDictionary(notificationsInGroup
                                                            .Select(n => new KeyDataValuePair(n.DataSetFieldName!, n.Value)))
                                                    },
                                                    DataSetFieldName = notificationsInGroup[0].DataSetName
                                                };
                                                notificationsInGroup =
                                                [
                                                    eventNotification
                                                ];
                                            }
                                            else if (_options.Value.RemoveDuplicatesFromBatch ?? false)
                                            {
                                                var pruned = notificationsInGroup
                                                    .OrderByDescending(k => k.Value?.SourceTimestamp) // Descend from latest
                                                    .DistinctBy(k => k.DataSetFieldName) // Only leave the latest values
                                                    .ToList();
                                                if (pruned.Count != notificationsInGroup.Count)
                                                {
                                                    if (_logNotifications)
                                                    {
                                                        _logger.LogInformation("Removed {Count} duplicates from batch.",
                                                            notificationsInGroup.Count - pruned.Count);
                                                    }
                                                    notificationsInGroup = pruned;
                                                }
                                            }
                                        }
                                        foreach (var notification in notificationsInGroup)
                                        {
                                            if (notification.DataSetFieldName != null)
                                            {
                                                if (!PubSubMessage.TryCreateMonitoredItemMessage(encoding,
                                                    writerGroup.Name, dataSetMessageContentMask, Notification.MessageType,
                                                    GetTimestamp(Notification), Context.NextWriterSequenceNumber(),
                                                    new DataSet(notification.DataSetFieldName, notification.Value,
                                                        dataSetFieldContentMask),
                                                    notification.NodeId, Notification.EndpointUrl, Notification.ApplicationUri,
                                                    standardsCompliant, Context.Writer.DataSet?.ExtensionFields,
                                                    out var dataSetMessage))
                                                {
                                                    LogNotification(notification, true);
                                                    continue;
                                                }
                                                AddMessage(dataSetMessage);
                                                LogNotification(notification, false);
                                            }
                                        }
                                    }
                                }
                            }
                            currentNotifications.Add(Notification);

                            //
                            // Add the data set message to the current network message. If the current
                            // network message already holds the configured maximum number of data set
                            // messages it is emitted first. Emitting lazily (instead of right after the
                            // last add) ensures the notification that completed the message has already
                            // been added to the list of notifications that is emitted with the message.
                            //
                            void AddMessage(BaseDataSetMessage dataSetMessage)
                            {
                                var maxMessagesToPublish = writerGroup.MessageSettings?.MaxDataSetMessagesPerPublish ??
                                    _options.Value.DefaultMaxDataSetMessagesPerPublish;
                                if (currentMessage != null && maxMessagesToPublish != null &&
                                    currentMessage.Messages.Count >= maxMessagesToPublish)
                                {
                                    EmitCurrentMessage(currentContext);
                                }
                                if (currentMessage == null)
                                {
                                    if (!PubSubMessage.TryCreateNetworkMessage(encoding, publisherId,
                                        writerGroup.Name ?? Constants.DefaultWriterGroupName, networkMessageContentMask,
                                        dataSetClassId, () => SequenceNumber.Increment16(ref _sequenceNumber),
                                        GetTimestamp(Notification) ?? _timeProvider.GetUtcNow(), namespaceFormat,
                                        standardsCompliant, isBatched, schema, out var message))
                                    {
                                        // NOTE(review): this drops every notification passed to the method,
                                        // not only the current one - confirm this is intended.
                                        Drop(messages);
                                        return;
                                    }
                                    currentMessage = message;
                                }
                                currentMessage.Messages.Add(dataSetMessage);
                                currentContext = Notification.ServiceMessageContext;
                            }
                        }
                        else if (Context.MetaData?.MetaData != null && !hasSamplesPayload)
                        {
                            if (currentMessage?.Messages.Count > 0)
                            {
                                // Start a new message but first emit current
                                EmitCurrentMessage(Notification.ServiceMessageContext);
                            }
                            if (PubSubMessage.TryCreateMetaDataMessage(encoding, publisherId,
                                writerGroup.Name ?? Constants.DefaultWriterGroupName,
                                GetDataSetWriterName(Notification, Context), Context.DataSetWriterId,
                                Context.MetaData.MetaData, namespaceFormat, standardsCompliant,
                                out var metadataMessage))
                            {
                                result.Add(new EncodedMessage(0, metadataMessage, queue, Notification.Dispose,
                                    schema, Notification.ServiceMessageContext));
                            }
#if DEBUG
                            Notification.MarkProcessed();
#endif
                            LogNotification(Notification, false);
                        }
                    }
                    if (currentMessage?.Messages.Count > 0)
                    {
                        EmitCurrentMessage(currentNotifications.LastOrDefault()?.ServiceMessageContext);
                    }
                    else
                    {
                        Debug.Assert(currentNotifications.Count == 0);
                    }

                    //
                    // Adds the current network message and its notifications to the result and
                    // starts over with an empty message and list. The list is copied into a local
                    // first: a lambda captures the variable, not its value, so capturing
                    // currentNotifications directly would make the callback dispose whatever list
                    // is assigned to the variable at the time the callback runs.
                    //
                    void EmitCurrentMessage(IServiceMessageContext? encodingContext)
                    {
                        if (currentMessage == null)
                        {
                            return;
                        }
                        var completed = currentNotifications;
                        result.Add(new EncodedMessage(completed.Count, currentMessage, queue,
                            () => completed.ForEach(n => n.Dispose()),
                            schema, encodingContext));
#if DEBUG
                        completed.ForEach(n => n.MarkProcessed());
#endif
                        currentMessage = null;
                        currentNotifications = [];
                    }

                    // Writer name, suffixed with "|<event type name>" when the notification has one
                    static string GetDataSetWriterName(OpcUaSubscriptionNotification Notification,
                        DataSetWriterContext Context)
                    {
                        var dataSetWriterName = Context.WriterName;
                        var eventTypeName = Notification.EventTypeName;
                        if (!string.IsNullOrWhiteSpace(eventTypeName))
                        {
                            return dataSetWriterName + "|" + eventTypeName;
                        }
                        return dataSetWriterName;
                    }

                    // Selects the message timestamp according to the configured timestamp source
                    DateTimeOffset? GetTimestamp(OpcUaSubscriptionNotification Notification)
                    {
                        switch (_options.Value.MessageTimestamp)
                        {
                            case MessageTimestamp.EncodingTimeUtc:
                                return _timeProvider.GetUtcNow();
                            case MessageTimestamp.PublishTime:
                                return Notification.PublishTimestamp;
                            default:
                                return Notification.CreatedTimestamp;
                        }
                    }
                }
            }
        }
    }
    return result;
}