foreign/csharp/Iggy_SDK_Tests/ContractTests/TcpContract.cs (731 lines of code) (raw):
using FluentAssertions;
using Iggy_SDK;
using Iggy_SDK_Tests.Utils;
using Iggy_SDK.Contracts.Http;
using Iggy_SDK.Contracts.Tcp;
using Iggy_SDK.Enums;
using Iggy_SDK_Tests.Utils.Groups;
using Iggy_SDK_Tests.Utils.Messages;
using Iggy_SDK_Tests.Utils.Offset;
using Iggy_SDK_Tests.Utils.Partitions;
using Iggy_SDK_Tests.Utils.Streams;
using Iggy_SDK_Tests.Utils.Topics;
using Iggy_SDK_Tests.Utils.Users;
using Iggy_SDK.Contracts.Http.Auth;
using System.Buffers.Binary;
using System.Text;
namespace Iggy_SDK_Tests.ContractTests;
public sealed class TcpContract
{
[Fact]
public void TcpContracts_DeletePersonalRequestToken_HasValidBytes()
{
// Arrange
var request = new DeletePersonalAccessTokenRequest
{
Name = "TestUser"
};
// Act
byte[] result = TcpContracts.DeletePersonalRequestToken(request);
// Assert
Assert.Equal(5 + request.Name.Length, result.Length);
Assert.Equal((byte)request.Name.Length, result[0]);
Assert.Equal(Encoding.UTF8.GetBytes(request.Name), result[1..(1 + request.Name.Length)]);
}
[Fact]
public void TcpContracts_CreatePersonalAccessToken_HasValidBytes_ValidExpiry()
{
// Arrange
var request = new CreatePersonalAccessTokenRequest
{
Name = "TestUser",
Expiry = 3600 // Valid Expiry Value
};
// Act
byte[] result = TcpContracts.CreatePersonalAccessToken(request);
// Assert
Assert.Equal(5 + request.Name.Length, result.Length); // The expected length
Assert.Equal(Encoding.UTF8.GetBytes(request.Name), result[1..(1 + request.Name.Length)]); // The expected length of the name
Assert.Equal((uint)3600, BinaryPrimitives.ReadUInt32LittleEndian(result[(1 + request.Name.Length)..]));
}
[Fact]
public void TcpContracts_CreatePersonalAccessToken_HasValidBytes_NullExpiry()
{
// Arrange
var request = new CreatePersonalAccessTokenRequest
{
Name = "TestUser",
Expiry = null // Null Expiry
};
// Act
byte[] result = TcpContracts.CreatePersonalAccessToken(request);
// Assert
Assert.Equal(5 + request.Name.Length, result.Length); // The expected length
Assert.Equal(Encoding.UTF8.GetBytes(request.Name), result[1..(1 + request.Name.Length)]); // The expected length of the name
Assert.Equal((uint)0, BinaryPrimitives.ReadUInt32LittleEndian(result[(1 + request.Name.Length)..]));
}
[Fact]
public void TcpContracts_LoginUser_HasCorrectBytes()
{
// Arrange
var request = new LoginUserRequest
{
Username = "testuser",
Password = "testpassword"
};
// Act
var result = TcpContracts.LoginUser(request);
// Assert
var expectedLength = request.Username.Length + request.Password.Length + 2;
Assert.Equal(expectedLength, result.Length);
var position = 0;
var usernameLength = result[position];
position += 1;
var usernameBytes = result[position..(position + usernameLength)];
position += usernameLength;
var passwordLength = result[position];
position += 1;
var passwordBytes = result[position..(position + passwordLength)];
var decodedUsername = Encoding.UTF8.GetString(usernameBytes);
var decodedPassword = Encoding.UTF8.GetString(passwordBytes);
Assert.Equal(request.Username, decodedUsername);
Assert.Equal(request.Password, decodedPassword);
}
[Fact]
public void TcpContracts_UpdateUser_HasCorrectBytes()
{
// Arrange
var request = new UpdateUserRequest
{
UserId = Identifier.Numeric(1),
Username = "newusername",
UserStatus = UserStatus.Active
};
// Act
var result = TcpContracts.UpdateUser(request);
// Assert
var expectedLength = request.UserId.Length + 2 +
(request.Username?.Length ?? 0) + 2 + 1 + 1;
Assert.Equal(expectedLength, result.Length);
var position = 2;
var userIdBytes = result[position..(position + request.UserId.Length)];
position += request.UserId.Length;
var usernameFlag = result[position];
position += 1;
if (usernameFlag == 1)
{
var usernameLength = result[position];
position += 1;
var usernameBytes = result[position..(position + usernameLength)];
position += usernameLength;
var decodedUsername = Encoding.UTF8.GetString(usernameBytes);
Assert.Equal(request.Username, decodedUsername);
}
else
{
Assert.Null(request.Username);
}
var statusFlag = result[position];
position += 1;
if (statusFlag == 1)
{
var userStatus = statusFlag switch
{
1 => UserStatus.Active,
2 => UserStatus.Inactive
};
Assert.Equal(request.UserStatus, userStatus);
}
else
{
Assert.Null(request.UserStatus);
}
Assert.Equal(request.UserId.Value, userIdBytes);
}
[Fact]
public void TcpContracts_CreateUser_NoPermission_HasCorrectBytes()
{
// Arrange
var request = new CreateUserRequest
{
Username = "testuser",
Password = "testpassword",
Status = UserStatus.Active,
Permissions = null,
};
// Act
var result = TcpContracts.CreateUser(request);
// Assert
int position = 0;
Assert.Equal((byte)request.Username.Length, result[position]);
position += 1;
var usernameBytes = result[position..(position + request.Username.Length)];
position += request.Username.Length;
var decodedUsername = Encoding.UTF8.GetString(usernameBytes);
Assert.Equal(request.Username, decodedUsername);
Assert.Equal((byte)request.Password.Length, result[position]);
position += 1;
var passwordBytes = result[position..(position + request.Password.Length)];
position += request.Password.Length;
var decodedPassword = Encoding.UTF8.GetString(passwordBytes);
Assert.Equal(request.Password, decodedPassword);
var expectedStatusByte = request.Status switch
{
UserStatus.Active => (byte)1,
UserStatus.Inactive => (byte)2,
_ => throw new ArgumentOutOfRangeException()
};
Assert.Equal(expectedStatusByte, result[position]);
position += 1;
var permissionsFlag = result[position];
position += 1;
if (permissionsFlag == 1)
{
var permissionsSize = BinaryPrimitives.ReadInt32LittleEndian(result[position..(position + 4)]);
position += 4;
var permissionsBytes = result[position..(position + permissionsSize)];
}
else
{
Assert.Null(request.Permissions);
}
}
[Fact]
public void TcpContracts_ChangePassword_HasCorrectBytes()
{
// Arrange
var request = new ChangePasswordRequest
{
UserId = Identifier.Numeric(1),
CurrentPassword = "oldpassword",
NewPassword = "newpassword"
};
// Act
var result = TcpContracts.ChangePassword(request);
// Assert
int position = 2;
var userIdBytes = result[position..(position + request.UserId.Length)];
position += request.UserId.Length;
Assert.Equal(request.UserId.Value, userIdBytes);
Assert.Equal((byte)request.CurrentPassword.Length, result[position]);
position += 1;
var currentPasswordBytes = result[position..(position + request.CurrentPassword.Length)];
position += request.CurrentPassword.Length;
var decodedCurrentPassword = Encoding.UTF8.GetString(currentPasswordBytes);
Assert.Equal(request.CurrentPassword, decodedCurrentPassword);
Assert.Equal((byte)request.NewPassword.Length, result[position]);
position += 1;
var newPasswordBytes = result[position..(position + request.NewPassword.Length)];
position += request.NewPassword.Length;
var decodedNewPassword = Encoding.UTF8.GetString(newPasswordBytes);
Assert.Equal(request.NewPassword, decodedNewPassword);
}
[Fact]
public void TcpContracts_UpdatePermissions_HasCorrectBytes()
{
// Arrange
var request = new UpdateUserPermissionsRequest
{
UserId = Identifier.Numeric(1),
Permissions = PermissionsFactory.CreatePermissions()
};
// Act
var result = TcpContracts.UpdatePermissions(request);
// Assert
int position = 2;
var userIdBytes = result[position..(position + request.UserId.Length)];
position += request.UserId.Length;
Assert.Equal(request.UserId.Value, userIdBytes);
var permissionsFlag = result[position];
position += 1;
if (permissionsFlag == 1)
{
var permissionsSize = BinaryPrimitives.ReadInt32LittleEndian(result[position..(position + 4)]);
position += 4;
var permissionsBytes = result[position..(position + permissionsSize)];
var mappedPermissions = PermissionsFactory.PermissionsFromBytes(permissionsBytes);
request.Permissions.Global.Should().BeEquivalentTo(mappedPermissions.Global);
if (request.Permissions.Streams != null)
{
Assert.NotNull(mappedPermissions.Streams);
foreach (var (streamId, stream) in request.Permissions.Streams)
{
Assert.True(mappedPermissions.Streams.ContainsKey(streamId));
var mappedStream = mappedPermissions.Streams[streamId];
Assert.Equal(stream.ManageStream, mappedStream.ManageStream);
Assert.Equal(stream.ReadStream, mappedStream.ReadStream);
Assert.Equal(stream.ManageTopics, mappedStream.ManageTopics);
Assert.Equal(stream.ReadTopics, mappedStream.ReadTopics);
Assert.Equal(stream.PollMessages, mappedStream.PollMessages);
Assert.Equal(stream.SendMessages, mappedStream.SendMessages);
if (stream.Topics != null)
{
Assert.NotNull(mappedStream.Topics);
foreach (var (topicId, topic) in stream.Topics)
{
Assert.True(mappedStream.Topics.ContainsKey(topicId));
var mappedTopic = mappedStream.Topics[topicId];
Assert.Equal(topic.ManageTopic, mappedTopic.ManageTopic);
Assert.Equal(topic.ReadTopic, mappedTopic.ReadTopic);
Assert.Equal(topic.PollMessages, mappedTopic.PollMessages);
Assert.Equal(topic.SendMessages, mappedTopic.SendMessages);
}
}
else
{
Assert.Null(mappedStream.Topics);
}
}
}
else
{
Assert.Null(mappedPermissions.Streams);
}
}
else
{
Assert.Null(request.Permissions);
}
}
[Fact]
public void TcpContracts_CreateUser_WithPermission_HasCorrectBytes()
{
// Arrange
var request = new CreateUserRequest
{
Username = "testuser",
Password = "testpassword",
Status = UserStatus.Active,
Permissions = PermissionsFactory.CreatePermissions(),
};
// Act
var result = TcpContracts.CreateUser(request);
// Assert
int position = 0;
Assert.Equal((byte)request.Username.Length, result[position]);
position += 1;
var usernameBytes = result[position..(position + request.Username.Length)];
position += request.Username.Length;
var decodedUsername = Encoding.UTF8.GetString(usernameBytes);
Assert.Equal(request.Username, decodedUsername);
Assert.Equal((byte)request.Password.Length, result[position]);
position += 1;
var passwordBytes = result[position..(position + request.Password.Length)];
position += request.Password.Length;
var decodedPassword = Encoding.UTF8.GetString(passwordBytes);
Assert.Equal(request.Password, decodedPassword);
var expectedStatusByte = request.Status switch
{
UserStatus.Active => (byte)1,
UserStatus.Inactive => (byte)2,
_ => throw new ArgumentOutOfRangeException()
};
Assert.Equal(expectedStatusByte, result[position]);
position += 1;
var permissionsFlag = result[position];
position += 1;
if (permissionsFlag == 1)
{
var permissionsSize = BinaryPrimitives.ReadInt32LittleEndian(result[position..(position + 4)]);
position += 4;
var permissionsBytes = result[position..(position + permissionsSize)];
var mappedPermissions = PermissionsFactory.PermissionsFromBytes(permissionsBytes);
request.Permissions.Global.Should().BeEquivalentTo(mappedPermissions.Global);
if (request.Permissions.Streams != null)
{
Assert.NotNull(mappedPermissions.Streams);
foreach (var (streamId, stream) in request.Permissions.Streams)
{
Assert.True(mappedPermissions.Streams.ContainsKey(streamId));
var mappedStream = mappedPermissions.Streams[streamId];
Assert.Equal(stream.ManageStream, mappedStream.ManageStream);
Assert.Equal(stream.ReadStream, mappedStream.ReadStream);
Assert.Equal(stream.ManageTopics, mappedStream.ManageTopics);
Assert.Equal(stream.ReadTopics, mappedStream.ReadTopics);
Assert.Equal(stream.PollMessages, mappedStream.PollMessages);
Assert.Equal(stream.SendMessages, mappedStream.SendMessages);
if (stream.Topics != null)
{
Assert.NotNull(mappedStream.Topics);
foreach (var (topicId, topic) in stream.Topics)
{
Assert.True(mappedStream.Topics.ContainsKey(topicId));
var mappedTopic = mappedStream.Topics[topicId];
Assert.Equal(topic.ManageTopic, mappedTopic.ManageTopic);
Assert.Equal(topic.ReadTopic, mappedTopic.ReadTopic);
Assert.Equal(topic.PollMessages, mappedTopic.PollMessages);
Assert.Equal(topic.SendMessages, mappedTopic.SendMessages);
}
}
else
{
Assert.Null(mappedStream.Topics);
}
}
}
else
{
Assert.Null(request.Permissions);
}
}
}
[Fact]
public void TcpContracts_ChangePasswordRequest_HasCorrectBytes()
{
// Arrange
var request = new ChangePasswordRequest
{
UserId = Identifier.Numeric(1),
CurrentPassword = "oldpassword",
NewPassword = "newpassword"
};
// Act
var result = TcpContracts.ChangePassword(request);
// Assert
var expectedLength = request.UserId.Length + 2 +
request.CurrentPassword.Length + request.NewPassword.Length + 2;
Assert.Equal(expectedLength, result.Length);
// Validate bytes can be translated back to properties
var position = 2;
var userIdBytes = result[position..(position + request.UserId.Length)];
position += request.UserId.Length;
var currentPasswordLength = result[position];
position += 1;
var currentPasswordBytes = result[position..(position + currentPasswordLength)];
position += currentPasswordLength;
var newPasswordLength = result[position];
position += 1;
var newPasswordBytes = result[position..(position + newPasswordLength)];
var decodedUserId = BinaryPrimitives.ReadInt32LittleEndian(userIdBytes);
var decodedCurrentPassword = Encoding.UTF8.GetString(currentPasswordBytes);
var decodedNewPassword = Encoding.UTF8.GetString(newPasswordBytes);
Assert.Equal(request.UserId.Value, userIdBytes);
Assert.Equal(request.CurrentPassword, decodedCurrentPassword);
Assert.Equal(request.NewPassword, decodedNewPassword);
}
[Fact]
public void TcpContracts_MessageFetchRequest_HasCorrectBytes()
{
// Arrange
var request = MessageFactory.CreateMessageFetchRequestConsumer();
int messageBufferSize = 23 + 2 + 4 + 2 + 2 + request.Consumer.Id.Length;
var result = new byte[messageBufferSize];
// Act
TcpContracts.GetMessages(result, request);
// Assert
Assert.Equal(result[0] switch
{
1 => ConsumerType.Consumer,
2 => ConsumerType.ConsumerGroup,
_ => throw new ArgumentOutOfRangeException()
}, request.Consumer.Type);
Assert.Equal(request.Consumer.Id.Kind.GetByte(), result[1]);
Assert.Equal(request.StreamId.Value, BytesToIdentifierNumeric(result, 7).Value);
Assert.Equal(request.TopicId.Value, BytesToIdentifierNumeric(result, 13).Value);
Assert.Equal(request.StreamId.Kind, BytesToIdentifierNumeric(result, 7).Kind);
Assert.Equal(request.TopicId.Kind, BytesToIdentifierNumeric(result, 13).Kind);
Assert.Equal(request.StreamId.Length, BytesToIdentifierNumeric(result, 7).Length);
Assert.Equal(request.TopicId.Length, BytesToIdentifierNumeric(result, 13).Length);
Assert.Equal(request.PartitionId, BitConverter.ToInt32(result[19..23]));
Assert.Equal(
result[23] switch
{
1 => MessagePolling.Offset,
2 => MessagePolling.Timestamp,
3 => MessagePolling.First,
4 => MessagePolling.Last,
5 => MessagePolling.Next,
_ => throw new ArgumentOutOfRangeException()
}, request.PollingStrategy.Kind);
Assert.Equal(request.PollingStrategy.Value, BitConverter.ToUInt64(result[24..32]));
Assert.Equal(request.Count, BitConverter.ToInt32(result[32..36]));
Assert.Equal(request.AutoCommit, result[36] switch
{
0 => false,
1 => true,
_ => throw new ArgumentOutOfRangeException()
});
}
[Fact]
public void TcpContracts_MessageSendRequest_WithNoHeaders_HasCorrectBytes()
{
// Arrange
var streamId = Identifier.Numeric(1);
var topicId = Identifier.Numeric(1);
var request = MessageFactory.CreateMessageSendRequest();
var messageBufferSize = request.Messages.Sum(message => 16 + 4 + 4 + message.Payload.Length)
+ request.Partitioning.Length + 14;
var result = new byte[messageBufferSize];
// Act
TcpContracts.CreateMessage(result, streamId, topicId, request.Partitioning, request.Messages);
//Assert
Assert.Equal(streamId.Value, BytesToIdentifierNumeric(result, 0).Value);
Assert.Equal(topicId.Value, BytesToIdentifierNumeric(result, 6).Value);
Assert.Equal(streamId.Length, BytesToIdentifierNumeric(result, 0).Length);
Assert.Equal(topicId.Length, BytesToIdentifierNumeric(result, 6).Length);
Assert.Equal(streamId.Kind, BytesToIdentifierNumeric(result, 0).Kind);
Assert.Equal(topicId.Kind, BytesToIdentifierNumeric(result, 6).Kind);
Assert.Equal(request.Partitioning.Kind, result[12] switch
{
1 => Partitioning.Balanced,
2 => Partitioning.PartitionId,
3 => Partitioning.MessageKey,
_ => throw new ArgumentOutOfRangeException()
});
Assert.Equal(request.Partitioning.Length, result[13]);
Assert.Equal(request.Partitioning.Value.Length, result[14..(14 + request.Partitioning.Length)].Length);
int currentIndex = 14 + request.Partitioning.Length;
foreach (var message in request.Messages)
{
// Assert
Assert.Equal(message.Id, new Guid(result[currentIndex..(currentIndex + 16)]));
Assert.Equal(result[(currentIndex + 16)..(currentIndex + 20)], new byte[] { 0, 0, 0, 0 });
currentIndex += 20;
int payloadLength = BitConverter.ToInt32(result[currentIndex..(currentIndex + 4)]);
currentIndex += 4;
byte[] payload = result[currentIndex..(currentIndex + payloadLength)].ToArray();
currentIndex += payloadLength;
Assert.Equal(message.Payload.Length, payload.Length);
Assert.Equal(message.Payload, payload);
}
}
[Fact]
public void TcpContracts_CreateStream_HasCorrectBytes()
{
// Arrange
var request = StreamFactory.CreateStreamRequest();
// Act
var result = TcpContracts.CreateStream(request).AsSpan();
// Assert
int expectedBytesLength = sizeof(int) + request.Name.Length + 1;
Assert.Equal(expectedBytesLength, result.Length);
Assert.Equal(request.StreamId, BitConverter.ToInt32(result[..5]));
Assert.Equal(request.Name, Encoding.UTF8.GetString(result[5..]));
}
[Fact]
public void TcpContracts_CreateGroup_HasCorrectBytes()
{
// Arrange
var streamId = Identifier.String("my-stream");
var topicId = Identifier.String("my-topic");
var request = new CreateConsumerGroupRequest
{
Name = Utility.RandomString(69),
StreamId = streamId,
TopicId = topicId,
ConsumerGroupId = Random.Shared.Next(1, 69)
};
// Act
var result = TcpContracts.CreateGroup(request).AsSpan();
// Assert
int expectedBytesLength = 2 + streamId.Length + 2 + topicId.Length + 4 + 1 + request.Name.Length;
Assert.Equal(expectedBytesLength, result.Length);
Assert.Equal(streamId.Value, BytesToIdentifierString(result, 0).Value);
Assert.Equal(topicId.Value, BytesToIdentifierString(result, 2 + streamId.Length).Value);
Assert.Equal(streamId.Kind, BytesToIdentifierString(result, 0).Kind);
Assert.Equal(topicId.Kind, BytesToIdentifierString(result, 2 + streamId.Length).Kind);
Assert.Equal(streamId.Length, BytesToIdentifierString(result, 0).Length);
Assert.Equal(topicId.Length, BytesToIdentifierString(result, 2 + streamId.Length).Length);
var position = 2 + streamId.Length + 2 + topicId.Length + 4;
Assert.Equal(request.Name.Length, result[position]);
}
[Fact]
public void TcpContracts_DeleteGroup_HasCorrectBytes()
{
// Arrange
var streamId = Identifier.Numeric(1);
var topicId = Identifier.Numeric(1);
var groupId = Identifier.Numeric(1);
// Act
var result = TcpContracts.DeleteGroup(streamId, topicId, groupId).AsSpan();
// Assert
int expectedBytesLength = 2 + streamId.Length + 2 + topicId.Length + groupId.Length + 2;
Assert.Equal(expectedBytesLength, result.Length);
Assert.Equal(streamId.Value, BytesToIdentifierNumeric(result, 0).Value);
Assert.Equal(topicId.Value, BytesToIdentifierNumeric(result, 6).Value);
Assert.Equal(groupId.Value, BytesToIdentifierNumeric(result, 12).Value);
}
[Fact]
public void TcpContracts_GetGroups_HasCorrectBytes()
{
// Arrange
var streamId = Identifier.String("my-stream");
var topicId = Identifier.Numeric(1);
// Act
var result = TcpContracts.GetGroups(streamId, topicId).AsSpan();
// Assert
int expectedBytesLength = 2 + streamId.Length + 2 + topicId.Length;
Assert.Equal(expectedBytesLength, result.Length);
Assert.Equal(streamId.Value, BytesToIdentifierString(result, 0).Value);
Assert.Equal(topicId.Value, BytesToIdentifierNumeric(result, 2 + streamId.Length).Value);
}
[Fact]
public void TcpContracts_JoinGroup_HasCorrectBytes()
{
// Arrange
var request = ConsumerGroupFactory.CreateJoinGroupRequest();
// Act
var result = TcpContracts.JoinGroup(request).AsSpan();
// Assert
int expectedBytesLength = 2 + request.StreamId.Length + 2 + request.TopicId.Length + request.ConsumerGroupId.Length + 2;
Assert.Equal(expectedBytesLength, result.Length);
Assert.Equal(request.StreamId.Value, BytesToIdentifierNumeric(result, 0).Value);
Assert.Equal(request.TopicId.Value, BytesToIdentifierNumeric(result, 6).Value);
Assert.Equal(request.ConsumerGroupId.Value, BytesToIdentifierNumeric(result, 12).Value);
}
[Fact]
public void TcpContracts_LeaveGroup_HasCorrectBytes()
{
// Arrange
var request = ConsumerGroupFactory.CreateLeaveGroupRequest();
// Act
var result = TcpContracts.LeaveGroup(request).AsSpan();
// Assert
int expectedBytesLength = 2 + request.StreamId.Length + 2 + request.TopicId.Length + request.ConsumerGroupId.Length + 2;
Assert.Equal(expectedBytesLength, result.Length);
Assert.Equal(request.StreamId.Value, BytesToIdentifierNumeric(result, 0).Value);
Assert.Equal(request.TopicId.Value, BytesToIdentifierNumeric(result, 6).Value);
Assert.Equal(request.ConsumerGroupId.Value, BytesToIdentifierNumeric(result, 12).Value);
}
[Fact]
public void TcpContracts_GetGroup_HasCorrectBytes()
{
// Arrange
var streamId = Identifier.String("my-stream");
var topicId = Identifier.Numeric(1);
int groupId = 1;
var groupIdentifier = Identifier.Numeric(groupId);
// Act
var result = TcpContracts.GetGroup(streamId, topicId, groupIdentifier).AsSpan();
// Assert
int expectedBytesLength = 2 + streamId.Length + 2 + topicId.Length + groupIdentifier.Length + 2;
Assert.Equal(expectedBytesLength, result.Length);
Assert.Equal(streamId.Value, BytesToIdentifierString(result, 0).Value);
Assert.Equal(topicId.Value, BytesToIdentifierNumeric(result, 2 + streamId.Length).Value);
Assert.Equal(streamId.Kind, BytesToIdentifierString(result, 0).Kind);
Assert.Equal(topicId.Kind, BytesToIdentifierNumeric(result, 2 + streamId.Length).Kind);
Assert.Equal(streamId.Length, BytesToIdentifierString(result, 0).Length);
Assert.Equal(topicId.Length, BytesToIdentifierNumeric(result, 2 + streamId.Length).Length);
var position = 2 + streamId.Length + 2 + topicId.Length;
Assert.Equal(groupIdentifier.Kind.GetByte(), result[position]);
}
[Fact]
public void TcpContracts_CreateTopic_HasCorrectBytes()
{
// Arrange
var streamId = Identifier.Numeric(1);
var request = TopicFactory.CreateTopicRequest();
// Act
var result = TcpContracts.CreateTopic(streamId, request).AsSpan();
// Assert
int expectedBytesLength = 2 + streamId.Length + 22 + request.Name.Length;
Assert.Equal(expectedBytesLength, result.Length);
Assert.Equal(streamId.Value, BytesToIdentifierNumeric(result, 0).Value);
Assert.Equal(streamId.Length, BytesToIdentifierNumeric(result, 0).Length);
Assert.Equal(streamId.Kind, BytesToIdentifierNumeric(result, 0).Kind);
Assert.Equal(request.TopicId, BitConverter.ToInt32(result[6..10]));
Assert.Equal(request.PartitionsCount, BitConverter.ToInt32(result[10..14]));
Assert.Equal(request.MessageExpiry, BitConverter.ToInt32(result[14..18]));
Assert.Equal(request.MaxTopicSize, BitConverter.ToUInt64(result[18..26]));
Assert.Equal(request.ReplicationFactor, (int)result[26]);
Assert.Equal(request.Name.Length, (int)result[27]);
Assert.Equal(request.Name, Encoding.UTF8.GetString(result[28..]));
}
[Fact]
public void TcpContracts_GetTopicById_HasCorrectBytes()
{
// Arrange
var streamId = Identifier.Numeric(1);
var topicId = Identifier.Numeric(1);
// Act
var result = TcpContracts.GetTopicById(streamId, topicId).AsSpan();
// Assert
int expectedBytesLength = 2 + streamId.Length + 2 + topicId.Length;
Assert.Equal(expectedBytesLength, result.Length);
Assert.Equal(streamId.Value, BytesToIdentifierNumeric(result, 0).Value);
Assert.Equal(streamId.Length, BytesToIdentifierNumeric(result, 0).Length);
Assert.Equal(streamId.Kind, BytesToIdentifierNumeric(result, 0).Kind);
Assert.Equal(topicId.Value, BytesToIdentifierNumeric(result, 6).Value);
Assert.Equal(topicId.Length, BytesToIdentifierNumeric(result, 6).Length);
Assert.Equal(topicId.Kind, BytesToIdentifierNumeric(result, 6).Kind);
}
[Fact]
public void TcpContracts_DeleteTopic_HasCorrectBytes()
{
// Arrange
var streamId = Identifier.Numeric(1);
var topicId = Identifier.Numeric(1);
// Act
var result = TcpContracts.DeleteTopic(streamId, topicId).AsSpan();
// Assert
int expectedBytesLength = 2 + streamId.Length + 2 + topicId.Length;
Assert.Equal(expectedBytesLength, result.Length);
Assert.Equal(streamId.Value, BytesToIdentifierNumeric(result, 0).Value);
Assert.Equal(streamId.Length, BytesToIdentifierNumeric(result, 0).Length);
Assert.Equal(streamId.Kind, BytesToIdentifierNumeric(result, 0).Kind);
Assert.Equal(topicId.Value, BytesToIdentifierNumeric(result, 6).Value);
Assert.Equal(topicId.Length, BytesToIdentifierNumeric(result, 6).Length);
Assert.Equal(topicId.Kind, BytesToIdentifierNumeric(result, 6).Kind);
}
[Fact]
public void TcpContracts_UpdateOffset_HasCorrectBytes()
{
// Arrange
var contract = OffsetFactory.CreateOffsetContract();
// Act
var result = TcpContracts.UpdateOffset(contract).AsSpan();
// Assert
int expectedBytesLength = 2 + contract.StreamId.Length + 2 + contract.TopicId.Length + 5 + 2 + contract.Consumer.Id.Length + 8;
Assert.Equal(expectedBytesLength, result.Length);
Assert.Equal(1, result[0]);
Assert.Equal(contract.Consumer.Id.Kind.GetByte(), result[1]);
Assert.Equal(contract.StreamId.Value, BytesToIdentifierNumeric(result, 7).Value);
Assert.Equal(contract.StreamId.Length, BytesToIdentifierNumeric(result, 7).Length);
Assert.Equal(contract.StreamId.Kind, BytesToIdentifierNumeric(result, 7).Kind);
Assert.Equal(contract.TopicId.Value, BytesToIdentifierNumeric(result, 13).Value);
Assert.Equal(contract.TopicId.Length, BytesToIdentifierNumeric(result, 13).Length);
Assert.Equal(contract.TopicId.Kind, BytesToIdentifierNumeric(result, 13).Kind);
Assert.Equal(contract.PartitionId, BitConverter.ToInt32(result[19..23]));
Assert.Equal(contract.Offset, BitConverter.ToUInt64(result[23..31]));
}
[Fact]
public void TcpContracts_GetOffset_HasCorrectBytes()
{
// Arrange
var request = OffsetFactory.CreateOffsetRequest();
// Act
var result = TcpContracts.GetOffset(request).AsSpan();
// Assert
int expectedBytesLength = 2 + request.StreamId.Length + 2 + request.TopicId.Length + 5 + 2 + request.Consumer.Id.Length;
Assert.Equal(expectedBytesLength, result.Length);
Assert.Equal(1, result[0]);
Assert.Equal(request.Consumer.Id.Kind.GetByte(), result[1]);
Assert.Equal(request.StreamId.Value, BytesToIdentifierNumeric(result, 7).Value);
Assert.Equal(request.StreamId.Length, BytesToIdentifierNumeric(result, 7).Length);
Assert.Equal(request.StreamId.Kind, BytesToIdentifierNumeric(result, 7).Kind);
Assert.Equal(request.TopicId.Value, BytesToIdentifierNumeric(result, 13).Value);
Assert.Equal(request.TopicId.Length, BytesToIdentifierNumeric(result, 13).Length);
Assert.Equal(request.TopicId.Kind, BytesToIdentifierNumeric(result, 13).Kind);
Assert.Equal(request.PartitionId, BitConverter.ToInt32(result[19..23]));
}
[Fact]
public void TcpContracts_CreatePartitions_HasCorrectBytes()
{
// Arrange
var request = PartitionFactory.CreatePartitionsRequest();
// Act
var result = TcpContracts.CreatePartitions(request).AsSpan();
// Assert
Assert.Equal(request.StreamId.Value, BytesToIdentifierNumeric(result, 0).Value);
Assert.Equal(request.StreamId.Length, BytesToIdentifierNumeric(result, 0).Length);
Assert.Equal(request.StreamId.Kind, BytesToIdentifierNumeric(result, 0).Kind);
Assert.Equal(request.TopicId.Value, BytesToIdentifierNumeric(result, 6).Value);
Assert.Equal(request.TopicId.Length, BytesToIdentifierNumeric(result, 6).Length);
Assert.Equal(request.TopicId.Kind, BytesToIdentifierNumeric(result, 6).Kind);
Assert.Equal(request.PartitionsCount, BitConverter.ToInt32(result[12..16]));
}
[Fact]
public void TcpContracts_DeletePartitions_HasCorrectBytes()
{
// Arrange
var request = PartitionFactory.CreateDeletePartitionsRequest();
// Act
var result = TcpContracts.DeletePartitions(request).AsSpan();
// Assert
Assert.Equal(request.StreamId.Value, BytesToIdentifierNumeric(result, 0).Value);
Assert.Equal(request.StreamId.Length, BytesToIdentifierNumeric(result, 0).Length);
Assert.Equal(request.StreamId.Kind, BytesToIdentifierNumeric(result, 0).Kind);
Assert.Equal(request.TopicId.Value, BytesToIdentifierNumeric(result, 6).Value);
Assert.Equal(request.TopicId.Length, BytesToIdentifierNumeric(result, 6).Length);
Assert.Equal(request.TopicId.Kind, BytesToIdentifierNumeric(result, 6).Kind);
Assert.Equal(request.PartitionsCount, BitConverter.ToInt32(result[12..16]));
}
private static Identifier BytesToIdentifierNumeric(Span<byte> bytes, int startPos)
{
var idKind = bytes[startPos] switch
{
1 => IdKind.Numeric,
2 => IdKind.String,
_ => throw new ArgumentOutOfRangeException()
};
var identifierLength = (int)bytes[startPos + 1];
var valueBytes = new byte[identifierLength];
for (int i = 0; i < identifierLength; i++)
{
valueBytes[i] = bytes[i + startPos + 2];
}
return new Identifier
{
Kind = IdKind.Numeric,
Length = identifierLength,
Value = valueBytes
};
}
private static Identifier BytesToIdentifierString(Span<byte> bytes, int startPos)
{
var idKind = bytes[startPos] switch
{
1 => IdKind.Numeric,
2 => IdKind.String,
_ => throw new ArgumentOutOfRangeException()
};
var identifierLength = (int)bytes[startPos + 1];
var valueBytes = new byte[identifierLength];
for (int i = 0; i < identifierLength; i++)
{
valueBytes[i] = bytes[i + startPos + 2];
}
return new Identifier
{
Kind = IdKind.String,
Length = identifierLength,
Value = valueBytes
};
}
private static void WriteBytesFromStreamAndTopicIdToSpan(Identifier streamId, Identifier topicId, Span<byte> bytes, int startPos = 0)
{
bytes[startPos] = streamId.Kind switch
{
IdKind.Numeric => 1,
IdKind.String => 2,
_ => throw new ArgumentOutOfRangeException()
};
bytes[startPos + 1] = (byte)streamId.Length;
for (int i = 0; i < streamId.Length; i++)
{
bytes[i + startPos + 2] = streamId.Value[i];
}
int position = startPos + 2 + streamId.Length;
bytes[position] = topicId.Kind switch
{
IdKind.Numeric => 1,
IdKind.String => 2,
_ => throw new ArgumentOutOfRangeException()
};
bytes[position + 1] = (byte)topicId.Length;
for (int i = 0; i < topicId.Length; i++)
{
bytes[i + position + 2] = topicId.Value[i];
}
}
}