private record struct EncodedMessage()

in src/Azure.IIoT.OpcUa.Publisher/src/Services/NetworkMessageEncoder.cs [206:562]


        /// <summary>
        /// A network message produced by the encoder together with the
        /// information that travels alongside it.
        /// </summary>
        /// <param name="NotificationsPerMessage">Number of subscription
        /// notifications accounted to the message. The encoder passes 0
        /// for metadata messages.</param>
        /// <param name="NetworkMessage">The created network or metadata
        /// message.</param>
        /// <param name="Queue">Queue settings (delivery guarantee, retain,
        /// ttl and queue name) built from the data set writer context.</param>
        /// <param name="OnSentCallback">Callback that disposes the
        /// notification(s) behind the message. Presumably invoked by the
        /// caller once the message was handed off - confirm at the call
        /// site.</param>
        /// <param name="Schema">Schema of the writer group/schema grouping the
        /// message was created for, if any.</param>
        /// <param name="EncodingContext">Service message context taken from a
        /// notification of the message, if available.</param>
        private record struct EncodedMessage(int NotificationsPerMessage,
            PubSubMessage NetworkMessage, PublishingQueueSettingsModel Queue,
            Action OnSentCallback, IEventSchema? Schema,
            IServiceMessageContext? EncodingContext = null);

        /// <summary>
        /// Produce network messages from the data set message model.
        /// Notifications are grouped by queue settings, then publisher id,
        /// then writer group and schema and finally by data set class id.
        /// Inside every group the data set messages created from the
        /// notifications are collected into network messages which are
        /// split once the configured maximum number of data set messages
        /// per network message is reached. Metadata notifications are
        /// emitted as separate metadata messages.
        /// </summary>
        /// <param name="messages">Notifications to encode. Entries whose
        /// context is not a <c>DataSetWriterContext</c> are skipped.</param>
        /// <param name="isBatched">Passed on to network message creation.
        /// If <c>false</c> and the writer group uses the monitored item
        /// (samples) payload, the single data set message flag is added
        /// to the network message content mask.</param>
        /// <returns>The encoded messages, each with the number of
        /// notifications it accounts for, its queue settings and a
        /// callback disposing these notifications.</returns>
        private List<EncodedMessage> GetNetworkMessages(
            IEnumerable<OpcUaSubscriptionNotification> messages, bool isBatched)
        {
            var standardsCompliant = _options.Value.UseStandardsCompliantEncoding ?? false;
            var result = new List<EncodedMessage>();

            // Queue settings of a writer, used as outermost grouping key below
            static PublishingQueueSettingsModel GetQueue(DataSetWriterContext context)
            {
                return new PublishingQueueSettingsModel
                {
                    RequestedDeliveryGuarantee = context.Qos,
                    Retain = context.Retain,
                    Ttl = context.Ttl,
                    QueueName = context.Topic
                };
            }
            // Group messages by topic and qos, then writer group and then by dataset class id
            foreach (var topics in messages
                .Select(m => (Notification: m, Context: (m.Context as DataSetWriterContext)!))
                .Where(m => m.Context != null)
                .GroupBy(m => GetQueue(m.Context)))
            {
                var queue = topics.Key;
                foreach (var publishers in topics.GroupBy(m => m.Context.PublisherId))
                {
                    var publisherId = publishers.Key;
                    foreach (var groups in publishers
                        .GroupBy(m => (m.Context.WriterGroup, m.Context.Schema)))
                    {
                        var writerGroup = groups.Key.WriterGroup;
                        var schema = groups.Key.Schema;

                        if (writerGroup?.MessageSettings == null)
                        {
                            // Must have a writer group
                            Drop(groups.Select(m => m.Notification));
                            continue;
                        }
                        var encoding = writerGroup.MessageType ?? MessageEncoding.Json;
                        var networkMessageContentMask =
                            writerGroup.MessageSettings.NetworkMessageContentMask;
                        var hasSamplesPayload =
                            (networkMessageContentMask & NetworkMessageContentFlags.MonitoredItemMessage) != 0;
                        if (hasSamplesPayload && !isBatched)
                        {
                            networkMessageContentMask |= NetworkMessageContentFlags.SingleDataSetMessage;
                        }
                        var namespaceFormat =
                            writerGroup.MessageSettings?.NamespaceFormat ??
                            _options.Value.DefaultNamespaceFormat ??
                            NamespaceFormat.Uri;
                        foreach (var dataSetClass in groups
                            .GroupBy(m => m.Context.Writer?.DataSet?.DataSetMetaData?.DataSetClassId ?? Guid.Empty))
                        {
                            var dataSetClassId = dataSetClass.Key;
                            // Network message currently being filled and the notifications
                            // that were completely added to it so far.
                            BaseNetworkMessage? currentMessage = null;
                            var currentNotifications = new List<OpcUaSubscriptionNotification>();
                            foreach (var (Notification, Context) in dataSetClass)
                            {
                                if (Context.Writer == null ||
                                    (hasSamplesPayload && !encoding.HasFlag(MessageEncoding.Json)))
                                {
                                    // Must have a writer or if samples mode, must be json
                                    Drop(Notification.YieldReturn());
                                    continue;
                                }

                                var dataSetMessageContentMask =
                                    Context.Writer.MessageSettings?.DataSetMessageContentMask;
                                var dataSetFieldContentMask =
                                    Context.Writer.DataSetFieldContentMask;

                                if (Notification.MessageType != MessageType.Metadata)
                                {
                                    Debug.Assert(Notification.Notifications != null);
                                    if (Notification.MessageType == MessageType.KeepAlive)
                                    {
                                        Debug.Assert(Notification.Notifications.Count == 0);
                                        if (hasSamplesPayload)
                                        {
                                            // Keep alive messages are not sent in samples mode
                                            Drop(Notification.YieldReturn());
                                            continue;
                                        }

                                        // Create regular data set messages
                                        if (!PubSubMessage.TryCreateDataSetMessage(encoding,
                                            GetDataSetWriterName(Notification, Context),
                                            Context.DataSetWriterId, dataSetMessageContentMask,
                                            MessageType.KeepAlive,
#if KA_WITH_EX_FIELDS
                                            new DataSet(Context.ExtensionFields, dataSetFieldContentMask),
#else
                                            new DataSet(),
#endif
                                            GetTimestamp(Notification), Context.NextWriterSequenceNumber(),
                                            standardsCompliant, Notification.EndpointUrl,
                                            Notification.ApplicationUri, Context.MetaData?.MetaData,
                                            out var dataSetMessage))
                                        {
                                            Drop(Notification.YieldReturn());
                                            continue;
                                        }

                                        AddMessage(dataSetMessage);
                                        LogNotification(Notification, false);
                                        currentNotifications.Add(Notification);
                                        continue;
                                    }

                                    // One queue per field, ordered by source timestamp. Every pass of
                                    // the loop below takes at most one value per field so that several
                                    // values of the same field end up in different data set messages.
                                    var notificationQueues = Notification.Notifications
                                        .GroupBy(m => m.DataSetFieldName)
                                        .Select(c => new Queue<MonitoredItemNotificationModel>(
                                            c
                                            .OrderBy(m => m.Value?.SourceTimestamp)
                                            .ToArray()))
                                        .ToArray();
                                    var notComplete = notificationQueues.Any(q => q.Count > 0);
                                    if (!notComplete)
                                    {
                                        // Already completed so we cannot complete it as an encoded message.
                                        Drop(Notification.YieldReturn());
                                        continue;
                                    }
                                    while (notComplete)
                                    {
                                        var orderedNotifications = notificationQueues
                                            .Select(q => q.Count > 0 ? q.Dequeue() : null!)
                                            .Where(s => s?.DataSetFieldName != null)
                                            .ToList()
                                            ;
                                        notComplete = notificationQueues.Any(q => q.Count > 0);

                                        if (!hasSamplesPayload)
                                        {
                                            // Create regular data set messages
                                            if (!PubSubMessage.TryCreateDataSetMessage(encoding,
                                                GetDataSetWriterName(Notification, Context), Context.DataSetWriterId,
                                                dataSetMessageContentMask, Notification.MessageType,
                                                new DataSet(orderedNotifications
                                                    .Select(s => (s.DataSetFieldName!, s.Value))
                                                    .Concat(Context.ExtensionFields)
                                                    .ToList(), dataSetFieldContentMask),
                                                GetTimestamp(Notification), Context.NextWriterSequenceNumber(),
                                                standardsCompliant, Notification.EndpointUrl, Notification.ApplicationUri,
                                                Context.MetaData?.MetaData, out var dataSetMessage))
                                            {
                                                // NOTE(review): this continues the while loop, so the
                                                // notification is still added to currentNotifications
                                                // after the loop although it was dropped - confirm.
                                                Drop(Notification.YieldReturn());
                                                continue;
                                            }

                                            AddMessage(dataSetMessage);
                                            LogNotification(Notification, false);
                                        }
                                        else
                                        {
                                            // Add monitored item message payload to network message to handle backcompat
                                            foreach (var itemNotifications in orderedNotifications
                                                .GroupBy(f => f.Id + f.MessageId))
                                            {
                                                var notificationsInGroup = itemNotifications.ToList();
                                                Debug.Assert(notificationsInGroup.Count != 0);
                                                //
                                                // Special monitored item handling for events and conditions. Collate all
                                                // values into a single key value data dictionary extension object value.
                                                // Regular notifications we send as single messages.
                                                //
                                                if (notificationsInGroup.Count > 1)
                                                {
                                                    if (Notification.MessageType == MessageType.Event ||
                                                        Notification.MessageType == MessageType.Condition)
                                                    {
                                                        Debug.Assert(notificationsInGroup
                                                            .Select(n => n.DataSetFieldName).Distinct().Count() == notificationsInGroup.Count,
                                                            "There should not be duplicates in fields in a group.");
                                                        Debug.Assert(notificationsInGroup
                                                            .All(n => n.SequenceNumber == notificationsInGroup[0].SequenceNumber),
                                                            "All notifications in the group should have the same sequence number.");

                                                        var eventNotification = notificationsInGroup[0] with
                                                        {
                                                            Value = new DataValue
                                                            {
                                                                Value = new EncodeableDictionary(notificationsInGroup
                                                                    .Select(n => new KeyDataValuePair(n.DataSetFieldName!, n.Value)))
                                                            },
                                                            DataSetFieldName = notificationsInGroup[0].DataSetName
                                                        };
                                                        notificationsInGroup =
                                                        [
                                                            eventNotification
                                                        ];
                                                    }
                                                    else if (_options.Value.RemoveDuplicatesFromBatch ?? false)
                                                    {
                                                        var pruned = notificationsInGroup
                                                            .OrderByDescending(k => k.Value?.SourceTimestamp) // Descend from latest
                                                            .DistinctBy(k => k.DataSetFieldName) // Only leave the latest values
                                                            .ToList();
                                                        if (pruned.Count != notificationsInGroup.Count)
                                                        {
                                                            if (_logNotifications)
                                                            {
                                                                _logger.LogInformation("Removed {Count} duplicates from batch.",
                                                                    notificationsInGroup.Count - pruned.Count);
                                                            }
                                                            notificationsInGroup = pruned;
                                                        }
                                                    }
                                                }
                                                foreach (var notification in notificationsInGroup)
                                                {
                                                    if (notification.DataSetFieldName != null)
                                                    {
                                                        if (!PubSubMessage.TryCreateMonitoredItemMessage(encoding,
                                                            writerGroup.Name, dataSetMessageContentMask, Notification.MessageType,
                                                            GetTimestamp(Notification), Context.NextWriterSequenceNumber(),
                                                            new DataSet(notification.DataSetFieldName, notification.Value,
                                                                dataSetFieldContentMask),
                                                            notification.NodeId, Notification.EndpointUrl, Notification.ApplicationUri,
                                                            standardsCompliant, Context.Writer.DataSet?.ExtensionFields,
                                                            out var dataSetMessage))
                                                        {
                                                            LogNotification(notification, true);
                                                            continue;
                                                        }
                                                        AddMessage(dataSetMessage);
                                                        LogNotification(notification, false);
                                                    }
                                                }
                                            }
                                        }
                                    }
                                    currentNotifications.Add(Notification);

                                    //
                                    // Add message and number of notifications processed count to method result.
                                    // Checks current length and splits if max items reached if configured.
                                    //
                                    void AddMessage(BaseDataSetMessage dataSetMessage)
                                    {
                                        if (currentMessage == null)
                                        {
                                            if (!PubSubMessage.TryCreateNetworkMessage(encoding, publisherId,
                                                writerGroup.Name ?? Constants.DefaultWriterGroupName, networkMessageContentMask,
                                                dataSetClassId, () => SequenceNumber.Increment16(ref _sequenceNumber),
                                                GetTimestamp(Notification) ?? _timeProvider.GetUtcNow(), namespaceFormat,
                                                standardsCompliant, isBatched, schema, out var message))
                                            {
                                                // NOTE(review): drops the complete input, not only the
                                                // current notification, and the caller continues after
                                                // the return - confirm this is intended.
                                                Drop(messages);
                                                return;
                                            }
                                            currentMessage = message;
                                        }
                                        currentMessage.Messages.Add(dataSetMessage);

                                        var maxMessagesToPublish = writerGroup.MessageSettings?.MaxDataSetMessagesPerPublish ??
                                            _options.Value.DefaultMaxDataSetMessagesPerPublish;
                                        if (maxMessagesToPublish != null && currentMessage.Messages.Count >= maxMessagesToPublish)
                                        {
                                            //
                                            // Snapshot the list for the callback. A lambda captures the variable
                                            // and not its value and currentNotifications is re-assigned below,
                                            // which would make the callback dispose the wrong notifications.
                                            //
                                            // NOTE(review): the notification currently processed is only added
                                            // to currentNotifications by the caller after this returns. If this
                                            // was its last data set message and nothing follows it seems to end
                                            // up in a list that is never emitted - confirm.
                                            //
                                            var flushedNotifications = currentNotifications;
                                            result.Add(new EncodedMessage(flushedNotifications.Count, currentMessage,
                                                queue, () => flushedNotifications.ForEach(n => n.Dispose()),
                                                schema, Notification.ServiceMessageContext));
#if DEBUG
                                            flushedNotifications.ForEach(n => n.MarkProcessed());
#endif
                                            currentMessage = null;
                                            currentNotifications = [];
                                        }
                                    }
                                }
                                else if (Context.MetaData?.MetaData != null && !hasSamplesPayload)
                                {
                                    if (currentMessage?.Messages.Count > 0)
                                    {
                                        // Start a new message but first emit current. The list is
                                        // snapshotted as the variable is re-assigned right after.
                                        var pendingNotifications = currentNotifications;
                                        result.Add(new EncodedMessage(pendingNotifications.Count, currentMessage,
                                            queue, () => pendingNotifications.ForEach(n => n.Dispose()),
                                            schema, Notification.ServiceMessageContext));
#if DEBUG
                                        pendingNotifications.ForEach(n => n.MarkProcessed());
#endif
                                        currentMessage = null;
                                        currentNotifications = [];
                                    }

                                    // Metadata goes out as its own message accounting for 0 notifications
                                    if (PubSubMessage.TryCreateMetaDataMessage(encoding, publisherId,
                                        writerGroup.Name ?? Constants.DefaultWriterGroupName,
                                        GetDataSetWriterName(Notification, Context), Context.DataSetWriterId,
                                        Context.MetaData.MetaData, namespaceFormat, standardsCompliant,
                                        out var metadataMessage))
                                    {
                                        result.Add(new EncodedMessage(0, metadataMessage, queue, Notification.Dispose,
                                                schema, Notification.ServiceMessageContext));
                                    }
#if DEBUG
                                    Notification.MarkProcessed();
#endif
                                    LogNotification(Notification, false);
                                }
                            }
                            if (currentMessage?.Messages.Count > 0)
                            {
                                // Emit what is left for this data set class
                                var remainingNotifications = currentNotifications;
                                result.Add(new EncodedMessage(remainingNotifications.Count, currentMessage, queue,
                                    () => remainingNotifications.ForEach(n => n.Dispose()),
                                    schema, remainingNotifications.LastOrDefault()?.ServiceMessageContext));
#if DEBUG
                                remainingNotifications.ForEach(n => n.MarkProcessed());
#endif
                            }
                            else
                            {
                                Debug.Assert(currentNotifications.Count == 0);
                            }

                            // Writer name, extended by the event type name if the notification has one
                            static string GetDataSetWriterName(OpcUaSubscriptionNotification Notification,
                                DataSetWriterContext Context)
                            {
                                var dataSetWriterName = Context.WriterName;
                                var eventTypeName = Notification.EventTypeName;
                                if (!string.IsNullOrWhiteSpace(eventTypeName))
                                {
                                    return dataSetWriterName + "|" + eventTypeName;
                                }
                                return dataSetWriterName;
                            }

                            // Select the message timestamp according to the configured timestamp option
                            DateTimeOffset? GetTimestamp(OpcUaSubscriptionNotification Notification)
                            {
                                switch (_options.Value.MessageTimestamp)
                                {
                                    case MessageTimestamp.EncodingTimeUtc:
                                        return _timeProvider.GetUtcNow();
                                    case MessageTimestamp.PublishTime:
                                        return Notification.PublishTimestamp;
                                    default:
                                        return Notification.CreatedTimestamp;
                                }
                            }
                        }
                    }
                }
            }

            return result;
        }