in src/DotPulsar/Internal/BatchHandler.cs [42:72]
/// <summary>
/// Splits a batched payload into its individual messages, appends them to the internal
/// message queue and returns the message that is then at the head of that queue.
/// </summary>
/// <param name="messageId">
/// Identifier of the batch entry. Its ledger id, entry id and partition are reused for every
/// message in the batch; the message's position in the batch is passed as the fourth id component.
/// </param>
/// <param name="redeliveryCount">Passed on unchanged to every message created from the batch.</param>
/// <param name="metadata">
/// Batch-level metadata. <c>NumMessagesInBatch</c> decides how many entries are read from
/// <paramref name="data"/>, and the metadata itself is handed to the message factory for each entry.
/// </param>
/// <param name="data">
/// The batch payload. Each entry is read as: a 4-byte unsigned size, that many bytes of serialized
/// <c>SingleMessageMetadata</c>, then <c>PayloadSize</c> bytes of message payload.
/// </param>
/// <returns>
/// The message dequeued after all messages of this batch were enqueued. This is the first message
/// of this batch only if the queue was empty before the call.
/// </returns>
public IMessage<TMessage> Add(MessageIdData messageId, uint redeliveryCount, MessageMetadata metadata, ReadOnlySequence<byte> data)
{
    var batchSize = metadata.NumMessagesInBatch;
    var parsed = new List<IMessage<TMessage>>(batchSize);
    long offset = 0;

    for (var batchIndex = 0; batchIndex < batchSize; ++batchIndex)
    {
        // Length prefix of the per-message metadata.
        // NOTE(review): the 'true' flag presumably selects big-endian byte order - confirm against ReadUInt32.
        var metadataLength = data.ReadUInt32(offset, true);
        offset += sizeof(uint);

        var metadataBytes = data.Slice(offset, metadataLength);
        var entryMetadata = Serializer.Deserialize<SingleMessageMetadata>(metadataBytes);
        offset += metadataLength;

        var entryId = new MessageId(messageId.LedgerId, messageId.EntryId, messageId.Partition, batchIndex);
        var payload = data.Slice(offset, entryMetadata.PayloadSize);
        parsed.Add(_messageFactory.Create(entryId, redeliveryCount, payload, metadata, entryMetadata));

        // Step over the payload so the next iteration starts at the following length prefix.
        offset += (uint) entryMetadata.PayloadSize;
    }

    // Batch tracking and queue mutation happen under the same lock.
    lock (_lock)
    {
        if (_trackBatches)
        {
            _batches.AddLast(new Batch(messageId, batchSize));
        }

        for (var position = 0; position < parsed.Count; ++position)
        {
            _messages.Enqueue(parsed[position]);
        }

        // NOTE(review): if the batch is empty and nothing was queued beforehand, Dequeue presumably throws - verify callers never pass an empty batch.
        return _messages.Dequeue();
    }
}