foreign/csharp/Iggy_SDK/IggyClient/Implementations/HttpMessageStream.cs (589 lines of code) (raw):

using Iggy_SDK.Configuration; using Iggy_SDK.Contracts.Http; using Iggy_SDK.Contracts.Http.Auth; using Iggy_SDK.Enums; using Iggy_SDK.Exceptions; using Iggy_SDK.Headers; using Iggy_SDK.JsonConfiguration; using Iggy_SDK.Kinds; using Iggy_SDK.Messages; using Iggy_SDK.MessagesDispatcher; using Iggy_SDK.StringHandlers; using Microsoft.Extensions.Logging; using System.Buffers.Binary; using System.Net; using System.Net.Http.Headers; using System.Net.Http.Json; using System.Runtime.CompilerServices; using System.Text; using System.Text.Json; using System.Threading.Channels; namespace Iggy_SDK.IggyClient.Implementations; public class HttpMessageStream : IIggyClient { //TODO - create mechanism for refreshing jwt token //TODO - replace the HttpClient with IHttpClientFactory, when implementing support for ASP.NET Core DI //TODO - the error handling pattern is pretty ugly, look into moving it into an extension method private readonly HttpClient _httpClient; private readonly Channel<MessageSendRequest>? _channel; private readonly MessagePollingSettings _messagePollingSettings; private readonly ILogger<HttpMessageStream> _logger; private readonly IMessageInvoker? _messageInvoker; internal HttpMessageStream(HttpClient httpClient, Channel<MessageSendRequest>? channel, MessagePollingSettings messagePollingSettings, ILoggerFactory loggerFactory, IMessageInvoker? messageInvoker = null) { _httpClient = httpClient; _channel = channel; _messagePollingSettings = messagePollingSettings; _messageInvoker = messageInvoker; _logger = loggerFactory.CreateLogger<HttpMessageStream>(); } public async Task CreateStreamAsync(StreamRequest request, CancellationToken token = default) { var json = JsonSerializer.Serialize(request, JsonConverterFactory.SnakeCaseOptions); var data = new StringContent(json, Encoding.UTF8, "application/json"); var response = await _httpClient.PostAsync("/streams", data, token); if (!response.IsSuccessStatusCode) { await HandleResponseAsync(response); } } public async Task DeleteStreamAsync(Identifier streamId, CancellationToken token = default) { var response = await _httpClient.DeleteAsync($"/streams/{streamId}", token); if (!response.IsSuccessStatusCode) { await HandleResponseAsync(response); } } public async Task<StreamResponse?> GetStreamByIdAsync(Identifier streamId, CancellationToken token = default) { var response = await _httpClient.GetAsync($"/streams/{streamId}", token); if (response.IsSuccessStatusCode) { return await response.Content.ReadFromJsonAsync<StreamResponse>(JsonConverterFactory.StreamResponseOptions, cancellationToken: token); } await HandleResponseAsync(response); return null; } public async Task UpdateStreamAsync(Identifier streamId, UpdateStreamRequest request, CancellationToken token = default) { var json = JsonSerializer.Serialize(request, JsonConverterFactory.SnakeCaseOptions); var data = new StringContent(json, Encoding.UTF8, "application/json"); var response = await _httpClient.PutAsync($"/streams/{streamId}", data, token); if (!response.IsSuccessStatusCode) { await HandleResponseAsync(response); } } public async Task<IReadOnlyList<StreamResponse>> GetStreamsAsync(CancellationToken token = default) { var response = await _httpClient.GetAsync($"/streams", token); if (response.IsSuccessStatusCode) { return await response.Content.ReadFromJsonAsync<IReadOnlyList<StreamResponse>>(JsonConverterFactory.StreamResponseOptions, cancellationToken: token) ?? Array.Empty<StreamResponse>(); } await HandleResponseAsync(response); return Array.Empty<StreamResponse>(); } public async Task CreateTopicAsync(Identifier streamId, TopicRequest topic, CancellationToken token = default) { var json = JsonSerializer.Serialize(topic, JsonConverterFactory.CreateTopicOptions); var data = new StringContent(json, Encoding.UTF8, "application/json"); var response = await _httpClient.PostAsync($"/streams/{streamId}/topics", data, token); if (!response.IsSuccessStatusCode) { await HandleResponseAsync(response); } } public async Task UpdateTopicAsync(Identifier streamId, Identifier topicId, UpdateTopicRequest request, CancellationToken token = default) { var json = JsonSerializer.Serialize(request, JsonConverterFactory.SnakeCaseOptions); var data = new StringContent(json, Encoding.UTF8, "application/json"); var response = await _httpClient.PutAsync($"/streams/{streamId}/topics/{topicId}", data, token); if (!response.IsSuccessStatusCode) { await HandleResponseAsync(response); } } public async Task DeleteTopicAsync(Identifier streamId, Identifier topicId, CancellationToken token = default) { var response = await _httpClient.DeleteAsync($"/streams/{streamId}/topics/{topicId}", token); if (!response.IsSuccessStatusCode) { await HandleResponseAsync(response); } } public async Task<IReadOnlyList<TopicResponse>> GetTopicsAsync(Identifier streamId, CancellationToken token = default) { var response = await _httpClient.GetAsync($"/streams/{streamId}/topics", token); if (response.IsSuccessStatusCode) { return await response.Content.ReadFromJsonAsync<IReadOnlyList<TopicResponse>>(JsonConverterFactory.TopicResponseOptions, cancellationToken: token) ?? Array.Empty<TopicResponse>(); } await HandleResponseAsync(response); return Array.Empty<TopicResponse>(); } public async Task<TopicResponse?> GetTopicByIdAsync(Identifier streamId, Identifier topicId, CancellationToken token = default) { var response = await _httpClient.GetAsync($"/streams/{streamId}/topics/{topicId}", token); if (response.IsSuccessStatusCode) { return await response.Content.ReadFromJsonAsync<TopicResponse>(JsonConverterFactory.TopicResponseOptions, cancellationToken: token); } await HandleResponseAsync(response); return null; } public async Task SendMessagesAsync(MessageSendRequest request, Func<byte[], byte[]>? encryptor = null, CancellationToken token = default) { if (encryptor is not null) { for (var i = 0; i < request.Messages.Count; i++) { request.Messages[i] = request.Messages[i] with { Payload = encryptor(request.Messages[i].Payload) }; } } if (_messageInvoker is not null) { await _messageInvoker.SendMessagesAsync(request, token); return; } await _channel!.Writer.WriteAsync(request, token); } public async Task SendMessagesAsync<TMessage>(MessageSendRequest<TMessage> request, Func<TMessage, byte[]> serializer, Func<byte[], byte[]>? encryptor = null, Dictionary<HeaderKey, HeaderValue>? headers = null, CancellationToken token = default) { var messages = request.Messages; //TODO - maybe get rid of this closure ? var sendRequest = new MessageSendRequest { StreamId = request.StreamId, TopicId = request.TopicId, Partitioning = request.Partitioning, Messages = messages.Select(message => new Message { Id = Guid.NewGuid(), Headers = headers, Payload = encryptor is not null ? encryptor(serializer(message)) : serializer(message), }).ToArray() }; if (_messageInvoker is not null) { try { await _messageInvoker.SendMessagesAsync(sendRequest, token); } catch { var partId = BinaryPrimitives.ReadInt32LittleEndian(sendRequest.Partitioning.Value); _logger.LogError("Error encountered while sending messages - Stream ID:{streamId}, Topic ID:{topicId}, Partition ID: {partitionId}", sendRequest.StreamId, sendRequest.TopicId, partId); } return; } await _channel!.Writer.WriteAsync(sendRequest, token); } public async Task<PolledMessages> FetchMessagesAsync(MessageFetchRequest request, Func<byte[], byte[]>? decryptor = null, CancellationToken token = default) { var url = CreateUrl($"/streams/{request.StreamId}/topics/{request.TopicId}/messages?consumer_id={request.Consumer.Id}" + $"&partition_id={request.PartitionId}&kind={request.PollingStrategy.Kind}&value={request.PollingStrategy.Value}&count={request.Count}&auto_commit={request.AutoCommit}"); var response = await _httpClient.GetAsync(url, token); if (response.IsSuccessStatusCode) { return await response.Content.ReadFromJsonAsync<PolledMessages>(JsonConverterFactory.MessageResponseOptions(decryptor), cancellationToken: token) ?? PolledMessages.Empty; } await HandleResponseAsync(response); return PolledMessages.Empty; } public async Task<PolledMessages<TMessage>> FetchMessagesAsync<TMessage>(MessageFetchRequest request, Func<byte[], TMessage> serializer, Func<byte[], byte[]>? decryptor = null, CancellationToken token = default) { var url = CreateUrl($"/streams/{request.StreamId}/topics/{request.TopicId}/messages?consumer_id={request.Consumer.Id}" + $"&partition_id={request.PartitionId}&kind={request.PollingStrategy.Kind}&value={request.PollingStrategy.Value}&count={request.Count}&auto_commit={request.AutoCommit}"); var response = await _httpClient.GetAsync(url, token); if (response.IsSuccessStatusCode) { return await response.Content.ReadFromJsonAsync<PolledMessages<TMessage>>(JsonConverterFactory.MessageResponseGenericOptions(serializer, decryptor), cancellationToken: token) ?? PolledMessages<TMessage>.Empty; } await HandleResponseAsync(response); return PolledMessages<TMessage>.Empty; } public async IAsyncEnumerable<MessageResponse<TMessage>> PollMessagesAsync<TMessage>(PollMessagesRequest request, Func<byte[], TMessage> deserializer, Func<byte[], byte[]>? decryptor = null, [EnumeratorCancellation] CancellationToken token = default) { var channel = Channel.CreateUnbounded<MessageResponse<TMessage>>(); var autoCommit = _messagePollingSettings.StoreOffsetStrategy switch { StoreOffset.Never => false, StoreOffset.WhenMessagesAreReceived => true, StoreOffset.AfterProcessingEachMessage => false, _ => throw new ArgumentOutOfRangeException() }; var fetchRequest = new MessageFetchRequest { Consumer = request.Consumer, StreamId = request.StreamId, TopicId = request.TopicId, AutoCommit = autoCommit, Count = request.Count, PartitionId = request.PartitionId, PollingStrategy = request.PollingStrategy }; _ = StartPollingMessagesAsync(fetchRequest, deserializer, _messagePollingSettings.Interval, channel.Writer, decryptor, token); await foreach(var messageResponse in channel.Reader.ReadAllAsync(token)) { yield return messageResponse; var currentOffset = messageResponse.Offset; if (_messagePollingSettings.StoreOffsetStrategy is StoreOffset.AfterProcessingEachMessage) { var storeOffsetRequest = new StoreOffsetRequest { Consumer = request.Consumer, Offset = currentOffset, PartitionId = request.PartitionId, StreamId = request.StreamId, TopicId = request.TopicId }; try { await StoreOffsetAsync(storeOffsetRequest, token); } catch { _logger.LogError("Error encountered while saving offset information - Offset: {offset}, Stream ID: {streamId}, Topic ID: {topicId}, Partition ID: {partitionId}", currentOffset, request.StreamId, request.TopicId, request.PartitionId); } } if (request.PollingStrategy.Kind is MessagePolling.Offset) { //TODO - check with profiler whether this doesn't cause a lot of allocations request.PollingStrategy = PollingStrategy.Offset(currentOffset + 1); } } } private async Task StartPollingMessagesAsync<TMessage>(MessageFetchRequest request, Func<byte[], TMessage> deserializer, TimeSpan interval, ChannelWriter<MessageResponse<TMessage>> writer, Func<byte[], byte[]>? decryptor = null, CancellationToken token = default) { var timer = new PeriodicTimer(interval); while (await timer.WaitForNextTickAsync(token) || token.IsCancellationRequested) { try { var fetchResponse = await FetchMessagesAsync(request, deserializer, decryptor, token); if (fetchResponse.Messages.Count == 0) { continue; } foreach (var messageResponse in fetchResponse.Messages) { await writer.WriteAsync(messageResponse, token); } } catch(Exception e) { _logger.LogError("Error encountered while polling messages - Stream ID: {streamId}, Topic ID: {topicId}, Partition ID: {partitionId}", request.StreamId, request.TopicId, request.PartitionId); } } writer.Complete(); } public async Task StoreOffsetAsync(StoreOffsetRequest request, CancellationToken token = default) { var json = JsonSerializer.Serialize(request, JsonConverterFactory.SnakeCaseOptions); var data = new StringContent(json, Encoding.UTF8, "application/json"); var response = await _httpClient.PutAsync($"/streams/{request.StreamId}/topics/{request.TopicId}/consumer-offsets", data, token); if (!response.IsSuccessStatusCode) { await HandleResponseAsync(response); } } public async Task<OffsetResponse?> GetOffsetAsync(OffsetRequest request, CancellationToken token = default) { var response = await _httpClient.GetAsync($"/streams/{request.StreamId}/topics/{request.TopicId}/" + $"consumer-offsets?consumer_id={request.Consumer.Id}&partition_id={request.PartitionId}", token); if (response.IsSuccessStatusCode) { return await response.Content.ReadFromJsonAsync<OffsetResponse>(JsonConverterFactory.SnakeCaseOptions, cancellationToken: token); } await HandleResponseAsync(response); return null; } public async Task<IReadOnlyList<ConsumerGroupResponse>> GetConsumerGroupsAsync(Identifier streamId, Identifier topicId, CancellationToken token = default) { var response = await _httpClient.GetAsync($"/streams/{streamId}/topics/{topicId}/consumer-groups", token); if (response.IsSuccessStatusCode) { return await response.Content.ReadFromJsonAsync<IReadOnlyList<ConsumerGroupResponse>>(JsonConverterFactory.SnakeCaseOptions, cancellationToken: token) ?? Array.Empty<ConsumerGroupResponse>(); } await HandleResponseAsync(response); return Array.Empty<ConsumerGroupResponse>(); } public async Task<ConsumerGroupResponse?> GetConsumerGroupByIdAsync(Identifier streamId, Identifier topicId, Identifier groupId, CancellationToken token = default) { var response = await _httpClient.GetAsync($"/streams/{streamId}/topics/{topicId}/consumer-groups/{groupId}", token); if (response.IsSuccessStatusCode) { return await response.Content.ReadFromJsonAsync<ConsumerGroupResponse>(JsonConverterFactory.SnakeCaseOptions, cancellationToken: token); } await HandleResponseAsync(response); return null; } public async Task CreateConsumerGroupAsync(CreateConsumerGroupRequest request, CancellationToken token = default) { var json = JsonSerializer.Serialize(request, JsonConverterFactory.SnakeCaseOptions); var data = new StringContent(json, Encoding.UTF8, "application/json"); var response = await _httpClient.PostAsync($"/streams/{request.StreamId}/topics/{request.TopicId}/consumer-groups", data, token); if (!response.IsSuccessStatusCode) { await HandleResponseAsync(response); } } public async Task DeleteConsumerGroupAsync(DeleteConsumerGroupRequest request, CancellationToken token = default) { var response = await _httpClient.DeleteAsync($"/streams/{request.StreamId}/topics/{request.TopicId}/consumer-groups/{request.ConsumerGroupId}", token); await HandleResponseAsync(response); } public async Task<Stats?> GetStatsAsync(CancellationToken token = default) { var response = await _httpClient.GetAsync($"/stats", token); if (response.IsSuccessStatusCode) { var result = await response.Content.ReadFromJsonAsync<StatsResponse>(JsonConverterFactory.StatsResponseOptions, cancellationToken: token); return result?.ToStats(); } await HandleResponseAsync(response); return null; } public async Task<IReadOnlyList<ClientResponse>> GetClientsAsync(CancellationToken token = default) { var response = await _httpClient.GetAsync($"/clients", token); if (response.IsSuccessStatusCode) { return await response.Content.ReadFromJsonAsync<IReadOnlyList<ClientResponse>>(JsonConverterFactory.SnakeCaseOptions, token) ?? Array.Empty<ClientResponse>(); } await HandleResponseAsync(response); return Array.Empty<ClientResponse>(); } public async Task<ClientResponse?> GetClientByIdAsync(uint clientId, CancellationToken token = default) { var response = await _httpClient.GetAsync($"/clients/{clientId}", token); if (response.IsSuccessStatusCode) { return await response.Content.ReadFromJsonAsync<ClientResponse>(JsonConverterFactory.SnakeCaseOptions, token); } await HandleResponseAsync(response); return null; } [Obsolete("This method is only supported in TCP protocol", true)] public Task JoinConsumerGroupAsync(JoinConsumerGroupRequest request, CancellationToken token = default) { throw new FeatureUnavailableException(); } [Obsolete("This method is only supported in TCP protocol", true)] public Task LeaveConsumerGroupAsync(LeaveConsumerGroupRequest request, CancellationToken token = default) { throw new FeatureUnavailableException(); } public async Task DeletePartitionsAsync(DeletePartitionsRequest request, CancellationToken token = default) { var response = await _httpClient.DeleteAsync($"/streams/{request.StreamId}/topics/{request.TopicId}/partitions?partitions_count={request.PartitionsCount}", token); if (!response.IsSuccessStatusCode) { await HandleResponseAsync(response); } } public async Task CreatePartitionsAsync(CreatePartitionsRequest request, CancellationToken token = default) { var json = JsonSerializer.Serialize(request, JsonConverterFactory.SnakeCaseOptions); var data = new StringContent(json, Encoding.UTF8, "application/json"); var response = await _httpClient.PostAsync($"/streams/{request.StreamId}/topics/{request.TopicId}/partitions", data, token); if (!response.IsSuccessStatusCode) { await HandleResponseAsync(response); } } private static async Task HandleResponseAsync(HttpResponseMessage response) { if ((int)response.StatusCode > 300 && (int)response.StatusCode < 500) { var err = await response.Content.ReadAsStringAsync(); throw new InvalidResponseException(err); } if (response.StatusCode == HttpStatusCode.InternalServerError) { throw new Exception("Internal server error"); } } private static string CreateUrl(ref MessageRequestInterpolationHandler message) { return message.ToString(); } public async Task<UserResponse?> GetUser(Identifier userId, CancellationToken token = default) { //TODO - this doesn't work prob needs a custom json serializer var response = await _httpClient.GetAsync($"/users/{userId}", token); if (response.IsSuccessStatusCode) { return await response.Content.ReadFromJsonAsync<UserResponse>(JsonConverterFactory.SnakeCaseOptions, token); } await HandleResponseAsync(response); return null; } public async Task<IReadOnlyList<UserResponse>> GetUsers(CancellationToken token = default) { var response = await _httpClient.GetAsync("/users", token); if (response.IsSuccessStatusCode) { return await response.Content.ReadFromJsonAsync<IReadOnlyList<UserResponse>>(JsonConverterFactory.SnakeCaseOptions, token) ?? Array.Empty<UserResponse>(); } await HandleResponseAsync(response); return Array.Empty<UserResponse>(); } public async Task CreateUser(CreateUserRequest request, CancellationToken token = default) { var json = JsonSerializer.Serialize(request, JsonConverterFactory.SnakeCaseOptions); var content = new StringContent(json, Encoding.UTF8, "application/json"); var response = await _httpClient.PostAsync("/users", content, token); if (!response.IsSuccessStatusCode) { await HandleResponseAsync(response); } } public async Task DeleteUser(Identifier userId, CancellationToken token = default) { var response = await _httpClient.DeleteAsync($"/users/{userId}", token); if (!response.IsSuccessStatusCode) { await HandleResponseAsync(response); } } public async Task UpdateUser(UpdateUserRequest request, CancellationToken token = default) { var json = JsonSerializer.Serialize(request, JsonConverterFactory.SnakeCaseOptions); var content = new StringContent(json, Encoding.UTF8, "application/json"); var response = await _httpClient.PutAsync($"/users/{request.UserId}", content, token); if (!response.IsSuccessStatusCode) { await HandleResponseAsync(response); } } public async Task UpdatePermissions(UpdateUserPermissionsRequest request, CancellationToken token = default) { var json = JsonSerializer.Serialize(request, JsonConverterFactory.SnakeCaseOptions); var content = new StringContent(json, Encoding.UTF8, "application/json"); var response = await _httpClient.PutAsync($"/users/{request.UserId}/permissions", content, token); if (!response.IsSuccessStatusCode) { await HandleResponseAsync(response); } } public async Task ChangePassword(ChangePasswordRequest request, CancellationToken token = default) { var json = JsonSerializer.Serialize(request, JsonConverterFactory.SnakeCaseOptions); var content = new StringContent(json, Encoding.UTF8, "application/json"); var response = await _httpClient.PutAsync($"/users/{request.UserId}/password", content, token); if (!response.IsSuccessStatusCode) { await HandleResponseAsync(response); } } public async Task<AuthResponse?> LoginUser(LoginUserRequest request, CancellationToken token = default) { var json = JsonSerializer.Serialize(request, JsonConverterFactory.SnakeCaseOptions); var data = new StringContent(json, Encoding.UTF8, "application/json"); var response = await _httpClient.PostAsync("users/login", data, token); if (response.IsSuccessStatusCode) { var authResponse = await response.Content.ReadFromJsonAsync<AuthResponse>(JsonConverterFactory.AuthResponseOptions, cancellationToken: token); var jwtToken = authResponse!.AccessToken?.Token; if (!string.IsNullOrEmpty(authResponse!.AccessToken!.Token)) { _httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", jwtToken); } else { throw new Exception("The JWT token is missing."); } return authResponse; } await HandleResponseAsync(response); return null; } public async Task LogoutUser(CancellationToken token = default) { // var json = JsonSerializer.Serialize(new // { // }, JsonConverterFactory.SnakeCaseOptions); // var content = new StringContent(json, Encoding.UTF8, "application/json"); var response = await _httpClient.DeleteAsync("users/logout", token); if (!response.IsSuccessStatusCode) { await HandleResponseAsync(response); } _httpClient.DefaultRequestHeaders.Authorization = null; } public async Task<IReadOnlyList<PersonalAccessTokenResponse>> GetPersonalAccessTokensAsync(CancellationToken token = default) { var response = await _httpClient.GetAsync("/personal-access-tokens", token); if (response.IsSuccessStatusCode) { return await response.Content.ReadFromJsonAsync<IReadOnlyList<PersonalAccessTokenResponse>>(JsonConverterFactory.PersonalAccessTokenOptions, token) ?? Array.Empty<PersonalAccessTokenResponse>(); } await HandleResponseAsync(response); return Array.Empty<PersonalAccessTokenResponse>(); } public async Task<RawPersonalAccessToken?> CreatePersonalAccessTokenAsync(CreatePersonalAccessTokenRequest request, CancellationToken token = default) { var json = JsonSerializer.Serialize(request, JsonConverterFactory.SnakeCaseOptions); var content = new StringContent(json, Encoding.UTF8, "application/json"); var response = await _httpClient.PostAsync("/personal-access-tokens", content, token); if (!response.IsSuccessStatusCode) { await HandleResponseAsync(response); } return await response.Content.ReadFromJsonAsync<RawPersonalAccessToken>(JsonConverterFactory.SnakeCaseOptions, token); } public async Task DeletePersonalAccessTokenAsync(DeletePersonalAccessTokenRequest request, CancellationToken token = default) { var response = await _httpClient.DeleteAsync($"/personal-access-tokens/{request.Name}", token); if (!response.IsSuccessStatusCode) { await HandleResponseAsync(response); } } public async Task<AuthResponse?> LoginWithPersonalAccessToken(LoginWithPersonalAccessToken request, CancellationToken token = default) { var json = JsonSerializer.Serialize(request, JsonConverterFactory.SnakeCaseOptions); var content = new StringContent(json, Encoding.UTF8, "application/json"); var response = await _httpClient.PostAsync("/personal-access-tokens/login", content, token); if (response.IsSuccessStatusCode) { var authResponse = await response.Content.ReadFromJsonAsync<AuthResponse>(JsonConverterFactory.AuthResponseOptions, cancellationToken: token); var jwtToken = authResponse!.AccessToken?.Token; if (!string.IsNullOrEmpty(authResponse!.AccessToken!.Token)) { _httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", jwtToken); } else { throw new Exception("The JWT token is missing."); } return authResponse; } await HandleResponseAsync(response); return null; } }