foreign/csharp/Iggy_SDK/Mappers/BinaryMapper.cs (745 lines of code) (raw):
using Iggy_SDK.Contracts.Http;
using Iggy_SDK.Enums;
using Iggy_SDK.Extensions;
using Iggy_SDK.Headers;
using System.Buffers;
using System.Buffers.Binary;
using System.Text;
namespace Iggy_SDK.Mappers;
//TODO - write unit tests for all the users related mappers
internal static class BinaryMapper
{
private const int PROPERTIES_SIZE = 45;
internal static RawPersonalAccessToken MapRawPersonalAccessToken(ReadOnlySpan<byte> payload)
{
var tokenLength = payload[0];
var token = Encoding.UTF8.GetString(payload[1..(1 + tokenLength)]);
return new RawPersonalAccessToken
{
Token = token
};
}
internal static IReadOnlyList<PersonalAccessTokenResponse> MapPersonalAccessTokens(ReadOnlySpan<byte> payload)
{
if (payload.Length == 0)
{
return Array.Empty<PersonalAccessTokenResponse>();
}
var result = new List<PersonalAccessTokenResponse>();
int length = payload.Length;
int position = 0;
while (position < length)
{
var (response, readBytes) = MapToPersonalAccessTokenResponse(payload, position);
result.Add(response);
position += readBytes;
}
return result.AsReadOnly();
}
private static (PersonalAccessTokenResponse response, int position) MapToPersonalAccessTokenResponse(ReadOnlySpan<byte> payload, int position)
{
var nameLength = (int)payload[position];
var name = Encoding.UTF8.GetString(payload[(position + 1)..(1 + position + nameLength)]);
var expiry = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 1 + nameLength)..]);
var readBytes = 1 + nameLength + 8;
return (new PersonalAccessTokenResponse
{
Name = name,
ExpiryAt = expiry == 0 ? null : DateTimeOffsetUtils.FromUnixTimeMicroSeconds(expiry).LocalDateTime
}, readBytes);
}
internal static IReadOnlyList<UserResponse> MapUsers(ReadOnlySpan<byte> payload)
{
if (payload.Length == 0)
{
return Array.Empty<UserResponse>();
}
var result = new List<UserResponse>();
int length = payload.Length;
int position = 0;
while (position < length)
{
var (response, readBytes) = MapToUserResponse(payload, position);
result.Add(response);
position += readBytes;
}
return result.AsReadOnly();
}
internal static UserResponse MapUser(ReadOnlySpan<byte> payload)
{
var (response, position) = MapToUserResponse(payload, 0);
var hasPermissions = payload[position];
if (hasPermissions == 1)
{
var permissionLength = BinaryPrimitives.ReadInt32LittleEndian(payload[(position + 1)..(position + 5)]);
var permissionsPayload = payload[(position + 5)..(position + 5 + permissionLength)];
var permissions = MapPermissions(permissionsPayload);
return new UserResponse
{
Permissions = permissions,
Id = response.Id,
CreatedAt = response.CreatedAt,
Username = response.Username,
Status = response.Status
};
}
return new UserResponse
{
Id = response.Id,
CreatedAt = response.CreatedAt,
Username = response.Username,
Status = response.Status,
Permissions = null
};
}
private static Permissions MapPermissions(ReadOnlySpan<byte> bytes)
{
var streamMap = new Dictionary<int, StreamPermissions>();
int index = 0;
var globalPermissions = new GlobalPermissions
{
ManageServers = bytes[index++] == 1,
ReadServers = bytes[index++] == 1,
ManageUsers = bytes[index++] == 1,
ReadUsers = bytes[index++] == 1,
ManageStreams = bytes[index++] == 1,
ReadStreams = bytes[index++] == 1,
ManageTopics = bytes[index++] == 1,
ReadTopics = bytes[index++] == 1,
PollMessages = bytes[index++] == 1,
SendMessages = bytes[index++] == 1,
};
if (bytes[index++] == 1)
{
while (true)
{
var streamId = BinaryPrimitives.ReadInt32LittleEndian(bytes[index..(index + 4)]);
index += sizeof(int);
var manageStream = bytes[index++] == 1;
var readStream = bytes[index++] == 1;
var manageTopics = bytes[index++] == 1;
var readTopics = bytes[index++] == 1;
var pollMessagesStream = bytes[index++] == 1;
var sendMessagesStream = bytes[index++] == 1;
var topicsMap = new Dictionary<int, TopicPermissions>();
if (bytes[index++] == 1)
{
while (true)
{
var topicId = BinaryPrimitives.ReadInt32LittleEndian(bytes[index..(index + 4)]);
index += sizeof(int);
var manageTopic = bytes[index++] == 1;
var readTopic = bytes[index++] == 1;
var pollMessagesTopic = bytes[index++] == 1;
var sendMessagesTopic = bytes[index++] == 1;
topicsMap.Add(topicId, new TopicPermissions
{
ManageTopic = manageTopic,
ReadTopic = readTopic,
PollMessages = pollMessagesTopic,
SendMessages = sendMessagesTopic,
});
if (bytes[index++] == 0)
break;
}
}
streamMap.Add(streamId, new StreamPermissions
{
ManageStream = manageStream,
ReadStream = readStream,
ManageTopics = manageTopics,
ReadTopics = readTopics,
PollMessages = pollMessagesStream,
SendMessages = sendMessagesStream,
Topics = topicsMap.Count > 0 ? topicsMap : null,
});
if (bytes[index++] == 0)
break;
}
}
return new Permissions
{
Global = globalPermissions,
Streams = streamMap.Count > 0 ? streamMap : null
};
}
private static (UserResponse response, int position) MapToUserResponse(ReadOnlySpan<byte> payload, int position)
{
uint id = BinaryPrimitives.ReadUInt32LittleEndian(payload[position..(position + 4)]);
ulong createdAt = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 4)..(position + 12)]);
byte status = payload[position + 12];
UserStatus userStatus = status switch
{
1 => UserStatus.Active,
2 => UserStatus.Inactive,
_ => throw new ArgumentOutOfRangeException()
};
byte usernameLength = payload[position + 13];
string username = Encoding.UTF8.GetString(payload[(position + 14)..(position + 14 + usernameLength)]);
int readBytes = 4 + 8 + 1 + 1 + usernameLength;
return (new UserResponse
{
Id = id,
CreatedAt = createdAt,
Status = userStatus,
Username = username
}, readBytes);
}
internal static ClientResponse MapClient(ReadOnlySpan<byte> payload)
{
var (response, position) = MapClientInfo(payload, 0);
var consumerGroups = new List<ConsumerGroupInfo>();
var length = payload.Length;
while (position < length)
{
for (int i = 0; i < response.ConsumerGroupsCount; i++)
{
var streamId = BinaryPrimitives.ReadInt32LittleEndian(payload[position..(position + 4)]);
var topicId = BinaryPrimitives.ReadInt32LittleEndian(payload[(position + 4)..(position + 8)]);
var consumerGroupId = BinaryPrimitives.ReadInt32LittleEndian(payload[(position + 8)..(position + 12)]);
var consumerGroup = new ConsumerGroupInfo
{
StreamId = streamId,
TopicId = topicId,
ConsumerGroupId = consumerGroupId
};
consumerGroups.Add(consumerGroup);
position += 12;
}
}
return new ClientResponse
{
Adress = response.Adress,
ClientId = response.ClientId,
UserId = response.UserId,
Transport = response.Transport,
ConsumerGroupsCount = response.ConsumerGroupsCount,
ConsumerGroups = consumerGroups
};
}
internal static IReadOnlyList<ClientResponse> MapClients(ReadOnlySpan<byte> payload)
{
if (payload.Length == 0)
{
return Array.Empty<ClientResponse>();
}
var response = new List<ClientResponse>();
var length = payload.Length;
var position = 0;
while (position < length)
{
var (client, readBytes) = MapClientInfo(payload, position);
response.Add(client);
position += readBytes;
}
return response;
}
private static (ClientResponse response, int position) MapClientInfo(ReadOnlySpan<byte> payload, int position)
{
int readBytes;
uint id = BinaryPrimitives.ReadUInt32LittleEndian(payload[position..(position + 4)]);
uint userId = BinaryPrimitives.ReadUInt32LittleEndian(payload[(position + 4)..(position + 8)]);
byte transportByte = payload[position + 8];
string transport = transportByte switch
{
1 => "TCP",
2 => "QUIC",
_ => "Unknown",
};
int addressLength = BinaryPrimitives.ReadInt32LittleEndian(payload[(position + 9)..(position + 13)]);
string address = Encoding.UTF8.GetString(payload[(position + 13)..(position + 13 + addressLength)]);
readBytes = 4 + 1 + 4 + 4 + addressLength;
position += readBytes;
int consumerGroupsCount = BinaryPrimitives.ReadInt32LittleEndian(payload[position..(position + 4)]);
readBytes += 4;
return (new ClientResponse
{
ClientId = id,
UserId = userId,
Transport = transport,
Adress = address,
ConsumerGroupsCount = consumerGroupsCount
}, readBytes);
}
internal static OffsetResponse MapOffsets(ReadOnlySpan<byte> payload)
{
var partitionId = BinaryPrimitives.ReadInt32LittleEndian(payload[0..4]);
var currentOffset = BinaryPrimitives.ReadUInt64LittleEndian(payload[4..12]);
var offset = BinaryPrimitives.ReadUInt64LittleEndian(payload[12..20]);
return new OffsetResponse
{
CurrentOffset = currentOffset,
StoredOffset = offset,
PartitionId = partitionId
};
}
private static MessageState MapMessageState(ReadOnlySpan<byte> payload, int position)
{
var state = payload[position + 8] switch
{
1 => MessageState.Available,
10 => MessageState.Unavailable,
20 => MessageState.Poisoned,
30 => MessageState.MarkedForDeletion,
_ => throw new ArgumentOutOfRangeException()
};
return state;
}
internal static PolledMessages MapMessages(ReadOnlySpan<byte> payload,
Func<byte[], byte[]>? decryptor = null)
{
int length = payload.Length;
var partitionId = BinaryPrimitives.ReadInt32LittleEndian(payload[0..4]);
var currentOffset = BinaryPrimitives.ReadUInt64LittleEndian(payload[4..12]);
var messagesCount = BinaryPrimitives.ReadUInt32LittleEndian(payload[12..16]);
int position = 16;
if (position >= length)
{
return PolledMessages.Empty;
}
List<MessageResponse> messages = new();
while (position < length)
{
ulong offset = BinaryPrimitives.ReadUInt64LittleEndian(payload[position..(position + 8)]);
var state = MapMessageState(payload, position);
ulong timestamp = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 9)..(position + 17)]);
var id = new Guid(payload[(position + 17)..(position + 33)]);
var checksum = BinaryPrimitives.ReadUInt32LittleEndian(payload[(position + 33)..(position + 37)]);
int headersLength = BinaryPrimitives.ReadInt32LittleEndian(payload[(position + 37)..(position + 41)]);
var headers = headersLength switch
{
0 => null,
> 0 => MapHeaders(payload[(position + 41)..(position + 41 + headersLength)]),
< 0 => throw new ArgumentOutOfRangeException()
};
position += headersLength;
uint messageLength = BinaryPrimitives.ReadUInt32LittleEndian(payload[(position + 41)..(position + 45)]);
int payloadRangeStart = position + PROPERTIES_SIZE;
int payloadRangeEnd = position + PROPERTIES_SIZE + (int)messageLength;
if (payloadRangeStart > length || payloadRangeEnd > length)
{
break;
}
var payloadSlice = payload[payloadRangeStart..payloadRangeEnd];
var messagePayload = ArrayPool<byte>.Shared.Rent(payloadSlice.Length);
var payloadSliceLen = payloadSlice.Length;
try
{
payloadSlice.CopyTo(messagePayload.AsSpan()[..payloadSliceLen]);
int totalSize = PROPERTIES_SIZE + (int)messageLength;
position += totalSize;
messages.Add(new MessageResponse
{
Offset = offset,
Timestamp = timestamp,
Id = id,
Checksum = checksum,
State = state,
Headers = headers,
Payload = decryptor is not null
? decryptor(messagePayload[..payloadSliceLen])
: messagePayload[..payloadSliceLen]
});
}
finally
{
ArrayPool<byte>.Shared.Return(messagePayload);
}
if (position + PROPERTIES_SIZE >= length)
{
break;
}
}
return new PolledMessages
{
PartitionId = partitionId,
CurrentOffset = currentOffset,
Messages = messages.AsReadOnly()
};
}
internal static PolledMessages<TMessage> MapMessages<TMessage>(ReadOnlySpan<byte> payload,
Func<byte[], TMessage> serializer, Func<byte[], byte[]>? decryptor = null)
{
int length = payload.Length;
var partitionId = BinaryPrimitives.ReadInt32LittleEndian(payload[0..4]);
var currentOffset = BinaryPrimitives.ReadUInt64LittleEndian(payload[4..12]);
var messagesCount = BinaryPrimitives.ReadUInt32LittleEndian(payload[12..16]);
int position = 16;
if (position >= length)
{
return PolledMessages<TMessage>.Empty;
}
List<MessageResponse<TMessage>> messages = new();
while (position < length)
{
ulong offset = BinaryPrimitives.ReadUInt64LittleEndian(payload[position..(position + 8)]);
var state = MapMessageState(payload, position);
ulong timestamp = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 9)..(position + 17)]);
var id = new Guid(payload[(position + 17)..(position + 33)]);
var checksum = BinaryPrimitives.ReadUInt32LittleEndian(payload[(position + 33)..(position + 37)]);
int headersLength = BinaryPrimitives.ReadInt32LittleEndian(payload[(position + 37)..(position + 41)]);
var headers = headersLength switch
{
0 => null,
> 0 => MapHeaders(payload[(position + 41)..(position + 41 + headersLength)]),
< 0 => throw new ArgumentOutOfRangeException()
};
position += headersLength;
uint messageLength = BinaryPrimitives.ReadUInt32LittleEndian(payload[(position + 41)..(position + 45)]);
int payloadRangeStart = position + PROPERTIES_SIZE;
int payloadRangeEnd = position + PROPERTIES_SIZE + (int)messageLength;
if (payloadRangeStart > length || payloadRangeEnd > length)
{
break;
}
var payloadSlice = payload[payloadRangeStart..payloadRangeEnd];
var messagePayload = ArrayPool<byte>.Shared.Rent(payloadSlice.Length);
var payloadSliceLen = payloadSlice.Length;
try
{
payloadSlice.CopyTo(messagePayload.AsSpan()[..payloadSliceLen]);
int totalSize = PROPERTIES_SIZE + (int)messageLength;
position += totalSize;
messages.Add(new MessageResponse<TMessage>
{
Offset = offset,
Timestamp = timestamp,
Checksum = checksum,
Id = id,
Headers = headers,
State = state,
Message = decryptor is not null
? serializer(decryptor(messagePayload[..payloadSliceLen]))
: serializer(messagePayload[..payloadSliceLen])
});
}
finally
{
ArrayPool<byte>.Shared.Return(messagePayload);
}
if (position + PROPERTIES_SIZE >= length)
{
break;
}
}
return new PolledMessages<TMessage>
{
PartitionId = partitionId,
CurrentOffset = currentOffset,
Messages = messages.AsReadOnly()
};
}
private static Dictionary<HeaderKey, HeaderValue> MapHeaders(ReadOnlySpan<byte> payload)
{
var headers = new Dictionary<HeaderKey, HeaderValue>();
int position = 0;
while (position < payload.Length)
{
var keyLength = BinaryPrimitives.ReadInt32LittleEndian(payload[position..(position + 4)]);
if (keyLength is 0 or > 255)
{
throw new ArgumentException("Key has incorrect size, must be between 1 and 255", nameof(keyLength));
}
var key = Encoding.UTF8.GetString(payload[(position + 4)..(position + 4 + keyLength)]);
position += 4 + keyLength;
var headerKind = MapHeaderKind(payload, position);
position++;
var valueLength = BinaryPrimitives.ReadInt32LittleEndian(payload[position..(position + 4)]);
if (valueLength is 0 or > 255)
{
throw new ArgumentException("Value has incorrect size, must be between 1 and 255", nameof(valueLength));
}
position += 4;
var value = payload[position..(position + valueLength)];
position += valueLength;
headers.Add(HeaderKey.New(key), new()
{
Kind = headerKind, Value = value.ToArray()
}
);
}
return headers;
}
private static HeaderKind MapHeaderKind(ReadOnlySpan<byte> payload, int position)
{
var headerKind = payload[position] switch
{
1 => HeaderKind.Raw,
2 => HeaderKind.String,
3 => HeaderKind.Bool,
6 => HeaderKind.Int32,
7 => HeaderKind.Int64,
8 => HeaderKind.Int128,
11 => HeaderKind.Uint32,
12 => HeaderKind.Uint64,
13 => HeaderKind.Uint128,
14 => HeaderKind.Float,
15 => HeaderKind.Double,
_ => throw new ArgumentOutOfRangeException()
};
return headerKind;
}
internal static IReadOnlyList<StreamResponse> MapStreams(ReadOnlySpan<byte> payload)
{
List<StreamResponse> streams = new();
int length = payload.Length;
int position = 0;
while (position < length)
{
(StreamResponse stream, int readBytes) = MapToStream(payload, position);
streams.Add(stream);
position += readBytes;
}
return streams.AsReadOnly();
}
internal static StreamResponse MapStream(ReadOnlySpan<byte> payload)
{
(StreamResponse stream, int position) = MapToStream(payload, 0);
List<TopicResponse> topics = new();
int length = payload.Length;
while (position < length)
{
(TopicResponse topic, int readBytes) = MapToTopic(payload, position);
topics.Add(topic);
position += readBytes;
}
return new StreamResponse
{
Id = stream.Id,
TopicsCount = stream.TopicsCount,
Name = stream.Name,
Topics = topics,
CreatedAt = stream.CreatedAt,
MessagesCount = stream.MessagesCount,
Size = stream.Size
};
}
private static (StreamResponse stream, int readBytes) MapToStream(ReadOnlySpan<byte> payload, int position)
{
int id = BinaryPrimitives.ReadInt32LittleEndian(payload[position..(position + 4)]);
ulong createdAt = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 4)..(position + 12)]);
int topicsCount = BinaryPrimitives.ReadInt32LittleEndian(payload[(position + 12)..(position + 16)]);
ulong sizeBytes = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 16)..(position + 24)]);
ulong messagesCount = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 24)..(position + 32)]);
int nameLength = (int)payload[position + 32];
string name = Encoding.UTF8.GetString(payload[(position + 33)..(position + 33 + nameLength)]);
int readBytes = 4 + 4 + 8 + 8 + 8 + 1 + nameLength;
return (
new StreamResponse
{
Id = id,
TopicsCount = topicsCount,
Name = name,
Size = sizeBytes,
MessagesCount = messagesCount,
CreatedAt = DateTimeOffsetUtils.FromUnixTimeMicroSeconds(createdAt).LocalDateTime
}, readBytes);
}
internal static IReadOnlyList<TopicResponse> MapTopics(ReadOnlySpan<byte> payload)
{
List<TopicResponse> topics = new();
int length = payload.Length;
int position = 0;
while (position < length)
{
(TopicResponse topic, int readBytes) = MapToTopic(payload, position);
topics.Add(topic);
position += readBytes;
}
return topics.AsReadOnly();
}
internal static TopicResponse MapTopic(ReadOnlySpan<byte> payload)
{
(TopicResponse topic, int position) = MapToTopic(payload, 0);
List<PartitionContract> partitions = new();
int length = payload.Length;
while (position < length)
{
(PartitionContract partition, int readBytes) = MapToPartition(payload, position);
partitions.Add(partition);
position += readBytes;
}
return new TopicResponse
{
Id = topic.Id,
Name = topic.Name,
PartitionsCount = topic.PartitionsCount,
CreatedAt = topic.CreatedAt,
MessageExpiry = topic.MessageExpiry,
MessagesCount = topic.MessagesCount,
Size = topic.Size,
ReplicationFactor = topic.ReplicationFactor,
MaxTopicSize = topic.MaxTopicSize,
Partitions = partitions
};
}
private static (TopicResponse topic, int readBytes) MapToTopic(ReadOnlySpan<byte> payload, int position)
{
int id = BinaryPrimitives.ReadInt32LittleEndian(payload[position..(position + 4)]);
ulong createdAt = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 4)..(position + 12)]);
int partitionsCount = BinaryPrimitives.ReadInt32LittleEndian(payload[(position + 12)..(position + 16)]);
int messageExpiry = BinaryPrimitives.ReadInt32LittleEndian(payload[(position + 16)..(position + 20)]);
ulong maxTopicSize = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 20)..(position + 28)]);
byte replicationFactor = payload[position + 28];
ulong sizeBytes = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 29)..(position + 37)]);
ulong messagesCount = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 37)..(position + 45)]);
int nameLength = (int)payload[position + 45];
string name = Encoding.UTF8.GetString(payload[(position + 46)..(position + 46 + nameLength)]);
int readBytes = 4 + 8 + 4 + 4 + 8 + 8 + 8 + 1 + 1 + name.Length;
return (
new TopicResponse
{
Id = id,
PartitionsCount = partitionsCount,
Name = name,
MessagesCount = messagesCount,
Size = sizeBytes,
CreatedAt = DateTimeOffsetUtils.FromUnixTimeMicroSeconds(createdAt).LocalDateTime,
MessageExpiry = messageExpiry,
ReplicationFactor = replicationFactor,
MaxTopicSize = maxTopicSize
}, readBytes);
}
private static (PartitionContract partition, int readBytes) MapToPartition(ReadOnlySpan<byte>
payload, int position)
{
int id = BinaryPrimitives.ReadInt32LittleEndian(payload[position..(position + 4)]);
ulong createdAt = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 4)..(position + 12)]);
int segmentsCount = BinaryPrimitives.ReadInt32LittleEndian(payload[(position + 12)..(position + 16)]);
ulong currentOffset = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 16)..(position + 24)]);
ulong sizeBytes = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 24)..(position + 32)]);
ulong messagesCount = BinaryPrimitives.ReadUInt64LittleEndian(payload[(position + 32)..(position + 40)]);
int readBytes = 4 + 4 + 8 + 8 + 8 + 8;
return (
new PartitionContract
{
Id = id,
SegmentsCount = segmentsCount,
CurrentOffset = currentOffset,
Size = sizeBytes,
CreatedAt = DateTimeOffsetUtils.FromUnixTimeMicroSeconds(createdAt).LocalDateTime,
MessagesCount = messagesCount
}, readBytes);
}
internal static List<ConsumerGroupResponse> MapConsumerGroups(ReadOnlySpan<byte> payload)
{
List<ConsumerGroupResponse> consumerGroups = new();
int length = payload.Length;
int position = 0;
while (position < length)
{
(ConsumerGroupResponse consumerGroup, int readBytes) = MapToConsumerGroup(payload, position);
consumerGroups.Add(consumerGroup);
position += readBytes;
}
return consumerGroups;
}
internal static Stats MapStats(ReadOnlySpan<byte> payload)
{
int processId = BinaryPrimitives.ReadInt32LittleEndian(payload[0..4]);
float cpuUsage = BitConverter.ToSingle(payload[4..8]);
float totalCpuUsage = BitConverter.ToSingle(payload[8..12]);
ulong memoryUsage = BinaryPrimitives.ReadUInt64LittleEndian(payload[12..20]);
ulong totalMemory = BinaryPrimitives.ReadUInt64LittleEndian(payload[20..28]);
ulong availableMemory = BinaryPrimitives.ReadUInt64LittleEndian(payload[28..36]);
ulong runTime = BinaryPrimitives.ReadUInt64LittleEndian(payload[36..44]);
ulong startTime = BinaryPrimitives.ReadUInt64LittleEndian(payload[44..52]);
ulong readBytes = BinaryPrimitives.ReadUInt64LittleEndian(payload[52..60]);
ulong writtenBytes = BinaryPrimitives.ReadUInt64LittleEndian(payload[60..68]);
ulong totalSizeBytes = BinaryPrimitives.ReadUInt64LittleEndian(payload[68..76]);
int streamsCount = BinaryPrimitives.ReadInt32LittleEndian(payload[76..80]);
int topicsCount = BinaryPrimitives.ReadInt32LittleEndian(payload[80..84]);
int partitionsCount = BinaryPrimitives.ReadInt32LittleEndian(payload[84..88]);
int segmentsCount = BinaryPrimitives.ReadInt32LittleEndian(payload[88..92]);
ulong messagesCount = BinaryPrimitives.ReadUInt64LittleEndian(payload[92..100]);
int clientsCount = BinaryPrimitives.ReadInt32LittleEndian(payload[100..104]);
int consumerGroupsCount = BinaryPrimitives.ReadInt32LittleEndian(payload[104..108]);
int position = 108;
int hostnameLength = BinaryPrimitives.ReadInt32LittleEndian(payload[position..(position + 4)]);
string hostname = Encoding.UTF8.GetString(payload[(position + 4)..(position + 4 + hostnameLength)]);
position += 4 + hostnameLength;
int osNameLength = BinaryPrimitives.ReadInt32LittleEndian(payload[position..(position + 4)]);
string osName = Encoding.UTF8.GetString(payload[(position + 4)..(position + 4 + osNameLength)]);
position += 4 + osNameLength;
int osVersionLength = BinaryPrimitives.ReadInt32LittleEndian(payload[position..(position + 4)]);
string osVersion = Encoding.UTF8.GetString(payload[(position + 4)..(position + 4 + osVersionLength)]);
position += 4 + osVersionLength;
int kernelVersionLength = BinaryPrimitives.ReadInt32LittleEndian(payload[position..(position + 4)]);
string kernelVersion = Encoding.UTF8.GetString(payload[(position + 4)..(position + 4 + kernelVersionLength)]);
return new Stats
{
ProcessId = processId,
Hostname = hostname,
ClientsCount = clientsCount,
CpuUsage = cpuUsage,
TotalCpuUsage = totalCpuUsage,
MemoryUsage = memoryUsage,
TotalMemory = totalMemory,
AvailableMemory = availableMemory,
RunTime = runTime,
StartTime = DateTimeOffset.FromUnixTimeSeconds((long)startTime),
ReadBytes = readBytes,
WrittenBytes = writtenBytes,
StreamsCount = streamsCount,
KernelVersion = kernelVersion,
MessagesCount = messagesCount,
TopicsCount = topicsCount,
PartitionsCount = partitionsCount,
SegmentsCount = segmentsCount,
OsName = osName,
OsVersion = osVersion,
ConsumerGroupsCount = consumerGroupsCount,
MessagesSizeBytes = totalSizeBytes
};
}
internal static ConsumerGroupResponse MapConsumerGroup(ReadOnlySpan<byte> payload)
{
(ConsumerGroupResponse consumerGroup, int position) = MapToConsumerGroup(payload, 0);
var members = new List<ConsumerGroupMember>();
while (position < payload.Length)
{
(var member, int readBytes) = MapToMember(payload, position);
members.Add(member);
position += readBytes;
}
return new ConsumerGroupResponse
{
Id = consumerGroup.Id,
MembersCount = consumerGroup.MembersCount,
PartitionsCount = consumerGroup.PartitionsCount,
Name = consumerGroup.Name,
Members = members
};
}
private static (ConsumerGroupMember, int readBytes) MapToMember(ReadOnlySpan<byte> payload, int position)
{
var id = BinaryPrimitives.ReadInt32LittleEndian(payload[position..(position + 4)]);
var partitionsCount = BinaryPrimitives.ReadInt32LittleEndian(payload[(position + 4)..(position + 8)]);
var partitions = new List<int>();
for (int i = 0; i < partitionsCount; i++)
{
var partitionId = BinaryPrimitives.ReadInt32LittleEndian(payload[(position + 8 + (i * 4))..(position + 8 + ((i + 1) * 4))]);
partitions.Add(partitionId);
}
return (new ConsumerGroupMember
{
Id = id,
PartitionsCount = partitionsCount,
Partitions = partitions
}, 8 + partitionsCount * 4);
}
private static (ConsumerGroupResponse consumerGroup, int readBytes) MapToConsumerGroup(ReadOnlySpan<byte> payload,
int position)
{
int id = BinaryPrimitives.ReadInt32LittleEndian(payload[position..(position + 4)]);
int partitionsCount = BinaryPrimitives.ReadInt32LittleEndian(payload[(position + 4)..(position + 8)]);
int membersCount = BinaryPrimitives.ReadInt32LittleEndian(payload[(position + 8)..(position + 12)]);
int nameLength = payload[position + 12];
string name = Encoding.UTF8.GetString(payload[(position + 13)..(position + 13 + nameLength)]);
return (new ConsumerGroupResponse { Id = id,
Name = name,
MembersCount = membersCount,
PartitionsCount = partitionsCount
},
13 + name.Length);
}
}