src/DotPulsar/Internal/Connection.cs (259 lines of code) (raw):

/* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ namespace DotPulsar.Internal; using DotPulsar.Abstractions; using DotPulsar.Internal.Abstractions; using DotPulsar.Internal.Exceptions; using DotPulsar.Internal.Extensions; using DotPulsar.Internal.PulsarApi; using System; using System.Buffers; using System.Threading; using System.Threading.Tasks; public sealed class Connection : IConnection { private readonly AsyncLock _lock; private readonly ChannelManager _channelManager; private readonly PingPongHandler _pingPongHandler; private readonly IPulsarStream _stream; private readonly IAuthentication? _authentication; private int _isDisposed; public Connection(IPulsarStream stream, TimeSpan keepAliveInterval, IAuthentication? authentication) { _lock = new AsyncLock(); _channelManager = new ChannelManager(); _pingPongHandler = new PingPongHandler(this, keepAliveInterval); _stream = stream; _authentication = authentication; } public async ValueTask<bool> HasChannels(CancellationToken cancellationToken) { ThrowIfDisposed(); using (await _lock.Lock(cancellationToken).ConfigureAwait(false)) { return _channelManager.HasChannels(); } } public async Task<ProducerResponse> Send(CommandProducer command, IChannel channel, CancellationToken cancellationToken) { ThrowIfDisposed(); Task<ProducerResponse>? responseTask; using (await _lock.Lock(cancellationToken).ConfigureAwait(false)) { responseTask = _channelManager.Outgoing(command, channel); var sequence = Serializer.Serialize(command.AsBaseCommand()); await _stream.Send(sequence).ConfigureAwait(false); } return await responseTask.ConfigureAwait(false); } public async Task<SubscribeResponse> Send(CommandSubscribe command, IChannel channel, CancellationToken cancellationToken) { ThrowIfDisposed(); Task<SubscribeResponse>? responseTask; using (await _lock.Lock(cancellationToken).ConfigureAwait(false)) { responseTask = _channelManager.Outgoing(command, channel); var sequence = Serializer.Serialize(command.AsBaseCommand()); await _stream.Send(sequence).ConfigureAwait(false); } return await responseTask.ConfigureAwait(false); } private async Task Send(CommandAuthResponse command, CancellationToken cancellationToken) { await Task.Yield(); if (_authentication is not null) { if (command.Response is null) command.Response = new AuthData(); command.Response.AuthMethodName = _authentication.AuthenticationMethodName; command.Response.Data = await _authentication.GetAuthenticationData(cancellationToken).ConfigureAwait(false); } await Send(command.AsBaseCommand(), cancellationToken).ConfigureAwait(false); } public Task Send(CommandPing command, CancellationToken cancellationToken) => Send(command.AsBaseCommand(), cancellationToken); public Task Send(CommandPong command, CancellationToken cancellationToken) => Send(command.AsBaseCommand(), cancellationToken); public Task Send(CommandAck command, CancellationToken cancellationToken) => Send(command.AsBaseCommand(), cancellationToken); public Task Send(CommandFlow command, CancellationToken cancellationToken) => Send(command.AsBaseCommand(), cancellationToken); public Task Send(CommandRedeliverUnacknowledgedMessages command, CancellationToken cancellationToken) => Send(command.AsBaseCommand(), cancellationToken); public async Task<BaseCommand> Send(CommandUnsubscribe command, CancellationToken cancellationToken) { ThrowIfDisposed(); Task<BaseCommand>? responseTask; using (await _lock.Lock(cancellationToken).ConfigureAwait(false)) { responseTask = _channelManager.Outgoing(command); var sequence = Serializer.Serialize(command.AsBaseCommand()); await _stream.Send(sequence).ConfigureAwait(false); } return await responseTask.ConfigureAwait(false); } public async Task<BaseCommand> Send(CommandConnect command, CancellationToken cancellationToken) { ThrowIfDisposed(); if (_authentication is not null) { command.AuthMethodName = _authentication.AuthenticationMethodName; command.AuthData = await _authentication.GetAuthenticationData(cancellationToken).ConfigureAwait(false); } Task<BaseCommand>? responseTask; using (await _lock.Lock(cancellationToken).ConfigureAwait(false)) { responseTask = _channelManager.Outgoing(command); var sequence = Serializer.Serialize(command.AsBaseCommand()); await _stream.Send(sequence).ConfigureAwait(false); } return await responseTask.ConfigureAwait(false); } public async Task<BaseCommand> Send(CommandLookupTopic command, CancellationToken cancellationToken) { ThrowIfDisposed(); Task<BaseCommand>? responseTask; using (await _lock.Lock(cancellationToken).ConfigureAwait(false)) { responseTask = _channelManager.Outgoing(command); var sequence = Serializer.Serialize(command.AsBaseCommand()); await _stream.Send(sequence).ConfigureAwait(false); } return await responseTask.ConfigureAwait(false); } public async Task<BaseCommand> Send(CommandSeek command, CancellationToken cancellationToken) { ThrowIfDisposed(); Task<BaseCommand>? responseTask; using (await _lock.Lock(cancellationToken).ConfigureAwait(false)) { responseTask = _channelManager.Outgoing(command); var sequence = Serializer.Serialize(command.AsBaseCommand()); await _stream.Send(sequence).ConfigureAwait(false); } return await responseTask.ConfigureAwait(false); } public async Task<BaseCommand> Send(CommandGetLastMessageId command, CancellationToken cancellationToken) { ThrowIfDisposed(); Task<BaseCommand>? responseTask; using (await _lock.Lock(cancellationToken).ConfigureAwait(false)) { responseTask = _channelManager.Outgoing(command); var sequence = Serializer.Serialize(command.AsBaseCommand()); await _stream.Send(sequence).ConfigureAwait(false); } return await responseTask.ConfigureAwait(false); } public async Task<BaseCommand> Send(CommandCloseProducer command, CancellationToken cancellationToken) { ThrowIfDisposed(); Task<BaseCommand>? responseTask; using (await _lock.Lock(cancellationToken).ConfigureAwait(false)) { responseTask = _channelManager.Outgoing(command); var sequence = Serializer.Serialize(command.AsBaseCommand()); await _stream.Send(sequence).ConfigureAwait(false); } return await responseTask.ConfigureAwait(false); } public async Task<BaseCommand> Send(CommandCloseConsumer command, CancellationToken cancellationToken) { ThrowIfDisposed(); Task<BaseCommand>? responseTask; using (await _lock.Lock(cancellationToken).ConfigureAwait(false)) { responseTask = _channelManager.Outgoing(command); var sequence = Serializer.Serialize(command.AsBaseCommand()); await _stream.Send(sequence).ConfigureAwait(false); } return await responseTask.ConfigureAwait(false); } public async Task Send(SendPackage command, TaskCompletionSource<BaseCommand> responseTcs, CancellationToken cancellationToken) { ThrowIfDisposed(); using (await _lock.Lock(cancellationToken).ConfigureAwait(false)) { try { _channelManager.Outgoing(command.Command!, responseTcs); } catch (OperationCanceledException) { responseTcs.TrySetCanceled(); throw; } var sequence = Serializer.Serialize(command.Command!.AsBaseCommand(), command.Metadata!, command.Payload); await _stream.Send(sequence).ConfigureAwait(false); } } public async Task<BaseCommand> Send(CommandGetOrCreateSchema command, CancellationToken cancellationToken) { ThrowIfDisposed(); Task<BaseCommand>? responseTask; using (await _lock.Lock(cancellationToken).ConfigureAwait(false)) { responseTask = _channelManager.Outgoing(command); var sequence = Serializer.Serialize(command.AsBaseCommand()); await _stream.Send(sequence).ConfigureAwait(false); } return await responseTask.ConfigureAwait(false); } public async Task<BaseCommand> Send(CommandPartitionedTopicMetadata command, CancellationToken cancellationToken) { ThrowIfDisposed(); Task<BaseCommand>? responseTask; using (await _lock.Lock(cancellationToken).ConfigureAwait(false)) { responseTask = _channelManager.Outgoing(command); var sequence = Serializer.Serialize(command.AsBaseCommand()); await _stream.Send(sequence).ConfigureAwait(false); } return await responseTask.ConfigureAwait(false); } private async Task Send(BaseCommand command, CancellationToken cancellationToken) { ThrowIfDisposed(); var sequence = Serializer.Serialize(command); using (await _lock.Lock(cancellationToken).ConfigureAwait(false)) { await _stream.Send(sequence).ConfigureAwait(false); } } public async Task ProcessIncomingFrames(CancellationToken cancellationToken) { await Task.Yield(); try { await foreach (var frame in _stream.Frames(cancellationToken)) { var commandSize = frame.ReadUInt32(0, true); var command = Serializer.Deserialize<BaseCommand>(frame.Slice(4, commandSize)); if (_pingPongHandler.Incoming(command.CommandType)) continue; if (command.CommandType == BaseCommand.Type.Message) _channelManager.Incoming(command.Message, new ReadOnlySequence<byte>(frame.Slice(commandSize + 4).ToArray())); else if (command.CommandType == BaseCommand.Type.AuthChallenge) _ = Send(new CommandAuthResponse(), cancellationToken).ConfigureAwait(false); else _channelManager.Incoming(command); } } catch { // ignored } } public async ValueTask DisposeAsync() { if (Interlocked.Exchange(ref _isDisposed, 1) != 0) return; await _pingPongHandler.DisposeAsync().ConfigureAwait(false); await _lock.DisposeAsync().ConfigureAwait(false); _channelManager.Dispose(); await _stream.DisposeAsync().ConfigureAwait(false); } private void ThrowIfDisposed() { if (_isDisposed != 0) throw new ConnectionDisposedException(); } }