foreign/csharp/Iggy_SDK_Tests/Utils/Messages/MessageFactory.cs (243 lines of code) (raw):

using Iggy_SDK;
using Iggy_SDK.Contracts.Http;
using Iggy_SDK.Enums;
using Iggy_SDK.Extensions;
using Iggy_SDK.Headers;
using Iggy_SDK.Kinds;
using Iggy_SDK.Messages;
using Iggy_SDK_Tests.Utils.DummyObj;
using System.Buffers.Binary;
using System.Text;
using System.Text.Json;
using Partitioning = Iggy_SDK.Enums.Partitioning;

namespace Iggy_SDK_Tests.Utils.Messages;

/// <summary>
/// Factory helpers that build randomized messages, requests, headers and raw
/// message fields for use in tests.
/// </summary>
internal static class MessageFactory
{
    /// <summary>
    /// Creates randomized raw message fields whose payload is the UTF-8 encoding
    /// of a random string.
    /// </summary>
    /// <returns>A tuple of offset, timestamp, id, headers length, checksum and payload bytes.</returns>
    internal static (ulong offset, ulong timestamp, Guid guid, int headersLength, uint checkSum, byte[] payload)
        CreateMessageResponseFields()
    {
        ulong offset = (ulong)Random.Shared.Next(6, 69);
        var timestamp = (ulong)Random.Shared.Next(420, 69420);
        var guid = Guid.NewGuid();
        var checkSum = (uint)Random.Shared.Next(42069, 69042);
        var bytes = Encoding.UTF8.GetBytes(Utility.RandomString(Random.Shared.Next(6, 69)));
        int headersLength = Random.Shared.Next(1, 69);

        return (offset, timestamp, guid, headersLength, checkSum, bytes);
    }

    /// <summary>
    /// Serializes a <see cref="DummyMessage"/> as: 4-byte little-endian id,
    /// 4-byte little-endian UTF-8 byte length of the text, then the UTF-8 text bytes.
    /// Delegates to <see cref="SerializeDummyMessage"/> so both entry points share one layout.
    /// </summary>
    internal static Func<DummyMessage, byte[]> Serializer = msg => SerializeDummyMessage(msg);

    /// <summary>
    /// Creates randomized raw message fields whose payload is a serialized
    /// <see cref="DummyMessage"/> with a random id and the text "Hello".
    /// </summary>
    /// <returns>A tuple of offset, timestamp, id, headers length, checksum and payload bytes.</returns>
    internal static (ulong offset, ulong timestamp, Guid guid, int headersLength, uint checkSum, byte[] payload)
        CreateMessageResponseFieldsTMessage()
    {
        var msg = new DummyMessage
        {
            Id = Random.Shared.Next(1, 69),
            Text = "Hello"
        };

        ulong offset = (ulong)Random.Shared.Next(6, 69);
        int headersLength = Random.Shared.Next(1, 69);
        var timestamp = (ulong)Random.Shared.Next(420, 69420);
        var checkSum = (uint)Random.Shared.Next(42069, 69420);
        var guid = Guid.NewGuid();
        var bytes = Serializer(msg);

        return (offset, timestamp, guid, headersLength, checkSum, bytes);
    }

    /// <summary>
    /// Creates randomized raw message fields whose payload is the UTF-8 encoding of "Hello".
    /// </summary>
    /// <returns>A tuple of offset, timestamp, id and payload bytes.</returns>
    internal static (ulong offset, ulong timestamp, Guid guid, byte[] payload) CreateMessageResponseGenerics()
    {
        ulong offset = (ulong)Random.Shared.Next(6, 69);
        var timestamp = (ulong)Random.Shared.Next(420, 69420);
        var guid = Guid.NewGuid();
        var bytes = Encoding.UTF8.GetBytes("Hello");

        return (offset, timestamp, guid, bytes);
    }

    /// <summary>
    /// Creates randomized raw message fields whose payload is produced by applying
    /// <paramref name="serializer"/> to <paramref name="message"/>.
    /// </summary>
    /// <typeparam name="TMessage">Type of the message being serialized.</typeparam>
    /// <param name="message">Message to serialize into the payload.</param>
    /// <param name="serializer">Function converting the message to bytes.</param>
    /// <returns>A tuple of offset, timestamp, id and payload bytes.</returns>
    internal static (ulong offset, ulong timestamp, Guid guid, byte[] payload) CreateMessageResponseFields<TMessage>(
        TMessage message, Func<TMessage, byte[]> serializer)
    {
        ulong offset = (ulong)Random.Shared.Next(6, 69);
        var timestamp = (ulong)Random.Shared.Next(420, 69420);
        var guid = Guid.NewGuid();
        var bytes = serializer(message);

        return (offset, timestamp, guid, bytes);
    }

    /// <summary>
    /// Creates a send request for stream 1 / topic 1 with a random 4-byte partition id
    /// value and two messages carrying JSON-serialized dummy objects and no headers.
    /// </summary>
    internal static MessageSendRequest CreateMessageSendRequest()
    {
        var valBytes = new byte[4];
        var streamId = Identifier.Numeric(1);
        var topicId = Identifier.Numeric(1);
        BinaryPrimitives.WriteInt32LittleEndian(valBytes, Random.Shared.Next(1, 69));

        return new MessageSendRequest
        {
            StreamId = streamId,
            TopicId = topicId,
            Partitioning = new Iggy_SDK.Kinds.Partitioning
            {
                Kind = Partitioning.PartitionId,
                Length = 4,
                Value = valBytes,
            },
            Messages = new List<Message>
            {
                new()
                {
                    Id = Guid.NewGuid(),
                    Payload = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(DummyObjFactory.CreateDummyObject())),
                    Headers = null,
                },
                new()
                {
                    Id = Guid.NewGuid(),
                    Payload = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(DummyObjFactory.CreateDummyObject())),
                    Headers = null,
                }
            },
        };
    }

    /// <summary>
    /// Creates a send request targeting the given numeric stream, topic and partition.
    /// </summary>
    /// <param name="streamId">Numeric stream identifier.</param>
    /// <param name="topicId">Numeric topic identifier.</param>
    /// <param name="partitionId">Partition id used for partitioning.</param>
    /// <param name="messages">
    /// Messages to send; when <c>null</c>, a random number of dummy messages with a
    /// random payload length is generated.
    /// </param>
    internal static MessageSendRequest CreateMessageSendRequest(int streamId, int topicId, int partitionId,
        IList<Message>? messages = null)
    {
        return new MessageSendRequest
        {
            StreamId = Identifier.Numeric(streamId),
            TopicId = Identifier.Numeric(topicId),
            Partitioning = Iggy_SDK.Kinds.Partitioning.PartitionId(partitionId),
            Messages = messages ?? GenerateDummyMessages(Random.Shared.Next(1, 69), Random.Shared.Next(69, 420))
        };
    }

    /// <summary>
    /// Serializes a <see cref="DummyMessage"/> as: 4-byte little-endian id,
    /// 4-byte little-endian text length, then the UTF-8 text bytes.
    /// </summary>
    /// <param name="message">Message to serialize.</param>
    /// <returns>The serialized bytes, readable by <see cref="DeserializeDummyMessage"/>.</returns>
    private static byte[] SerializeDummyMessage(DummyMessage message)
    {
        // Encode first: the buffer size and the length prefix must be the UTF-8 byte
        // count, not the UTF-16 char count, or non-ASCII text overflows the buffer.
        var textBytes = Encoding.UTF8.GetBytes(message.Text);
        var bytes = new byte[4 + 4 + textBytes.Length];

        BinaryPrimitives.WriteInt32LittleEndian(bytes.AsSpan(0, 4), message.Id);
        BinaryPrimitives.WriteInt32LittleEndian(bytes.AsSpan(4, 4), textBytes.Length);
        textBytes.CopyTo(bytes.AsSpan(8));

        return bytes;
    }

    /// <summary>
    /// Generates <paramref name="count"/> dummy messages with random ids and random text.
    /// </summary>
    /// <param name="count">Number of messages to generate.</param>
    internal static IList<DummyMessage> GenerateDummyMessages(int count)
    {
        return Enumerable.Range(1, count)
            .Select(_ => new DummyMessage
            {
                Id = Random.Shared.Next(1, 69),
                Text = Utility.RandomString(Random.Shared.Next(69, 100))
            })
            .ToList();
    }

    /// <summary>
    /// Deserializes bytes laid out as 4-byte little-endian id, 4-byte little-endian
    /// text byte length and UTF-8 text into a <see cref="DummyMessage"/>.
    /// </summary>
    internal static Func<byte[], DummyMessage> DeserializeDummyMessage => bytes =>
    {
        var id = BinaryPrimitives.ReadInt32LittleEndian(bytes.AsSpan()[..4]);
        var textLength = BinaryPrimitives.ReadInt32LittleEndian(bytes.AsSpan()[4..8]);
        var text = Encoding.UTF8.GetString(bytes.AsSpan()[8..(8 + textLength)]);

        return new DummyMessage
        {
            Id = id,
            Text = text
        };
    };

    /// <summary>
    /// Generates <paramref name="count"/> messages whose payloads are serialized
    /// <see cref="DummyMessage"/> instances with random ids and random text.
    /// </summary>
    /// <param name="count">Number of messages to generate.</param>
    /// <param name="Headers">Headers assigned to every message; may be <c>null</c>.</param>
    internal static IList<Message> GenerateMessages(int count, Dictionary<HeaderKey, HeaderValue>? Headers = null)
    {
        return Enumerable.Range(1, count)
            .Select(_ => new Message
            {
                Id = Guid.NewGuid(),
                Headers = Headers,
                Payload = SerializeDummyMessage(new DummyMessage
                {
                    Id = Random.Shared.Next(1, 69),
                    Text = Utility.RandomString(Random.Shared.Next(20, 69))
                })
            })
            .ToList();
    }

    /// <summary>
    /// Generates <paramref name="count"/> messages, each with its own payload of
    /// <paramref name="payloadLen"/> bytes holding the values 1..payloadLen cast to <c>byte</c>.
    /// </summary>
    /// <param name="count">Number of messages to generate.</param>
    /// <param name="payloadLen">Payload length in bytes for every message.</param>
    /// <param name="Headers">Headers assigned to every message; may be <c>null</c>.</param>
    internal static IList<Message> GenerateDummyMessages(int count, int payloadLen,
        Dictionary<HeaderKey, HeaderValue>? Headers = null)
    {
        return Enumerable.Range(1, count)
            .Select(_ => new Message
            {
                Id = Guid.NewGuid(),
                Headers = Headers,
                Payload = Enumerable.Range(1, payloadLen).Select(x => (byte)x).ToArray()
            })
            .ToList();
    }

    /// <summary>
    /// Creates a fetch request for consumer 1 with random count, partition, stream and
    /// topic, polling from offset 69420 with auto-commit enabled.
    /// </summary>
    internal static MessageFetchRequest CreateMessageFetchRequestConsumer()
    {
        return new MessageFetchRequest
        {
            Count = Random.Shared.Next(1, 10),
            AutoCommit = true,
            Consumer = Consumer.New(1),
            PartitionId = Random.Shared.Next(1, 10),
            PollingStrategy = PollingStrategy.Offset(69420),
            StreamId = Identifier.Numeric(Random.Shared.Next(1, 10)),
            TopicId = Identifier.Numeric(Random.Shared.Next(1, 10)),
        };
    }

    /// <summary>
    /// Creates a fetch request for a single consumer using the "next" polling strategy
    /// with auto-commit enabled.
    /// </summary>
    /// <param name="count">Number of messages to fetch.</param>
    /// <param name="streamId">Numeric stream identifier.</param>
    /// <param name="topicId">Numeric topic identifier.</param>
    /// <param name="partitionId">Partition to fetch from.</param>
    /// <param name="consumerId">Consumer id; defaults to 1.</param>
    internal static MessageFetchRequest CreateMessageFetchRequestConsumer(int count, int streamId, int topicId,
        int partitionId, int consumerId = 1)
    {
        return new MessageFetchRequest
        {
            Count = count,
            AutoCommit = true,
            Consumer = Consumer.New(consumerId),
            PartitionId = partitionId,
            PollingStrategy = PollingStrategy.Next(),
            StreamId = Identifier.Numeric(streamId),
            TopicId = Identifier.Numeric(topicId),
        };
    }

    /// <summary>
    /// Creates a fetch request for a consumer group using the "next" polling strategy
    /// with auto-commit enabled.
    /// </summary>
    /// <param name="count">Number of messages to fetch.</param>
    /// <param name="streamId">Numeric stream identifier.</param>
    /// <param name="topicId">Numeric topic identifier.</param>
    /// <param name="partitionId">Partition to fetch from.</param>
    /// <param name="consumerGroupId">Consumer group id.</param>
    internal static MessageFetchRequest CreateMessageFetchRequestConsumerGroup(int count, int streamId, int topicId,
        int partitionId, int consumerGroupId)
    {
        return new MessageFetchRequest
        {
            Count = count,
            AutoCommit = true,
            Consumer = Consumer.Group(consumerGroupId),
            PartitionId = partitionId,
            PollingStrategy = PollingStrategy.Next(),
            StreamId = Identifier.Numeric(streamId),
            TopicId = Identifier.Numeric(topicId),
        };
    }

    /// <summary>
    /// Generates <paramref name="count"/> headers with random string keys and values of
    /// a randomly chosen header kind.
    /// </summary>
    /// <param name="count">Number of headers to generate.</param>
    /// <returns>A dictionary of the generated headers.</returns>
    internal static Dictionary<HeaderKey, HeaderValue> GenerateMessageHeaders(int count)
    {
        var headers = new Dictionary<HeaderKey, HeaderValue>();
        for (int i = 0; i < count; i++)
        {
            var key = HeaderKey.New(Utility.RandomString(Random.Shared.Next(50, 100)));
            headers.Add(key, CreateRandomHeaderValue());
        }

        return headers;
    }

    /// <summary>
    /// Creates a header value of a randomly selected kind (one of the twelve arms below).
    /// </summary>
    private static HeaderValue CreateRandomHeaderValue()
    {
        // Random.Next's upper bound is exclusive: 13 is required so that arm 12
        // (double) can actually be selected.
        return Random.Shared.Next(1, 13) switch
        {
            1 => HeaderValue.FromBytes(Encoding.UTF8.GetBytes(Utility.RandomString(Random.Shared.Next(50, 100)))),
            2 => HeaderValue.FromString(Utility.RandomString(Random.Shared.Next(25, 100))),
            // Next(0, 2) yields 0 or 1, so both true and false are produced.
            3 => HeaderValue.FromBool(Random.Shared.Next(0, 2) == 1),
            4 => HeaderValue.FromInt32(Random.Shared.Next(69, 420)),
            5 => HeaderValue.FromInt64(Random.Shared.NextInt64(6942023, 98723131)),
            6 => HeaderValue.FromInt128(Guid.NewGuid().ToByteArray().ToInt128()),
            7 => HeaderValue.FromGuid(Guid.NewGuid()),
            8 => HeaderValue.FromUInt32((uint)Random.Shared.Next(1, 69)),
            9 => HeaderValue.FromUInt64((ulong)Random.Shared.Next(1, 69)),
            10 => HeaderValue.FromUInt128(Guid.NewGuid().ToUInt128()),
            11 => HeaderValue.FromFloat(Random.Shared.NextSingle()),
            12 => HeaderValue.FromDouble(Random.Shared.NextDouble()),
            _ => HeaderValue.FromUInt64((ulong)Random.Shared.Next(1, 69))
        };
    }

    /// <summary>
    /// Creates an HTTP-shaped message response with a random offset and checksum, a fixed
    /// Base64 payload ("TROLOLO"), a fixed timestamp and id, and no headers.
    /// </summary>
    internal static MessageResponseHttp CreateMessageResponse()
    {
        return new MessageResponseHttp
        {
            Offset = (ulong)Random.Shared.Next(1, 10),
            Payload = Convert.ToBase64String("TROLOLO"u8.ToArray()),
            Timestamp = 12371237821L,
            State = MessageState.Available,
            Checksum = (uint)Random.Shared.Next(42069, 69420),
            Id = new UInt128(69, 420),
            Headers = null
        };
    }
}

/// <summary>
/// Test-side shape of a message response whose payload is carried as a string
/// (filled with Base64 text by <see cref="MessageFactory.CreateMessageResponse"/>).
/// </summary>
internal class MessageResponseHttp
{
    public required ulong Offset { get; init; }
    public required uint Checksum { get; init; }
    public required ulong Timestamp { get; init; }
    public UInt128 Id { get; init; }
    public required string Payload { get; init; }
    public Dictionary<HeaderKey, HeaderValue>? Headers { get; init; }
    public required MessageState State { get; init; }
}

/// <summary>
/// Simple object with an integer id and a text value.
/// </summary>
internal class DummyObject
{
    public required int Id { get; set; }
    public required string Text { get; set; }
}