foreign/csharp/Iggy_SDK/Contracts/Tcp/TcpContracts.cs (646 lines of code) (raw):
using Iggy_SDK.Contracts.Http;
using Iggy_SDK.Contracts.Http.Auth;
using Iggy_SDK.Enums;
using Iggy_SDK.Extensions;
using Iggy_SDK.Headers;
using Iggy_SDK.Messages;
using System.Buffers.Binary;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Text;
using System.Text.Json;
using Partitioning = Iggy_SDK.Enums.Partitioning;
namespace Iggy_SDK.Contracts.Tcp;
// TODO: write unit tests for all the user-related contracts.
internal static class TcpContracts
{
/// <summary>
/// Serializes a "login with personal access token" request: one length byte followed by the
/// UTF-8 encoded token.
/// </summary>
/// <param name="request">Request carrying the token.</param>
/// <returns>A buffer of <c>5 + Token.Length</c> bytes; the last four bytes are never written here.</returns>
internal static byte[] LoginWithPersonalAccessToken(LoginWithPersonalAccessToken request)
{
    var tokenLength = request.Token.Length;
    Span<byte> payload = stackalloc byte[5 + tokenLength];
    payload[0] = (byte)tokenLength;
    Encoding.UTF8.GetBytes(request.Token, payload.Slice(1, tokenLength));
    return payload.ToArray();
}
/// <summary>
/// Serializes a "delete personal access token" request: one length byte followed by the
/// UTF-8 encoded token name.
/// </summary>
/// <param name="request">Request carrying the name of the token to delete.</param>
/// <returns>A buffer of <c>5 + Name.Length</c> bytes; the last four bytes are never written here.</returns>
internal static byte[] DeletePersonalRequestToken(DeletePersonalAccessTokenRequest request)
{
    var nameLength = request.Name.Length;
    Span<byte> payload = stackalloc byte[5 + nameLength];
    payload[0] = (byte)nameLength;
    Encoding.UTF8.GetBytes(request.Name, payload.Slice(1, nameLength));
    return payload.ToArray();
}
/// <summary>
/// Serializes a "create personal access token" request: name length (1 byte), UTF-8 name,
/// then the expiry as an unsigned 32-bit little-endian integer (0 when no expiry is set).
/// </summary>
/// <param name="request">Request carrying the token name and optional expiry.</param>
/// <returns>The serialized payload.</returns>
internal static byte[] CreatePersonalAccessToken(CreatePersonalAccessTokenRequest request)
{
    var nameLength = request.Name.Length;
    var expiryOffset = 1 + nameLength;
    Span<byte> payload = stackalloc byte[5 + nameLength];
    payload[0] = (byte)nameLength;
    Encoding.UTF8.GetBytes(request.Name, payload.Slice(1, nameLength));
    BinaryPrimitives.WriteUInt32LittleEndian(payload.Slice(expiryOffset), request.Expiry ?? 0);
    return payload.ToArray();
}
/// <summary>
/// Serializes a "get client" request: the client id as four little-endian bytes.
/// </summary>
/// <param name="clientId">Identifier of the client to look up.</param>
/// <returns>A four-byte payload.</returns>
internal static byte[] GetClient(uint clientId)
{
    // Least significant byte first, i.e. little-endian.
    return new[]
    {
        (byte)(clientId & 0xFF),
        (byte)((clientId >> 8) & 0xFF),
        (byte)((clientId >> 16) & 0xFF),
        (byte)((clientId >> 24) & 0xFF),
    };
}
/// <summary>
/// Serializes a "get user" request, which consists solely of the user identifier.
/// </summary>
/// <param name="userId">Identifier of the user to fetch.</param>
/// <returns>A payload of <c>userId.Length + 2</c> bytes filled by <c>WriteBytesFromIdentifier</c>.</returns>
internal static byte[] GetUser(Identifier userId)
{
    var size = userId.Length + 2;
    Span<byte> payload = stackalloc byte[size];
    payload.WriteBytesFromIdentifier(userId);
    return payload.ToArray();
}
/// <summary>
/// Serializes a "delete user" request, which consists solely of the user identifier.
/// </summary>
/// <param name="userId">Identifier of the user to delete.</param>
/// <returns>A payload of <c>userId.Length + 2</c> bytes filled by <c>WriteBytesFromIdentifier</c>.</returns>
internal static byte[] DeleteUser(Identifier userId)
{
    var size = userId.Length + 2;
    Span<byte> payload = stackalloc byte[size];
    payload.WriteBytesFromIdentifier(userId);
    return payload.ToArray();
}
/// <summary>
/// Serializes a user login request.
/// </summary>
/// <remarks>
/// Layout: username length (1 byte) + username, password length (1 byte) + password,
/// version length (32-bit little-endian) + version, context length (32-bit little-endian) + context.
/// A missing or empty version/context is written as a zero length with no body.
/// All length prefixes count UTF-8 bytes, so they always match the bytes that follow them.
/// </remarks>
/// <param name="request">Login request with credentials and optional version/context strings.</param>
/// <returns>The serialized payload.</returns>
internal static byte[] LoginUser(LoginUserRequest request)
{
    // Encode once up front so every length prefix is the encoded byte count, not the char count.
    var username = Encoding.UTF8.GetBytes(request.Username);
    var password = Encoding.UTF8.GetBytes(request.Password);
    var version = string.IsNullOrEmpty(request.Version)
        ? Array.Empty<byte>()
        : Encoding.UTF8.GetBytes(request.Version);
    var context = string.IsNullOrEmpty(request.Context)
        ? Array.Empty<byte>()
        : Encoding.UTF8.GetBytes(request.Context);

    var payload = new byte[1 + username.Length + 1 + password.Length + 4 + version.Length + 4 + context.Length];
    var position = 0;

    // Username
    payload[position++] = (byte)username.Length;
    username.CopyTo(payload, position);
    position += username.Length;

    // Password
    payload[position++] = (byte)password.Length;
    password.CopyTo(payload, position);
    position += password.Length;

    // Version (optional): explicit little-endian prefix, independent of the host CPU.
    BinaryPrimitives.WriteInt32LittleEndian(payload.AsSpan(position, 4), version.Length);
    position += 4;
    version.CopyTo(payload, position);
    position += version.Length;

    // Context (optional)
    BinaryPrimitives.WriteInt32LittleEndian(payload.AsSpan(position, 4), context.Length);
    position += 4;
    context.CopyTo(payload, position);

    return payload;
}
/// <summary>
/// Serializes a "change password" request: user identifier, then the current and the new
/// password, each prefixed with a one-byte length.
/// </summary>
/// <remarks>
/// Passwords are encoded to UTF-8 first and the buffer and length prefixes are derived from the
/// encoded byte counts, so passwords containing non-ASCII characters no longer overflow their slot.
/// </remarks>
/// <param name="request">Request with the user identifier and both passwords.</param>
/// <returns>The serialized payload.</returns>
internal static byte[] ChangePassword(ChangePasswordRequest request)
{
    var currentPassword = Encoding.UTF8.GetBytes(request.CurrentPassword);
    var newPassword = Encoding.UTF8.GetBytes(request.NewPassword);
    var identifierSize = request.UserId.Length + 2;

    var payload = new byte[identifierSize + 1 + currentPassword.Length + 1 + newPassword.Length];
    Span<byte> bytes = payload;
    bytes.WriteBytesFromIdentifier(request.UserId);

    var position = identifierSize;
    bytes[position++] = (byte)currentPassword.Length;
    currentPassword.CopyTo(bytes.Slice(position, currentPassword.Length));
    position += currentPassword.Length;

    bytes[position++] = (byte)newPassword.Length;
    newPassword.CopyTo(bytes.Slice(position, newPassword.Length));

    return payload;
}
/// <summary>
/// Serializes an "update user permissions" request: user identifier, a presence flag, and — when
/// permissions are supplied — their byte length (32-bit little-endian) followed by the serialized
/// permissions.
/// </summary>
/// <remarks>
/// The buffer always reserves the presence-flag byte. Previously no room was reserved for it when
/// <c>request.Permissions</c> was null, so writing the <c>0</c> flag ran past the end of the buffer.
/// </remarks>
/// <param name="request">Request with the user identifier and optional permissions.</param>
/// <returns>The serialized payload.</returns>
internal static byte[] UpdatePermissions(UpdateUserPermissionsRequest request)
{
    var identifierSize = request.UserId.Length + 2;
    // Serialize once; the array length doubles as the value of the length prefix.
    var permissions = request.Permissions is not null
        ? GetBytesFromPermissions(request.Permissions)
        : null;

    var payload = new byte[identifierSize + 1 + (permissions is not null ? 4 + permissions.Length : 0)];
    Span<byte> bytes = payload;
    bytes.WriteBytesFromIdentifier(request.UserId);

    var position = identifierSize;
    if (permissions is null)
    {
        bytes[position] = 0;
        return payload;
    }

    bytes[position++] = 1;
    BinaryPrimitives.WriteInt32LittleEndian(bytes.Slice(position, 4), permissions.Length);
    position += 4;
    permissions.CopyTo(bytes.Slice(position, permissions.Length));
    return payload;
}
/// <summary>
/// Serializes an "update user" request: user identifier, an optional username (presence flag,
/// one-byte length, UTF-8 bytes) and an optional status (presence flag, status byte).
/// </summary>
/// <remarks>
/// When no username is supplied the "absent" flag is written at the current position. Previously
/// it was written at index <c>request.UserId.Length</c>, which lies inside the identifier bytes
/// and corrupted the user id. The username slot is sized from its UTF-8 byte count.
/// </remarks>
/// <param name="request">Request with the user identifier and the fields to change.</param>
/// <returns>The serialized payload.</returns>
/// <exception cref="ArgumentOutOfRangeException">The status is not a known <c>UserStatus</c> value.</exception>
internal static byte[] UpdateUser(UpdateUserRequest request)
{
    var username = request.Username is null ? null : Encoding.UTF8.GetBytes(request.Username);
    var identifierSize = request.UserId.Length + 2;

    // Same sizing formula as before; it leaves one unused trailing byte when no username is sent,
    // which is kept so the frame length stays what this method has always produced.
    var payload = new byte[identifierSize + (username?.Length ?? 0)
                           + (request.UserStatus is not null ? 2 : 1) + 1 + 1];
    Span<byte> bytes = payload;
    bytes.WriteBytesFromIdentifier(request.UserId);

    var position = identifierSize;
    if (username is not null)
    {
        bytes[position++] = 1;
        bytes[position++] = (byte)username.Length;
        username.CopyTo(bytes.Slice(position, username.Length));
        position += username.Length;
    }
    else
    {
        bytes[position++] = 0;
    }

    if (request.UserStatus is not null)
    {
        bytes[position++] = 1;
        bytes[position++] = request.UserStatus switch
        {
            UserStatus.Active => (byte)1,
            UserStatus.Inactive => (byte)2,
            _ => throw new ArgumentOutOfRangeException(nameof(request), request.UserStatus, "Unsupported user status.")
        };
    }
    else
    {
        bytes[position++] = 0;
    }

    return payload;
}
/// <summary>
/// Serializes a "create user" request: username and password (each with a one-byte length
/// prefix), the status byte, a permissions presence flag and — when permissions are supplied —
/// their byte length (32-bit little-endian) followed by the serialized permissions.
/// </summary>
/// <remarks>
/// The buffer always reserves the presence-flag byte. Previously it was missing when
/// <c>request.Permissions</c> was null, so writing the <c>0</c> flag ran past the end of the
/// buffer. Username and password slots are sized from their UTF-8 byte counts.
/// </remarks>
/// <param name="request">Request describing the user to create.</param>
/// <returns>The serialized payload.</returns>
/// <exception cref="ArgumentOutOfRangeException">The status is not a known <c>UserStatus</c> value.</exception>
internal static byte[] CreateUser(CreateUserRequest request)
{
    var username = Encoding.UTF8.GetBytes(request.Username);
    var password = Encoding.UTF8.GetBytes(request.Password);
    // Serialize once; the array length doubles as the value of the length prefix.
    var permissions = request.Permissions is not null
        ? GetBytesFromPermissions(request.Permissions)
        : null;

    // 1 (username length) + 1 (password length) + 1 (status) + 1 (permissions flag) = 4 fixed bytes.
    var payload = new byte[4 + username.Length + password.Length
                           + (permissions is not null ? 4 + permissions.Length : 0)];
    Span<byte> bytes = payload;
    var position = 0;

    bytes[position++] = (byte)username.Length;
    username.CopyTo(bytes.Slice(position, username.Length));
    position += username.Length;

    bytes[position++] = (byte)password.Length;
    password.CopyTo(bytes.Slice(position, password.Length));
    position += password.Length;

    bytes[position++] = request.Status switch
    {
        UserStatus.Active => (byte)1,
        UserStatus.Inactive => (byte)2,
        _ => throw new ArgumentOutOfRangeException(nameof(request), request.Status, "Unsupported user status.")
    };

    if (permissions is null)
    {
        bytes[position] = 0;
        return payload;
    }

    bytes[position++] = 1;
    BinaryPrimitives.WriteInt32LittleEndian(bytes.Slice(position, 4), permissions.Length);
    position += 4;
    permissions.CopyTo(bytes.Slice(position, permissions.Length));
    return payload;
}
/// <summary>
/// Serializes a permissions object into the buffer size reported by
/// <c>CalculatePermissionsSize</c>.
/// </summary>
/// <remarks>
/// Layout: ten global flags (one byte each), a "has streams" flag, then for every stream its id
/// (32-bit little-endian), six stream flags, a "has topics" flag, the topics (id as 32-bit
/// little-endian, four flags, a "more topics follow" flag) and a "more streams follow" flag.
/// <para>
/// The "no streams" marker is written at index 10. Previously it was written at index 0, which
/// overwrote the <c>ManageServers</c> flag. Empty stream or topic collections are encoded as
/// absent so that a "has entries" flag is never emitted without a following entry; the buffer
/// size is unaffected by this.
/// </para>
/// </remarks>
/// <param name="data">Permissions to serialize.</param>
/// <returns>The serialized permissions.</returns>
private static byte[] GetBytesFromPermissions(Permissions data)
{
    static byte Flag(bool value) => value ? (byte)1 : (byte)0;

    // Heap buffer: the size grows with the number of streams and topics, so it is not bounded.
    var bytes = new byte[CalculatePermissionsSize(data)];
    var position = 0;

    bytes[position++] = Flag(data.Global.ManageServers);
    bytes[position++] = Flag(data.Global.ReadServers);
    bytes[position++] = Flag(data.Global.ManageUsers);
    bytes[position++] = Flag(data.Global.ReadUsers);
    bytes[position++] = Flag(data.Global.ManageStreams);
    bytes[position++] = Flag(data.Global.ReadStreams);
    bytes[position++] = Flag(data.Global.ManageTopics);
    bytes[position++] = Flag(data.Global.ReadTopics);
    bytes[position++] = Flag(data.Global.PollMessages);
    bytes[position++] = Flag(data.Global.SendMessages);

    if (data.Streams is { Count: > 0 } streams)
    {
        bytes[position++] = 1;
        var remainingStreams = streams.Count;
        foreach (var (streamId, stream) in streams)
        {
            BinaryPrimitives.WriteInt32LittleEndian(bytes.AsSpan(position, 4), streamId);
            position += 4;
            bytes[position++] = Flag(stream.ManageStream);
            bytes[position++] = Flag(stream.ReadStream);
            bytes[position++] = Flag(stream.ManageTopics);
            bytes[position++] = Flag(stream.ReadTopics);
            bytes[position++] = Flag(stream.PollMessages);
            bytes[position++] = Flag(stream.SendMessages);

            if (stream.Topics is { Count: > 0 } topics)
            {
                bytes[position++] = 1;
                var remainingTopics = topics.Count;
                foreach (var (topicId, topic) in topics)
                {
                    BinaryPrimitives.WriteInt32LittleEndian(bytes.AsSpan(position, 4), topicId);
                    position += 4;
                    bytes[position++] = Flag(topic.ManageTopic);
                    bytes[position++] = Flag(topic.ReadTopic);
                    bytes[position++] = Flag(topic.PollMessages);
                    bytes[position++] = Flag(topic.SendMessages);
                    // Continuation flag: 1 while more topics follow, 0 after the last one.
                    bytes[position++] = Flag(--remainingTopics > 0);
                }
            }
            else
            {
                bytes[position++] = 0;
            }

            // Continuation flag: 1 while more streams follow, 0 after the last one.
            bytes[position++] = Flag(--remainingStreams > 0);
        }
    }
    else
    {
        bytes[position] = 0;
    }

    return bytes;
}
/// <summary>
/// Computes how many bytes the serialized form of <paramref name="data"/> occupies.
/// </summary>
/// <param name="data">Permissions whose serialized size is wanted.</param>
/// <returns>
/// 11 bytes (ten global flags plus the streams marker), plus 12 bytes per stream and 9 bytes per
/// topic of every stream that has a topics collection.
/// </returns>
private static int CalculatePermissionsSize(Permissions data)
{
    // Ten global flags + one byte for the streams marker (present in both cases).
    var total = 10 + 1;
    if (data.Streams is null)
    {
        return total;
    }

    foreach (var (_, stream) in data.Streams)
    {
        // 4 (stream id) + 6 (stream flags) + 1 + 1 (topics marker and stream continuation flag).
        total += 4 + 6 + 1 + 1;
        if (stream.Topics is not null)
        {
            total += stream.Topics.Count * 9;
        }
    }

    return total;
}
/// <summary>
/// Writes a "poll messages" request into <paramref name="bytes"/>.
/// </summary>
/// <remarks>
/// Layout: consumer type (1 byte), consumer identifier, stream and topic identifiers, partition id
/// (32-bit little-endian), polling strategy kind (1 byte), polling strategy value (64-bit
/// little-endian), message count (32-bit little-endian) and the auto-commit flag (1 byte).
/// </remarks>
/// <param name="bytes">Destination buffer; the caller is responsible for sizing it.</param>
/// <param name="request">Fetch request to serialize.</param>
internal static void GetMessages(Span<byte> bytes, MessageFetchRequest request)
{
    bytes[0] = GetConsumerTypeByte(request.Consumer.Type);
    bytes.WriteBytesFromIdentifier(request.Consumer.Id, 1);

    var identifiersOffset = 1 + request.Consumer.Id.Length + 2;
    bytes.WriteBytesFromStreamAndTopicIdentifiers(request.StreamId, request.TopicId, identifiersOffset);

    var partitionOffset = identifiersOffset + 2 + request.StreamId.Length + 2 + request.TopicId.Length;
    var kindOffset = partitionOffset + 4;
    var valueOffset = kindOffset + 1;
    var countOffset = valueOffset + 8;
    var autoCommitOffset = countOffset + 4;

    BinaryPrimitives.WriteInt32LittleEndian(bytes.Slice(partitionOffset, 4), request.PartitionId);
    bytes[kindOffset] = GetPollingStrategyByte(request.PollingStrategy.Kind);
    BinaryPrimitives.WriteUInt64LittleEndian(bytes.Slice(valueOffset, 8), request.PollingStrategy.Value);
    BinaryPrimitives.WriteInt32LittleEndian(bytes.Slice(countOffset, 4), request.Count);
    if (request.AutoCommit)
    {
        bytes[autoCommitOffset] = 1;
    }
    else
    {
        bytes[autoCommitOffset] = 0;
    }
}
/// <summary>
/// Writes a "send messages" request into <paramref name="bytes"/>: stream and topic identifiers,
/// the partitioning block (kind byte, length byte, value) and then every message.
/// </summary>
/// <param name="bytes">Destination buffer; the caller is responsible for sizing it.</param>
/// <param name="streamId">Target stream.</param>
/// <param name="topicId">Target topic.</param>
/// <param name="partitioning">Partitioning kind and value to send along.</param>
/// <param name="messages">Messages to serialize; arrays and lists take dedicated code paths.</param>
internal static void CreateMessage(Span<byte> bytes, Identifier streamId, Identifier topicId,
    Kinds.Partitioning partitioning, IList<Message> messages)
{
    bytes.WriteBytesFromStreamAndTopicIdentifiers(streamId, topicId);

    var partitioningOffset = 2 + streamId.Length + 2 + topicId.Length;
    bytes[partitioningOffset] = GetPartitioningKindByte(partitioning.Kind);
    bytes[partitioningOffset + 1] = (byte)partitioning.Length;
    partitioning.Value.CopyTo(bytes.Slice(partitioningOffset + 2, partitioning.Length));

    var messagesOffset = 2 + partitioning.Length + partitioningOffset;

    // Dispatch on the concrete collection type; the returned span is the one passed in.
    if (messages is Message[] messagesArray)
    {
        HandleMessagesArray(messagesOffset, messagesArray, bytes);
    }
    else if (messages is List<Message> messagesList)
    {
        HandleMessagesList(messagesOffset, messagesList, bytes);
    }
    else
    {
        HandleMessagesIList(messagesOffset, messages, bytes);
    }
}
/// <summary>
/// Writes every message of <paramref name="messages"/> into <paramref name="bytes"/>, starting at
/// <paramref name="position"/>. General-purpose path for any <see cref="IList{T}"/>.
/// </summary>
/// <remarks>
/// Per message: the id in a 16-byte slot, a 4-byte little-endian headers length followed by the
/// header bytes (or a 4-byte placeholder when there are no headers), a 4-byte little-endian
/// payload length, then the payload.
/// </remarks>
/// <param name="position">Offset in <paramref name="bytes"/> at which the first message starts.</param>
/// <param name="messages">Messages to serialize.</param>
/// <param name="bytes">Destination buffer.</param>
/// <returns>The same span that was passed in as <paramref name="bytes"/>.</returns>
private static Span<byte> HandleMessagesIList(int position, IList<Message> messages, Span<byte> bytes)
{
    Span<byte> noHeadersMarker = stackalloc byte[4];
    foreach (var current in messages)
    {
        //TODO - this requires testing on different CPU architectures
        Unsafe.WriteUnaligned(ref MemoryMarshal.GetReference(bytes.Slice(position, 16)), current.Id);
        var headersOffset = position + 16;

        if (current.Headers is null)
        {
            noHeadersMarker.CopyTo(bytes.Slice(headersOffset, noHeadersMarker.Length));
            position = headersOffset + 4;
        }
        else
        {
            var serializedHeaders = GetHeadersBytes(current.Headers);
            BinaryPrimitives.WriteInt32LittleEndian(bytes.Slice(headersOffset, 4), serializedHeaders.Length);
            serializedHeaders.CopyTo(bytes.Slice(headersOffset + 4, serializedHeaders.Length));
            position = headersOffset + 4 + serializedHeaders.Length;
        }

        var payload = current.Payload;
        BinaryPrimitives.WriteInt32LittleEndian(bytes.Slice(position, 4), payload.Length);
        payload.CopyTo(bytes.Slice(position + 4));
        position += 4 + payload.Length;
    }

    return bytes;
}
/// <summary>
/// Writes every message of <paramref name="messages"/> into <paramref name="bytes"/>, starting at
/// <paramref name="position"/>, walking the array through managed references instead of an indexer.
/// </summary>
/// <remarks>
/// Per message: the id in a 16-byte slot, a 4-byte little-endian headers length followed by the
/// header bytes (or four bytes copied from <c>emptyHeaders</c> when there are no headers), a 4-byte
/// little-endian payload length, then the payload.
/// </remarks>
/// <param name="position">Offset in <paramref name="bytes"/> at which the first message starts.</param>
/// <param name="messages">Messages to serialize.</param>
/// <param name="bytes">Destination buffer.</param>
/// <returns>The same span that was passed in as <paramref name="bytes"/>.</returns>
private static Span<byte> HandleMessagesArray(int position, Message[] messages, Span<byte> bytes)
{
// Four-byte block copied into the headers-length slot of messages that carry no headers.
Span<byte> emptyHeaders = stackalloc byte[4];
// `start` refers to the current element; `end` is the reference one past the last element.
ref var start = ref MemoryMarshal.GetArrayDataReference(messages);
ref var end = ref Unsafe.Add(ref start, messages.Length);
while (Unsafe.IsAddressLessThan(ref start, ref end))
{
var idSlice = bytes[position..(position + 16)];
//TODO - this requires testing on different CPU architectures
// Raw write of the id into the slot; assumes the id type is 16 bytes wide — TODO confirm.
Unsafe.WriteUnaligned(ref MemoryMarshal.GetReference(idSlice), start.Id);
if (start.Headers is not null)
{
var headersBytes = GetHeadersBytes(start.Headers);
BinaryPrimitives.WriteInt32LittleEndian(bytes[(position + 16)..(position + 20)], headersBytes.Length);
headersBytes.CopyTo(bytes[(position + 20)..(position + 20 + headersBytes.Length)]);
position += headersBytes.Length + 20;
}
else
{
emptyHeaders.CopyTo(bytes[(position + 16)..(position + 16 + emptyHeaders.Length)]);
position += 20;
}
// Payload: 4-byte little-endian length, then the payload bytes.
BinaryPrimitives.WriteInt32LittleEndian(bytes[(position)..(position + 4)], start.Payload.Length);
var payloadBytes = start.Payload;
var slice = bytes[(position + 4)..];
payloadBytes.CopyTo(slice);
position += payloadBytes.Length + 4;
// Advance the reference to the next array element.
start = ref Unsafe.Add(ref start, 1);
}
return bytes;
}
/// <summary>
/// Writes every message of <paramref name="messages"/> into <paramref name="bytes"/>, starting at
/// <paramref name="position"/>, walking the list's span view (obtained via
/// <c>CollectionsMarshal.AsSpan</c>) through managed references.
/// </summary>
/// <remarks>
/// Per message: the id in a 16-byte slot, a 4-byte little-endian headers length followed by the
/// header bytes (or four bytes copied from <c>emptyHeaders</c> when there are no headers), a 4-byte
/// little-endian payload length, then the payload.
/// </remarks>
/// <param name="position">Offset in <paramref name="bytes"/> at which the first message starts.</param>
/// <param name="messages">Messages to serialize.</param>
/// <param name="bytes">Destination buffer.</param>
/// <returns>The same span that was passed in as <paramref name="bytes"/>.</returns>
private static Span<byte> HandleMessagesList(int position, List<Message> messages, Span<byte> bytes)
{
// Four-byte block copied into the headers-length slot of messages that carry no headers.
Span<byte> emptyHeaders = stackalloc byte[4];
Span<Message> listAsSpan = CollectionsMarshal.AsSpan(messages);
// `start` refers to the current element; `end` is the reference one past the last element.
ref var start = ref MemoryMarshal.GetReference(listAsSpan);
ref var end = ref Unsafe.Add(ref start, listAsSpan.Length);
while (Unsafe.IsAddressLessThan(ref start, ref end))
{
var idSlice = bytes[position..(position + 16)];
//TODO - this requires testing on different CPU architectures
// Raw write of the id into the slot; assumes the id type is 16 bytes wide — TODO confirm.
Unsafe.WriteUnaligned(ref MemoryMarshal.GetReference(idSlice), start.Id);
if (start.Headers is not null)
{
var headersBytes = GetHeadersBytes(start.Headers);
BinaryPrimitives.WriteInt32LittleEndian(bytes[(position + 16)..(position + 20)], headersBytes.Length);
headersBytes.CopyTo(bytes[(position + 20)..(position + 20 + headersBytes.Length)]);
position += headersBytes.Length + 20;
}
else
{
emptyHeaders.CopyTo(bytes[(position + 16)..(position + 16 + emptyHeaders.Length)]);
position += 20;
}
// Payload: 4-byte little-endian length, then the payload bytes.
BinaryPrimitives.WriteInt32LittleEndian(bytes[(position)..(position + 4)], start.Payload.Length);
var payloadBytes = start.Payload;
var slice = bytes[(position + 4)..];
payloadBytes.CopyTo(slice);
position += payloadBytes.Length + 4;
// Advance the reference to the next element of the span.
start = ref Unsafe.Add(ref start, 1);
}
return bytes;
}
/// <summary>
/// Serializes all headers of a message by concatenating the output of
/// <c>GetBytesFromHeader</c> for every entry, in the dictionary's enumeration order.
/// </summary>
/// <remarks>
/// The total size is the sum of the lengths actually produced per header rather than a separately
/// pre-computed estimate, and the result is built directly on the heap: header sizes are
/// caller-controlled, so a stack allocation of that size is not bounded.
/// </remarks>
/// <param name="headers">Headers to serialize.</param>
/// <returns>The concatenated serialized headers; empty when there are no headers.</returns>
private static byte[] GetHeadersBytes(Dictionary<HeaderKey, HeaderValue> headers)
{
    var serializedHeaders = new List<byte[]>(headers.Count);
    var totalLength = 0;
    foreach (var (headerKey, headerValue) in headers)
    {
        var headerBytes = GetBytesFromHeader(headerKey, headerValue);
        serializedHeaders.Add(headerBytes);
        totalLength += headerBytes.Length;
    }

    var headersBytes = new byte[totalLength];
    var position = 0;
    foreach (var headerBytes in serializedHeaders)
    {
        headerBytes.CopyTo(headersBytes, position);
        position += headerBytes.Length;
    }

    return headersBytes;
}
/// <summary>
/// Maps a <c>HeaderKind</c> to the byte code written into a serialized header.
/// </summary>
/// <param name="kind">Header kind to translate.</param>
/// <returns>The byte code for <paramref name="kind"/>.</returns>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="kind"/> has no mapping.</exception>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static byte HeaderKindToByte(HeaderKind kind)
{
    switch (kind)
    {
        case HeaderKind.Raw:
            return 1;
        case HeaderKind.String:
            return 2;
        case HeaderKind.Bool:
            return 3;
        case HeaderKind.Int32:
            return 6;
        case HeaderKind.Int64:
            return 7;
        case HeaderKind.Int128:
            return 8;
        case HeaderKind.Uint32:
            return 11;
        case HeaderKind.Uint64:
            return 12;
        case HeaderKind.Uint128:
            return 13;
        case HeaderKind.Float:
            return 14;
        case HeaderKind.Double:
            return 15;
        default:
            throw new ArgumentOutOfRangeException(nameof(kind), kind, null);
    }
}
/// <summary>
/// Serializes one header: key length (32-bit little-endian), UTF-8 key, kind byte, value length
/// (32-bit little-endian), value bytes.
/// </summary>
/// <remarks>
/// The key length prefix and the key slot are derived from the UTF-8 byte count of the key, so
/// keys containing non-ASCII characters fit their slot and the prefix matches the bytes written.
/// The buffer is allocated on the heap because the value size is caller-controlled.
/// </remarks>
/// <param name="headerKey">Header key.</param>
/// <param name="headerValue">Header value with its kind.</param>
/// <returns>The serialized header.</returns>
private static byte[] GetBytesFromHeader(HeaderKey headerKey, HeaderValue headerValue)
{
    var keyBytes = Encoding.UTF8.GetBytes(headerKey.Value);
    var valueLength = headerValue.Value.Length;

    var result = new byte[4 + keyBytes.Length + 1 + 4 + valueLength];
    Span<byte> headerBytes = result;

    BinaryPrimitives.WriteInt32LittleEndian(headerBytes[..4], keyBytes.Length);
    keyBytes.CopyTo(headerBytes.Slice(4, keyBytes.Length));

    var position = 4 + keyBytes.Length;
    headerBytes[position++] = HeaderKindToByte(headerValue.Kind);

    BinaryPrimitives.WriteInt32LittleEndian(headerBytes.Slice(position, 4), valueLength);
    position += 4;
    headerValue.Value.CopyTo(headerBytes[position..]);

    return result;
}
/// <summary>
/// Serializes a "create stream" request: stream id (32-bit little-endian, 0 when unset), name
/// length (1 byte), UTF-8 name.
/// </summary>
/// <param name="request">Request with the optional stream id and the stream name.</param>
/// <returns>The serialized payload.</returns>
internal static byte[] CreateStream(StreamRequest request)
{
    var nameLength = request.Name.Length;
    Span<byte> payload = stackalloc byte[4 + nameLength + 1];
    BinaryPrimitives.WriteInt32LittleEndian(payload.Slice(0, 4), request.StreamId ?? 0);
    payload[4] = (byte)nameLength;
    Encoding.UTF8.GetBytes(request.Name, payload.Slice(5));
    return payload.ToArray();
}
/// <summary>
/// Serializes an "update stream" request: stream identifier, name length (1 byte), UTF-8 name.
/// </summary>
/// <param name="streamId">Identifier of the stream to update.</param>
/// <param name="request">Request with the new stream name.</param>
/// <returns>The serialized payload.</returns>
internal static byte[] UpdateStream(Identifier streamId, UpdateStreamRequest request)
{
    var nameLengthOffset = 2 + streamId.Length;
    Span<byte> payload = stackalloc byte[streamId.Length + request.Name.Length + 3];
    payload.WriteBytesFromIdentifier(streamId);
    payload[nameLengthOffset] = (byte)request.Name.Length;
    Encoding.UTF8.GetBytes(request.Name, payload.Slice(nameLengthOffset + 1));
    return payload.ToArray();
}
/// <summary>
/// Serializes a "create consumer group" request: stream and topic identifiers, group id (32-bit
/// little-endian), name length (1 byte), UTF-8 name.
/// </summary>
/// <param name="request">Request describing the consumer group to create.</param>
/// <returns>The serialized payload.</returns>
internal static byte[] CreateGroup(CreateConsumerGroupRequest request)
{
    var identifiersSize = 2 + request.StreamId.Length + 2 + request.TopicId.Length;
    var nameLengthOffset = identifiersSize + 4;

    Span<byte> payload = stackalloc byte[identifiersSize + 4 + 1 + request.Name.Length];
    payload.WriteBytesFromStreamAndTopicIdentifiers(request.StreamId, request.TopicId);
    BinaryPrimitives.WriteInt32LittleEndian(payload.Slice(identifiersSize, 4), request.ConsumerGroupId);
    payload[nameLengthOffset] = (byte)request.Name.Length;
    Encoding.UTF8.GetBytes(request.Name, payload.Slice(nameLengthOffset + 1));
    return payload.ToArray();
}
/// <summary>
/// Serializes a "join consumer group" request: stream, topic and consumer group identifiers.
/// </summary>
/// <param name="request">Request naming the group to join.</param>
/// <returns>The serialized payload.</returns>
internal static byte[] JoinGroup(JoinConsumerGroupRequest request)
{
    var groupOffset = 2 + request.StreamId.Length + 2 + request.TopicId.Length;
    Span<byte> payload = stackalloc byte[groupOffset + request.ConsumerGroupId.Length + 2];
    payload.WriteBytesFromStreamAndTopicIdentifiers(request.StreamId, request.TopicId);
    payload.WriteBytesFromIdentifier(request.ConsumerGroupId, groupOffset);
    return payload.ToArray();
}
/// <summary>
/// Serializes a "leave consumer group" request: stream, topic and consumer group identifiers.
/// </summary>
/// <param name="request">Request naming the group to leave.</param>
/// <returns>The serialized payload.</returns>
internal static byte[] LeaveGroup(LeaveConsumerGroupRequest request)
{
    var groupOffset = 2 + request.StreamId.Length + 2 + request.TopicId.Length;
    Span<byte> payload = stackalloc byte[groupOffset + request.ConsumerGroupId.Length + 2];
    payload.WriteBytesFromStreamAndTopicIdentifiers(request.StreamId, request.TopicId);
    payload.WriteBytesFromIdentifier(request.ConsumerGroupId, groupOffset);
    return payload.ToArray();
}
/// <summary>
/// Serializes a "delete consumer group" request: stream, topic and consumer group identifiers.
/// </summary>
/// <param name="streamId">Stream the group belongs to.</param>
/// <param name="topicId">Topic the group belongs to.</param>
/// <param name="groupId">Consumer group to delete.</param>
/// <returns>The serialized payload.</returns>
internal static byte[] DeleteGroup(Identifier streamId, Identifier topicId, Identifier groupId)
{
    var groupOffset = 2 + streamId.Length + 2 + topicId.Length;
    Span<byte> payload = stackalloc byte[groupOffset + groupId.Length + 2];
    payload.WriteBytesFromStreamAndTopicIdentifiers(streamId, topicId);
    payload.WriteBytesFromIdentifier(groupId, groupOffset);
    return payload.ToArray();
}
/// <summary>
/// Serializes a "get consumer groups" request: the stream and topic identifiers.
/// </summary>
/// <param name="streamId">Stream to list groups for.</param>
/// <param name="topicId">Topic to list groups for.</param>
/// <returns>The serialized payload.</returns>
internal static byte[] GetGroups(Identifier streamId, Identifier topicId)
{
    var size = 2 + streamId.Length + 2 + topicId.Length;
    Span<byte> payload = stackalloc byte[size];
    payload.WriteBytesFromStreamAndTopicIdentifiers(streamId, topicId);
    return payload.ToArray();
}
/// <summary>
/// Serializes a "get consumer group" request: stream, topic and consumer group identifiers.
/// </summary>
/// <param name="streamId">Stream the group belongs to.</param>
/// <param name="topicId">Topic the group belongs to.</param>
/// <param name="groupId">Consumer group to fetch.</param>
/// <returns>The serialized payload.</returns>
internal static byte[] GetGroup(Identifier streamId, Identifier topicId, Identifier groupId)
{
    var groupOffset = 2 + streamId.Length + 2 + topicId.Length;
    Span<byte> payload = stackalloc byte[groupOffset + groupId.Length + 2];
    payload.WriteBytesFromStreamAndTopicIdentifiers(streamId, topicId);
    payload.WriteBytesFromIdentifier(groupId, groupOffset);
    return payload.ToArray();
}
/// <summary>
/// Serializes an "update topic" request: stream and topic identifiers, message expiry (32-bit
/// little-endian), max topic size (64-bit little-endian), replication factor (1 byte), name length
/// (1 byte), UTF-8 name.
/// </summary>
/// <param name="streamId">Stream the topic belongs to.</param>
/// <param name="topicId">Topic to update.</param>
/// <param name="request">New topic settings.</param>
/// <returns>The serialized payload.</returns>
internal static byte[] UpdateTopic(Identifier streamId, Identifier topicId, UpdateTopicRequest request)
{
    var expiryOffset = 4 + streamId.Length + topicId.Length;
    var maxSizeOffset = expiryOffset + 4;
    var replicationOffset = expiryOffset + 12;
    var nameLengthOffset = expiryOffset + 13;

    Span<byte> payload = stackalloc byte[streamId.Length + topicId.Length + 18 + request.Name.Length];
    payload.WriteBytesFromStreamAndTopicIdentifiers(streamId, topicId);
    BinaryPrimitives.WriteInt32LittleEndian(payload.Slice(expiryOffset, 4), request.MessageExpiry);
    BinaryPrimitives.WriteUInt64LittleEndian(payload.Slice(maxSizeOffset, 8), request.MaxTopicSize);
    payload[replicationOffset] = request.ReplicationFactor;
    payload[nameLengthOffset] = (byte)request.Name.Length;
    Encoding.UTF8.GetBytes(request.Name, payload.Slice(nameLengthOffset + 1));
    return payload.ToArray();
}
/// <summary>
/// Serializes a "create topic" request: stream identifier, topic id (32-bit little-endian, 0 when
/// unset), partitions count and message expiry (32-bit little-endian each), max topic size (64-bit
/// little-endian), replication factor (1 byte), name length (1 byte), UTF-8 name.
/// </summary>
/// <param name="streamId">Stream in which the topic is created.</param>
/// <param name="request">Settings of the topic to create.</param>
/// <returns>The serialized payload.</returns>
internal static byte[] CreateTopic(Identifier streamId, TopicRequest request)
{
    var topicIdOffset = 2 + streamId.Length;
    var partitionsOffset = topicIdOffset + 4;
    var expiryOffset = partitionsOffset + 4;
    var maxSizeOffset = partitionsOffset + 8;
    var replicationOffset = partitionsOffset + 16;
    var nameLengthOffset = partitionsOffset + 17;

    Span<byte> payload = stackalloc byte[2 + streamId.Length + 22 + request.Name.Length];
    payload.WriteBytesFromIdentifier(streamId);
    BinaryPrimitives.WriteInt32LittleEndian(payload.Slice(topicIdOffset, 4), request.TopicId ?? 0);
    BinaryPrimitives.WriteInt32LittleEndian(payload.Slice(partitionsOffset, 4), request.PartitionsCount);
    BinaryPrimitives.WriteInt32LittleEndian(payload.Slice(expiryOffset, 4), request.MessageExpiry);
    BinaryPrimitives.WriteUInt64LittleEndian(payload.Slice(maxSizeOffset, 8), request.MaxTopicSize);
    payload[replicationOffset] = request.ReplicationFactor;
    payload[nameLengthOffset] = (byte)request.Name.Length;
    Encoding.UTF8.GetBytes(request.Name, payload.Slice(nameLengthOffset + 1));
    return payload.ToArray();
}
/// <summary>
/// Serializes a "get topic" request: the stream and topic identifiers.
/// </summary>
/// <param name="streamId">Stream the topic belongs to.</param>
/// <param name="topicId">Topic to fetch.</param>
/// <returns>The serialized payload.</returns>
internal static byte[] GetTopicById(Identifier streamId, Identifier topicId)
{
    var size = 2 + streamId.Length + 2 + topicId.Length;
    Span<byte> payload = stackalloc byte[size];
    payload.WriteBytesFromStreamAndTopicIdentifiers(streamId, topicId);
    return payload.ToArray();
}
/// <summary>
/// Serializes a "delete topic" request: the stream and topic identifiers.
/// </summary>
/// <param name="streamId">Stream the topic belongs to.</param>
/// <param name="topicId">Topic to delete.</param>
/// <returns>The serialized payload.</returns>
internal static byte[] DeleteTopic(Identifier streamId, Identifier topicId)
{
    var size = 2 + streamId.Length + 2 + topicId.Length;
    Span<byte> payload = stackalloc byte[size];
    payload.WriteBytesFromStreamAndTopicIdentifiers(streamId, topicId);
    return payload.ToArray();
}
/// <summary>
/// Serializes a "store consumer offset" request: consumer type (1 byte), consumer identifier,
/// stream and topic identifiers, partition id (32-bit little-endian), offset (64-bit little-endian).
/// </summary>
/// <param name="request">Request with the consumer, target partition and offset to store.</param>
/// <returns>The serialized payload.</returns>
internal static byte[] UpdateOffset(StoreOffsetRequest request)
{
    var streamAndTopicSize = 2 + request.StreamId.Length + 2 + request.TopicId.Length;
    var identifiersOffset = 1 + request.Consumer.Id.Length + 2;
    var partitionOffset = identifiersOffset + streamAndTopicSize;

    Span<byte> payload = stackalloc byte[streamAndTopicSize + 15 + request.Consumer.Id.Length];
    payload[0] = GetConsumerTypeByte(request.Consumer.Type);
    payload.WriteBytesFromIdentifier(request.Consumer.Id, 1);
    payload.WriteBytesFromStreamAndTopicIdentifiers(request.StreamId, request.TopicId, identifiersOffset);
    BinaryPrimitives.WriteInt32LittleEndian(payload.Slice(partitionOffset, 4), request.PartitionId);
    BinaryPrimitives.WriteUInt64LittleEndian(payload.Slice(partitionOffset + 4, 8), request.Offset);
    return payload.ToArray();
}
/// <summary>
/// Serializes a "get consumer offset" request: consumer type (1 byte), consumer identifier, stream
/// and topic identifiers, partition id (32-bit little-endian).
/// </summary>
/// <remarks>
/// The partition id offset is derived from the actual consumer identifier length. Previously it
/// was computed with a hard-coded <c>7</c> (type byte + 2 + a 4-byte identifier), which placed the
/// partition id at the wrong offset for any consumer identifier that is not 4 bytes long.
/// </remarks>
/// <param name="request">Request with the consumer and the partition to query.</param>
/// <returns>The serialized payload.</returns>
internal static byte[] GetOffset(OffsetRequest request)
{
    var consumerSize = 1 + request.Consumer.Id.Length + 2;
    var streamAndTopicSize = 2 + request.StreamId.Length + 2 + request.TopicId.Length;

    Span<byte> bytes = stackalloc byte[consumerSize + streamAndTopicSize + sizeof(int)];
    bytes[0] = GetConsumerTypeByte(request.Consumer.Type);
    bytes.WriteBytesFromIdentifier(request.Consumer.Id, 1);

    var position = consumerSize;
    bytes.WriteBytesFromStreamAndTopicIdentifiers(request.StreamId, request.TopicId, position);
    position += streamAndTopicSize;

    BinaryPrimitives.WriteInt32LittleEndian(bytes.Slice(position, 4), request.PartitionId);
    return bytes.ToArray();
}
/// <summary>
/// Serializes a "create partitions" request: stream and topic identifiers followed by the
/// partitions count (32-bit little-endian).
/// </summary>
/// <param name="request">Request with the target topic and the number of partitions.</param>
/// <returns>The serialized payload.</returns>
internal static byte[] CreatePartitions(CreatePartitionsRequest request)
{
    var countOffset = 2 + request.StreamId.Length + 2 + request.TopicId.Length;
    Span<byte> payload = stackalloc byte[countOffset + sizeof(int)];
    payload.WriteBytesFromStreamAndTopicIdentifiers(request.StreamId, request.TopicId);
    BinaryPrimitives.WriteInt32LittleEndian(payload.Slice(countOffset, 4), request.PartitionsCount);
    return payload.ToArray();
}
/// <summary>
/// Serializes a "delete partitions" request: stream and topic identifiers followed by the
/// partitions count (32-bit little-endian).
/// </summary>
/// <param name="request">Request with the target topic and the number of partitions.</param>
/// <returns>The serialized payload.</returns>
internal static byte[] DeletePartitions(DeletePartitionsRequest request)
{
    var countOffset = 2 + request.StreamId.Length + 2 + request.TopicId.Length;
    Span<byte> payload = stackalloc byte[countOffset + sizeof(int)];
    payload.WriteBytesFromStreamAndTopicIdentifiers(request.StreamId, request.TopicId);
    BinaryPrimitives.WriteInt32LittleEndian(payload.Slice(countOffset, 4), request.PartitionsCount);
    return payload.ToArray();
}
/// <summary>
/// Maps a <c>ConsumerType</c> to the byte code written into a request.
/// </summary>
/// <param name="type">Consumer type to translate.</param>
/// <returns>1 for <c>Consumer</c>, 2 for <c>ConsumerGroup</c>.</returns>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="type"/> has no mapping.</exception>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static byte GetConsumerTypeByte(ConsumerType type)
{
    return type switch
    {
        ConsumerType.Consumer => 1,
        ConsumerType.ConsumerGroup => 2,
        // Report the parameter name and offending value, as HeaderKindToByte does.
        _ => throw new ArgumentOutOfRangeException(nameof(type), type, null)
    };
}
/// <summary>
/// Maps a <c>Partitioning</c> kind to the byte code written into a request.
/// </summary>
/// <param name="kind">Partitioning kind to translate.</param>
/// <returns>1 for <c>Balanced</c>, 2 for <c>PartitionId</c>, 3 for <c>MessageKey</c>.</returns>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="kind"/> has no mapping.</exception>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static byte GetPartitioningKindByte(Partitioning kind)
{
    return kind switch
    {
        Partitioning.Balanced => 1,
        Partitioning.PartitionId => 2,
        Partitioning.MessageKey => 3,
        // Report the parameter name and offending value, as HeaderKindToByte does.
        _ => throw new ArgumentOutOfRangeException(nameof(kind), kind, null)
    };
}
/// <summary>
/// Maps a <c>MessagePolling</c> strategy to the byte code written into a request.
/// </summary>
/// <param name="pollingStrategy">Polling strategy to translate.</param>
/// <returns>
/// 1 for <c>Offset</c>, 2 for <c>Timestamp</c>, 3 for <c>First</c>, 4 for <c>Last</c>,
/// 5 for <c>Next</c>.
/// </returns>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="pollingStrategy"/> has no mapping.</exception>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static byte GetPollingStrategyByte(MessagePolling pollingStrategy)
{
    return pollingStrategy switch
    {
        MessagePolling.Offset => 1,
        MessagePolling.Timestamp => 2,
        MessagePolling.First => 3,
        MessagePolling.Last => 4,
        MessagePolling.Next => 5,
        // Report the parameter name and offending value, as HeaderKindToByte does.
        _ => throw new ArgumentOutOfRangeException(nameof(pollingStrategy), pollingStrategy, null)
    };
}
}