foreign/csharp/Iggy_SDK/IggyClient/Implementations/TcpMessageStream.cs (968 lines of code) (raw):
using Iggy_SDK.Configuration;
using Iggy_SDK.ConnectionStream;
using Iggy_SDK.Contracts.Http;
using Iggy_SDK.Contracts.Http.Auth;
using Iggy_SDK.Contracts.Tcp;
using Iggy_SDK.Enums;
using Iggy_SDK.Exceptions;
using Iggy_SDK.Headers;
using Iggy_SDK.Kinds;
using Iggy_SDK.Mappers;
using Iggy_SDK.Messages;
using Iggy_SDK.MessagesDispatcher;
using Iggy_SDK.Utils;
using Microsoft.Extensions.Logging;
using System.Buffers;
using System.Buffers.Binary;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading.Channels;
namespace Iggy_SDK.IggyClient.Implementations;
public sealed class TcpMessageStream : IIggyClient, IDisposable
{
private readonly IConnectionStream _stream;
private readonly Channel<MessageSendRequest>? _channel;
private readonly MessagePollingSettings _messagePollingSettings;
private readonly ILogger<TcpMessageStream> _logger;
private readonly IMessageInvoker? _messageInvoker;
internal TcpMessageStream(IConnectionStream stream, Channel<MessageSendRequest>? channel,
MessagePollingSettings messagePollingSettings, ILoggerFactory loggerFactory,
IMessageInvoker? messageInvoker = null)
{
_stream = stream;
_channel = channel;
_messagePollingSettings = messagePollingSettings;
_messageInvoker = messageInvoker;
_logger = loggerFactory.CreateLogger<TcpMessageStream>();
}
public async Task CreateStreamAsync(StreamRequest request, CancellationToken token = default)
{
var message = TcpContracts.CreateStream(request);
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.CREATE_STREAM_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
if (response.Status != 0)
{
var errorBuffer = new byte[response.Length];
await _stream.ReadAsync(errorBuffer, token);
throw new InvalidResponseException(Encoding.UTF8.GetString(errorBuffer));
}
}
public async Task<StreamResponse?> GetStreamByIdAsync(Identifier streamId, CancellationToken token = default)
{
var message = TcpMessageStreamHelpers.GetBytesFromIdentifier(streamId);
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_STREAM_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
var responseBuffer = new byte[response.Length];
await _stream.ReadAsync(responseBuffer, token);
if (response.Status != 0)
{
throw new InvalidResponseException(Encoding.UTF8.GetString(responseBuffer));
}
if (response.Length <= 1)
{
return null;
}
return BinaryMapper.MapStream(responseBuffer);
}
public async Task UpdateStreamAsync(Identifier streamId, UpdateStreamRequest request, CancellationToken token = default)
{
var message = TcpContracts.UpdateStream(streamId, request);
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.UPDATE_STREAM_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
if (response.Status != 0)
{
var errorBuffer = new byte[response.Length];
await _stream.ReadAsync(errorBuffer, token);
throw new InvalidResponseException(Encoding.UTF8.GetString(errorBuffer));
}
}
public async Task<IReadOnlyList<StreamResponse>> GetStreamsAsync(CancellationToken token = default)
{
var message = Array.Empty<byte>();
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_STREAMS_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
var responseBuffer = new byte[response.Length];
if (response.Status != 0)
{
throw new InvalidResponseException(Encoding.UTF8.GetString(responseBuffer));
}
if (response.Length <= 1)
{
return Array.Empty<StreamResponse>();
}
await _stream.ReadAsync(responseBuffer, token);
return BinaryMapper.MapStreams(responseBuffer);
}
public async Task DeleteStreamAsync(Identifier streamId, CancellationToken token = default)
{
var message = TcpMessageStreamHelpers.GetBytesFromIdentifier(streamId);
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.DELETE_STREAM_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
if (response.Status != 0)
{
var errorBuffer = new byte[response.Length];
await _stream.ReadAsync(errorBuffer, token);
throw new InvalidResponseException(Encoding.UTF8.GetString(errorBuffer));
}
}
public async Task<IReadOnlyList<TopicResponse>> GetTopicsAsync(Identifier streamId, CancellationToken token = default)
{
var message = TcpMessageStreamHelpers.GetBytesFromIdentifier(streamId);
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_TOPICS_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
if (response.Status != 0)
{
throw new InvalidResponseException($"Invalid response status code: {response.Status}");
}
if (response.Length <= 1)
{
return Array.Empty<TopicResponse>();
}
var responseBuffer = new byte[response.Length];
await _stream.ReadAsync(responseBuffer, token);
return BinaryMapper.MapTopics(responseBuffer);
}
public async Task<TopicResponse?> GetTopicByIdAsync(Identifier streamId, Identifier topicId, CancellationToken token = default)
{
var message = TcpContracts.GetTopicById(streamId, topicId);
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_TOPIC_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
var responseBuffer = new byte[response.Length];
await _stream.ReadAsync(responseBuffer, token);
if (response.Status != 0)
{
throw new InvalidResponseException(Encoding.UTF8.GetString(responseBuffer));
}
if (response.Length <= 1)
{
return null;
}
return BinaryMapper.MapTopic(responseBuffer);
}
public async Task CreateTopicAsync(Identifier streamId, TopicRequest topic, CancellationToken token = default)
{
var message = TcpContracts.CreateTopic(streamId, topic);
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.CREATE_TOPIC_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
if (response.Status != 0)
{
var errorBuffer = new byte[response.Length];
await _stream.ReadAsync(errorBuffer, token);
throw new InvalidResponseException(Encoding.UTF8.GetString(errorBuffer));
}
}
public async Task UpdateTopicAsync(Identifier streamId, Identifier topicId, UpdateTopicRequest request, CancellationToken token = default)
{
var message = TcpContracts.UpdateTopic(streamId, topicId, request);
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.UPDATE_TOPIC_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
if (response.Status != 0)
{
var errorBuffer = new byte[response.Length];
await _stream.ReadAsync(errorBuffer, token);
throw new InvalidResponseException(Encoding.UTF8.GetString(errorBuffer));
}
}
public async Task DeleteTopicAsync(Identifier streamId, Identifier topicId, CancellationToken token = default)
{
var message = TcpContracts.DeleteTopic(streamId, topicId);
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.DELETE_TOPIC_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
if (response.Status != 0)
{
var errorBuffer = new byte[response.Length];
await _stream.ReadAsync(errorBuffer, token);
throw new InvalidResponseException(Encoding.UTF8.GetString(errorBuffer));
}
}
public async Task SendMessagesAsync(MessageSendRequest request,
Func<byte[], byte[]>? encryptor = null,
CancellationToken token = default)
{
if (request.Messages.Count == 0)
{
return;
}
//TODO - explore making fields of Message class mutable, so there is no need to create em from scratch
if (encryptor is not null)
{
for (var i = 0; i < request.Messages.Count || token.IsCancellationRequested; i++)
{
request.Messages[i] = request.Messages[i] with { Payload = encryptor(request.Messages[i].Payload) };
}
}
if (_messageInvoker is not null)
{
await _messageInvoker.SendMessagesAsync(request, token);
return;
}
await _channel!.Writer.WriteAsync(request, token);
}
public async Task SendMessagesAsync<TMessage>(MessageSendRequest<TMessage> request,
Func<TMessage, byte[]> serializer,
Func<byte[], byte[]>? encryptor = null, Dictionary<HeaderKey, HeaderValue>? headers = null,
CancellationToken token = default)
{
var messages = request.Messages;
if (messages.Count == 0)
{
return;
}
//TODO - explore making fields of Message class mutable, so there is no need to create em from scratch
var messagesBuffer = new Message[messages.Count];
for (var i = 0; i < messages.Count || token.IsCancellationRequested; i++)
{
messagesBuffer[i] = new Message
{
Payload = encryptor is not null ? encryptor(serializer(messages[i])) : serializer(messages[i]),
Headers = headers,
Id = Guid.NewGuid()
};
}
var sendRequest = new MessageSendRequest
{
StreamId = request.StreamId,
TopicId = request.TopicId,
Partitioning = request.Partitioning,
Messages = messagesBuffer
};
if (_messageInvoker is not null)
{
await _messageInvoker.SendMessagesAsync(sendRequest, token);
return;
}
await _channel!.Writer.WriteAsync(sendRequest, token);
}
public async Task<PolledMessages<TMessage>> FetchMessagesAsync<TMessage>(MessageFetchRequest request,
Func<byte[], TMessage> serializer, Func<byte[], byte[]>? decryptor = null, CancellationToken token = default)
{
await SendFetchMessagesRequestPayload(request, token);
var buffer = MemoryPool<byte>.Shared.Rent(BufferSizes.ExpectedResponseSize);
try
{
await _stream.ReadAsync(buffer.Memory[..BufferSizes.ExpectedResponseSize], token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer.Memory.Span);
if (response.Status != 0)
{
var errorBuffer = new byte[response.Length];
await _stream.ReadAsync(errorBuffer, token);
throw new InvalidResponseException(Encoding.UTF8.GetString(errorBuffer));
}
if (response.Length <= 1)
{
return PolledMessages<TMessage>.Empty;
}
var responseBuffer = MemoryPool<byte>.Shared.Rent(response.Length);
try
{
await _stream.ReadAsync(responseBuffer.Memory[..response.Length], token);
var result = BinaryMapper.MapMessages(
responseBuffer.Memory.Span[..response.Length], serializer, decryptor);
return result;
}
finally
{
responseBuffer.Dispose();
}
}
finally
{
buffer.Dispose();
}
}
public async IAsyncEnumerable<MessageResponse<TMessage>> PollMessagesAsync<TMessage>(PollMessagesRequest request,
Func<byte[], TMessage> deserializer, Func<byte[], byte[]>? decryptor = null,
[EnumeratorCancellation] CancellationToken token = default)
{
var channel = Channel.CreateUnbounded<MessageResponse<TMessage>>();
var autoCommit = _messagePollingSettings.StoreOffsetStrategy switch
{
StoreOffset.Never => false,
StoreOffset.WhenMessagesAreReceived => true,
StoreOffset.AfterProcessingEachMessage => false,
_ => throw new ArgumentOutOfRangeException()
};
var fetchRequest = new MessageFetchRequest
{
Consumer = request.Consumer,
StreamId = request.StreamId,
TopicId = request.TopicId,
AutoCommit = autoCommit,
Count = request.Count,
PartitionId = request.PartitionId,
PollingStrategy = request.PollingStrategy
};
_ = StartPollingMessagesAsync(fetchRequest, deserializer, _messagePollingSettings.Interval, channel.Writer, decryptor, token);
await foreach(var messageResponse in channel.Reader.ReadAllAsync(token))
{
yield return messageResponse;
var currentOffset = messageResponse.Offset;
if (_messagePollingSettings.StoreOffsetStrategy is StoreOffset.AfterProcessingEachMessage)
{
var storeOffsetRequest = new StoreOffsetRequest
{
Consumer = request.Consumer,
Offset = currentOffset,
PartitionId = request.PartitionId,
StreamId = request.StreamId,
TopicId = request.TopicId
};
try
{
await StoreOffsetAsync(storeOffsetRequest, token);
}
catch
{
_logger.LogError("Error encountered while saving offset information - Offset: {offset}, Stream ID: {streamId}, Topic ID: {topicId}, Partition ID: {partitionId}",
currentOffset, request.StreamId, request.TopicId, request.PartitionId);
}
}
if (request.PollingStrategy.Kind is MessagePolling.Offset)
{
//TODO - check with profiler whether this doesn't cause a lot of allocations
request.PollingStrategy = PollingStrategy.Offset(currentOffset + 1);
}
}
}
//TODO - look into calling the non generic FetchMessagesAsync method in order
//to make this method re-usable for non generic PollMessages method.
private async Task StartPollingMessagesAsync<TMessage>(MessageFetchRequest request,
Func<byte[], TMessage> deserializer, TimeSpan interval, ChannelWriter<MessageResponse<TMessage>> writer,
Func<byte[], byte[]>? decryptor = null,
CancellationToken token = default)
{
var timer = new PeriodicTimer(interval);
while (await timer.WaitForNextTickAsync(token) || token.IsCancellationRequested)
{
try
{
var fetchResponse = await FetchMessagesAsync(request, deserializer, decryptor, token);
if (fetchResponse.Messages.Count == 0)
{
continue;
}
foreach (var messageResponse in fetchResponse.Messages)
{
await writer.WriteAsync(messageResponse, token);
}
}
catch(InvalidResponseException e)
{
_logger.LogError("Error encountered while polling messages - Stream ID: {streamId}, Topic ID: {topicId}, Partition ID: {partitionId}, error message {message}",
request.StreamId, request.TopicId, request.PartitionId, e.Message);
}
}
writer.Complete();
}
public async Task<PolledMessages> FetchMessagesAsync(MessageFetchRequest request,
Func<byte[], byte[]>? decryptor = null, CancellationToken token = default)
{
await SendFetchMessagesRequestPayload(request, token);
var buffer = ArrayPool<byte>.Shared.Rent(BufferSizes.ExpectedResponseSize);
try
{
await _stream.ReadAsync(buffer.AsMemory()[..BufferSizes.ExpectedResponseSize], token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
if (response.Status != 0)
{
var errorBuffer = new byte[response.Length];
await _stream.ReadAsync(errorBuffer, token);
throw new InvalidResponseException(Encoding.UTF8.GetString(errorBuffer));
}
if (response.Length <= 1)
{
return PolledMessages.Empty;
}
var responseBuffer = ArrayPool<byte>.Shared.Rent(response.Length);
try
{
await _stream.ReadAsync(responseBuffer.AsMemory()[..response.Length], token);
var result = BinaryMapper.MapMessages(
responseBuffer.AsSpan()[..response.Length], decryptor);
return result;
}
finally
{
ArrayPool<byte>.Shared.Return(responseBuffer);
}
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
private async Task SendFetchMessagesRequestPayload(MessageFetchRequest request, CancellationToken token)
{
var messageBufferSize = CalculateMessageBufferSize(request);
var payloadBufferSize = CalculatePayloadBufferSize(messageBufferSize);
var message = ArrayPool<byte>.Shared.Rent(messageBufferSize);
var payload = ArrayPool<byte>.Shared.Rent(payloadBufferSize);
try
{
TcpContracts.GetMessages(message.AsSpan()[..messageBufferSize], request);
TcpMessageStreamHelpers.CreatePayload(payload, message.AsSpan()[..messageBufferSize], CommandCodes.POLL_MESSAGES_CODE);
await _stream.SendAsync(payload.AsMemory()[..payloadBufferSize], token);
}
finally
{
ArrayPool<byte>.Shared.Return(message);
ArrayPool<byte>.Shared.Return(payload);
}
}
private static int CalculatePayloadBufferSize(int messageBufferSize)
=> messageBufferSize + 4 + BufferSizes.InitialBytesLength;
private static int CalculateMessageBufferSize(MessageFetchRequest request)
=> 14 + 5 + 2 + request.StreamId.Length + 2 + request.TopicId.Length + 2 + request.Consumer.Id.Length;
public async Task StoreOffsetAsync(StoreOffsetRequest request, CancellationToken token = default)
{
var message = TcpContracts.UpdateOffset(request);
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.STORE_CONSUMER_OFFSET_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
if (response.Status != 0)
{
var errorBuffer = new byte[response.Length];
await _stream.ReadAsync(errorBuffer, token);
throw new InvalidResponseException(Encoding.UTF8.GetString(errorBuffer));
}
}
public async Task<OffsetResponse?> GetOffsetAsync(OffsetRequest request, CancellationToken token = default)
{
var message = TcpContracts.GetOffset(request);
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_CONSUMER_OFFSET_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
var responseBuffer = new byte[response.Length];
await _stream.ReadAsync(responseBuffer, token);
if (response.Status != 0)
{
throw new InvalidResponseException(Encoding.UTF8.GetString(responseBuffer));
}
if (response.Length <= 1)
{
return null;
}
return BinaryMapper.MapOffsets(responseBuffer);
}
public async Task<IReadOnlyList<ConsumerGroupResponse>> GetConsumerGroupsAsync(Identifier streamId, Identifier topicId,
CancellationToken token = default)
{
var message = TcpContracts.GetGroups(streamId, topicId);
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_CONSUMER_GROUP_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
var responseBuffer = new byte[response.Length];
await _stream.ReadAsync(responseBuffer, token);
if (response.Status != 0)
{
throw new InvalidResponseException(Encoding.UTF8.GetString(responseBuffer));
}
if (response.Length <= 1)
{
return Array.Empty<ConsumerGroupResponse>();
}
return BinaryMapper.MapConsumerGroups(responseBuffer);
}
public async Task<ConsumerGroupResponse?> GetConsumerGroupByIdAsync(Identifier streamId, Identifier topicId,
Identifier groupId, CancellationToken token = default)
{
var message = TcpContracts.GetGroup(streamId, topicId, groupId);
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_CONSUMER_GROUP_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
var responseBuffer = new byte[response.Length];
await _stream.ReadAsync(responseBuffer, token);
if (response.Status != 0)
{
throw new InvalidResponseException(Encoding.UTF8.GetString(responseBuffer));
}
if (response.Length <= 1)
{
return null;
}
return BinaryMapper.MapConsumerGroup(responseBuffer);
}
public async Task CreateConsumerGroupAsync(CreateConsumerGroupRequest request, CancellationToken token = default)
{
var message = TcpContracts.CreateGroup(request);
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.CREATE_CONSUMER_GROUP_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
if (response.Status != 0)
{
var errorBuffer = new byte[response.Length];
await _stream.ReadAsync(errorBuffer, token);
throw new InvalidResponseException(Encoding.UTF8.GetString(errorBuffer));
}
}
public async Task DeleteConsumerGroupAsync(DeleteConsumerGroupRequest request, CancellationToken token = default)
{
var message = TcpContracts.DeleteGroup(request.StreamId, request.TopicId, request.ConsumerGroupId);
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.DELETE_CONSUMER_GROUP_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
if (response.Status != 0)
{
var errorBuffer = new byte[response.Length];
await _stream.ReadAsync(errorBuffer, token);
throw new InvalidResponseException(Encoding.UTF8.GetString(errorBuffer));
}
}
public async Task JoinConsumerGroupAsync(JoinConsumerGroupRequest request, CancellationToken token = default)
{
var message = TcpContracts.JoinGroup(request);
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.JOIN_CONSUMER_GROUP_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
if (response.Status != 0)
{
var errorBuffer = new byte[response.Length];
await _stream.ReadAsync(errorBuffer, token);
throw new InvalidResponseException(Encoding.UTF8.GetString(errorBuffer));
}
}
public async Task LeaveConsumerGroupAsync(LeaveConsumerGroupRequest request, CancellationToken token = default)
{
var message = TcpContracts.LeaveGroup(request);
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.LEAVE_CONSUMER_GROUP_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
if (response.Status != 0)
{
var errorBuffer = new byte[response.Length];
await _stream.ReadAsync(errorBuffer, token);
throw new InvalidResponseException(Encoding.UTF8.GetString(errorBuffer));
}
}
public async Task DeletePartitionsAsync(DeletePartitionsRequest request,
CancellationToken token = default)
{
var message = TcpContracts.DeletePartitions(request);
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.DELETE_PARTITIONS_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
if (response.Status != 0)
{
var errorBuffer = new byte[response.Length];
await _stream.ReadAsync(errorBuffer, token);
throw new InvalidResponseException(Encoding.UTF8.GetString(errorBuffer));
}
}
public async Task CreatePartitionsAsync(CreatePartitionsRequest request,
CancellationToken token = default)
{
var message = TcpContracts.CreatePartitions(request);
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.CREATE_PARTITIONS_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
if (response.Status != 0)
{
var errorBuffer = new byte[response.Length];
await _stream.ReadAsync(errorBuffer, token);
throw new InvalidResponseException(Encoding.UTF8.GetString(errorBuffer));
}
}
public async Task<Stats?> GetStatsAsync(CancellationToken token = default)
{
var message = Array.Empty<byte>();
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_STATS_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
var responseBuffer = new byte[response.Length];
await _stream.ReadAsync(responseBuffer, token);
if (response.Status != 0)
{
throw new InvalidResponseException(Encoding.UTF8.GetString(responseBuffer));
}
if (response.Length <= 1)
{
return null;
}
return BinaryMapper.MapStats(responseBuffer);
}
public async Task<IReadOnlyList<ClientResponse>> GetClientsAsync(CancellationToken token = default)
{
var message = Array.Empty<byte>();
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_CLIENTS_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
var responseBuffer = new byte[response.Length];
await _stream.ReadAsync(responseBuffer, token);
if (response.Status != 0)
{
throw new InvalidResponseException(Encoding.UTF8.GetString(responseBuffer));
}
if (response.Length <= 1)
{
return Array.Empty<ClientResponse>();
}
return BinaryMapper.MapClients(responseBuffer);
}
public async Task<ClientResponse?> GetClientByIdAsync(uint clientId, CancellationToken token = default)
{
var message = TcpContracts.GetClient(clientId);
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_CLIENT_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
var responseBuffer = new byte[response.Length];
await _stream.ReadAsync(responseBuffer, token);
if (response.Status != 0)
{
throw new InvalidResponseException(Encoding.UTF8.GetString(responseBuffer));
}
if (response.Length <= 1)
{
return null;
}
return BinaryMapper.MapClient(responseBuffer);
}
public void Dispose()
{
_stream.Close();
_stream.Dispose();
}
public async Task<UserResponse?> GetUser(Identifier userId, CancellationToken token = default)
{
var message = TcpContracts.GetUser(userId);
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_USER_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
var responseBuffer = new byte[response.Length];
await _stream.ReadAsync(responseBuffer, token);
if (response.Status != 0)
{
throw new InvalidResponseException(Encoding.UTF8.GetString(responseBuffer));
}
if (response.Length <= 1)
{
return null;
}
return BinaryMapper.MapUser(responseBuffer);
}
public async Task<IReadOnlyList<UserResponse>> GetUsers(CancellationToken token = default)
{
var message = Array.Empty<byte>();
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_USERS_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
var responseBuffer = new byte[response.Length];
await _stream.ReadAsync(responseBuffer, token);
if (response.Status != 0)
{
throw new InvalidResponseException(Encoding.UTF8.GetString(responseBuffer));
}
return BinaryMapper.MapUsers(responseBuffer);
}
public async Task CreateUser(CreateUserRequest request, CancellationToken token = default)
{
var message = TcpContracts.CreateUser(request);
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.CREATE_USER_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
if (response.Status != 0)
{
var errorBuffer = new byte[response.Length];
await _stream.ReadAsync(errorBuffer, token);
throw new InvalidResponseException(Encoding.UTF8.GetString(errorBuffer));
}
var result = new byte[response.Length];
// TODO: CreateUser returns information about created user (same class as GetUser).
// Implement this aswell.
await _stream.ReadAsync(result, token);
}
public async Task DeleteUser(Identifier userId, CancellationToken token = default)
{
var message = TcpContracts.DeleteUser(userId);
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.DELETE_USER_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
if (response.Status != 0)
{
var errorBuffer = new byte[response.Length];
await _stream.ReadAsync(errorBuffer, token);
throw new InvalidResponseException(Encoding.UTF8.GetString(errorBuffer));
}
}
public async Task UpdateUser(UpdateUserRequest request, CancellationToken token = default)
{
var message = TcpContracts.UpdateUser(request);
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.UPDATE_USER_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
if (response.Status != 0)
{
var errorBuffer = new byte[response.Length];
await _stream.ReadAsync(errorBuffer, token);
throw new InvalidResponseException(Encoding.UTF8.GetString(errorBuffer));
}
}
public async Task UpdatePermissions(UpdateUserPermissionsRequest request, CancellationToken token = default)
{
var message = TcpContracts.UpdatePermissions(request);
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.UPDATE_PERMISSIONS_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
if (response.Status != 0)
{
var errorBuffer = new byte[response.Length];
await _stream.ReadAsync(errorBuffer, token);
throw new InvalidResponseException(Encoding.UTF8.GetString(errorBuffer));
}
}
public async Task ChangePassword(ChangePasswordRequest request, CancellationToken token = default)
{
var message = TcpContracts.ChangePassword(request);
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.CHANGE_PASSWORD_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
if (response.Status != 0)
{
var errorBuffer = new byte[response.Length];
await _stream.ReadAsync(errorBuffer, token);
throw new InvalidResponseException(Encoding.UTF8.GetString(errorBuffer));
}
}
public async Task<AuthResponse?> LoginUser(LoginUserRequest request, CancellationToken token = default)
{
var message = TcpContracts.LoginUser(request);
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.LOGIN_USER_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
// TODO: maybe refactor later, for now static 12
var buffer = new byte[12];
//await _socket.ReceiveAsync(buffer, token);
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
if (response.Status != 0)
{
var errorBuffer = new byte[response.Length];
await _stream.ReadAsync(errorBuffer, token);
throw new InvalidResponseException(Encoding.UTF8.GetString(errorBuffer));
}
if (response.Length <= 1)
{
return null;
}
var userId = BinaryPrimitives.ReadInt32LittleEndian(buffer.AsSpan()[8..(8 + response.Length)]);
//TODO: Figure out how to solve this workaround about default of TokenInfo
return new AuthResponse(userId, default);
}
public async Task LogoutUser(CancellationToken token = default)
{
var message = Array.Empty<byte>();
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.LOGOUT_USER_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
if (response.Status != 0)
{
var errorBuffer = new byte[response.Length];
await _stream.ReadAsync(errorBuffer, token);
throw new InvalidResponseException(Encoding.UTF8.GetString(errorBuffer));
}
}
public async Task<IReadOnlyList<PersonalAccessTokenResponse>> GetPersonalAccessTokensAsync(CancellationToken token = default)
{
var message = Array.Empty<byte>();
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.GET_PERSONAL_ACCESS_TOKENS_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
var responseBuffer = new byte[response.Length];
await _stream.ReadAsync(responseBuffer, token);
if (response.Status != 0)
{
throw new InvalidResponseException(Encoding.UTF8.GetString(responseBuffer));
}
return BinaryMapper.MapPersonalAccessTokens(responseBuffer);
}
public async Task<RawPersonalAccessToken?> CreatePersonalAccessTokenAsync(CreatePersonalAccessTokenRequest request, CancellationToken token = default)
{
var message = TcpContracts.CreatePersonalAccessToken(request);
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.CREATE_PERSONAL_ACCESS_TOKEN_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
var responseBuffer = new byte[response.Length];
await _stream.ReadAsync(responseBuffer, token);
if (response.Status != 0)
{
throw new InvalidResponseException(Encoding.UTF8.GetString(responseBuffer));
}
if (response.Length <= 1)
{
return null;
}
return BinaryMapper.MapRawPersonalAccessToken(responseBuffer);
}
public async Task DeletePersonalAccessTokenAsync(DeletePersonalAccessTokenRequest request, CancellationToken token = default)
{
var message = TcpContracts.DeletePersonalRequestToken(request);
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.DELETE_PERSONAL_ACCESS_TOKEN_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
var buffer = new byte[BufferSizes.ExpectedResponseSize];
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
if (response.Status != 0)
{
var errorBuffer = new byte[response.Length];
await _stream.ReadAsync(errorBuffer, token);
throw new InvalidResponseException(Encoding.UTF8.GetString(errorBuffer));
}
}
public async Task<AuthResponse?> LoginWithPersonalAccessToken(LoginWithPersonalAccessToken request, CancellationToken token = default)
{
var message = TcpContracts.LoginWithPersonalAccessToken(request);
var payload = new byte[4 + BufferSizes.InitialBytesLength + message.Length];
TcpMessageStreamHelpers.CreatePayload(payload, message, CommandCodes.LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE);
await _stream.SendAsync(payload, token);
await _stream.FlushAsync(token);
var buffer = new byte[8];
await _stream.ReadAsync(buffer, token);
var response = TcpMessageStreamHelpers.GetResponseLengthAndStatus(buffer);
var responseBuffer = new byte[response.Length];
await _stream.ReadAsync(responseBuffer, token);
if (response.Status != 0)
{
throw new InvalidResponseException(Encoding.UTF8.GetString(responseBuffer));
}
if (response.Length <= 1)
{
return null;
}
var userId = BinaryPrimitives.ReadInt32LittleEndian(responseBuffer.AsSpan()[..4]);
//TODO: Figure out how to solve this workaround about default of TokenInfo
return new AuthResponse(userId, default);
}
}