modules/platforms/dotnet/Apache.Ignite/Internal/ClientSocket.cs (559 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Apache.Ignite.Internal
{
using System;
using System.Buffers.Binary;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Security;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Buffers;
using Ignite.Network;
using Log;
using Network;
using Proto;
using Proto.MsgPack;
/// <summary>
/// Wrapper over framework socket for Ignite thin client operations.
/// </summary>
// ReSharper disable SuggestBaseTypeForParameter (NetworkStream has more efficient read/write methods).
internal sealed class ClientSocket : IDisposable
{
/** General-purpose client type code, written into the handshake request. */
private const byte ClientType = 2;
/** Version 3.0.0. */
private static readonly ClientProtocolVersion Ver300 = new(3, 0, 0);
/** Current protocol version: sent in the handshake request and required to match the version in the server reply. */
private static readonly ClientProtocolVersion CurrentProtocolVersion = Ver300;
/** Lower bound applied to the heartbeat interval that is derived from the server idle timeout (IdleTimeout / 3). */
private static readonly TimeSpan MinRecommendedHeartbeatInterval = TimeSpan.FromMilliseconds(500);
/** Socket id counter; the incremented value becomes the suffix of each socket's logger name. */
private static long _socketId;
/** Underlying stream. */
private readonly Stream _stream;
/** Current async operations: map from request id to the completion source whose task is returned to the caller. */
private readonly ConcurrentDictionary<long, TaskCompletionSource<PooledBuffer>> _requests = new();
/** Requests can be sent by one thread at a time: this semaphore is held while a request is written to the stream. */
[SuppressMessage(
"Microsoft.Design",
"CA2213:DisposableFieldsShouldBeDisposed",
Justification = "WaitHandle is not used in SemaphoreSlim, no need to dispose.")]
private readonly SemaphoreSlim _sendLock = new(initialCount: 1);
/** Cancellation token source that gets cancelled when this instance is disposed. */
[SuppressMessage(
"Microsoft.Design",
"CA2213:DisposableFieldsShouldBeDisposed",
Justification = "WaitHandle is not used in CancellationTokenSource, no need to dispose.")]
private readonly CancellationTokenSource _disposeTokenSource = new();
/** Dispose lock: guards the dispose sequence so that it runs at most once. */
private readonly object _disposeLock = new();
/** Heartbeat timer. Created as a one-shot timer and re-armed every time a request is sent. */
private readonly Timer _heartbeatTimer;
/** Effective heartbeat interval. */
private readonly TimeSpan _heartbeatInterval;
/** Socket timeout for handshakes and heartbeats. */
private readonly TimeSpan _socketTimeout;
/** Logger. */
private readonly IIgniteLogger? _logger;
/** Partition assignment change callback; invoked with this socket when a response carries the corresponding flag. */
private readonly Action<ClientSocket> _assignmentChangeCallback;
/** Pre-allocated buffer for message size + op code + request id. To be used under <see cref="_sendLock"/>. */
private readonly byte[] _prefixBuffer = new byte[ProtoCommon.MessagePrefixSize];
/** Request id generator: holds the last issued id and is incremented atomically for every request. */
private long _requestId;
/** Exception that caused this socket to close. Stays null when the socket is closed by the user. */
private volatile Exception? _exception;
/// <summary>
/// Initializes a new instance of the <see cref="ClientSocket"/> class, arms the heartbeat timer
/// and starts the background receive loop.
/// </summary>
/// <param name="stream">Network stream.</param>
/// <param name="configuration">Configuration; provides the socket timeout and the configured heartbeat interval.</param>
/// <param name="connectionContext">Connection context; provides the server idle timeout.</param>
/// <param name="assignmentChangeCallback">Partition assignment change callback.</param>
/// <param name="logger">Logger.</param>
private ClientSocket(
    Stream stream,
    IgniteClientConfiguration configuration,
    ConnectionContext connectionContext,
    Action<ClientSocket> assignmentChangeCallback,
    IIgniteLogger? logger)
{
    _logger = logger;
    _stream = stream;
    _assignmentChangeCallback = assignmentChangeCallback;
    ConnectionContext = connectionContext;

    _socketTimeout = configuration.SocketTimeout;
    _heartbeatInterval = GetHeartbeatInterval(
        configuredInterval: configuration.HeartbeatInterval,
        serverIdleTimeout: connectionContext.IdleTimeout,
        logger: logger);

    // One-shot timer (infinite period): it only fires again after something re-arms it.
    // ReSharper disable once AsyncVoidLambda (timer callback)
    _heartbeatTimer = new Timer(
        callback: async _ => await SendHeartbeatAsync().ConfigureAwait(false),
        state: null,
        dueTime: _heartbeatInterval,
        period: Timeout.InfiniteTimeSpan);

    // Fire and forget: the receive loop runs in the background until the dispose token is cancelled,
    // so construction continues without waiting for it.
    _ = RunReceiveLoop(_disposeTokenSource.Token);
}
/// <summary>
/// Gets a value indicating whether this socket is disposed, that is, whether its dispose token has been cancelled.
/// </summary>
public bool IsDisposed
{
    get { return _disposeTokenSource.IsCancellationRequested; }
}

/// <summary>
/// Gets the connection context that was passed to this socket on construction.
/// </summary>
public ConnectionContext ConnectionContext { get; }
/// <summary>
/// Connects the socket to the specified endpoint and performs handshake.
/// </summary>
/// <param name="endPoint">Specific endpoint to connect to.</param>
/// <param name="configuration">Configuration.</param>
/// <param name="assignmentChangeCallback">Partition assignment change callback.</param>
/// <returns>A <see cref="Task{TResult}"/> representing the result of the asynchronous operation.</returns>
/// <exception cref="IgniteClientConnectionException">
/// Connecting, SSL stream creation, the handshake or socket construction failed; the original error is the inner exception.
/// </exception>
[SuppressMessage(
    "Microsoft.Reliability",
    "CA2000:Dispose objects before losing scope",
    Justification = "NetworkStream is returned from this method in the socket.")]
public static async Task<ClientSocket> ConnectAsync(
    SocketEndpoint endPoint,
    IgniteClientConfiguration configuration,
    Action<ClientSocket> assignmentChangeCallback)
{
    var socket = new Socket(SocketType.Stream, ProtocolType.Tcp)
    {
        NoDelay = true
    };

    var logger = configuration.Logger.GetLogger(nameof(ClientSocket) + "-" + Interlocked.Increment(ref _socketId));

    // Set once the TCP connection is up, so that the failure path knows the active-connections metric must be rolled back.
    bool connected = false;

    try
    {
        await socket.ConnectAsync(endPoint.EndPoint).ConfigureAwait(false);
        connected = true;

        if (logger?.IsEnabled(LogLevel.Debug) == true)
        {
            logger.Debug($"Connection established [remoteAddress={socket.RemoteEndPoint}]");
        }

        Metrics.ConnectionsEstablished.Add(1);
        Metrics.ConnectionsActiveIncrement();

        Stream stream = new NetworkStream(socket, ownsSocket: true);

        // Wrap the stream only when a factory is configured and it actually returns an SSL stream.
        if (configuration.SslStreamFactory is { } sslStreamFactory &&
            await sslStreamFactory.CreateAsync(stream, endPoint.Host).ConfigureAwait(false) is { } sslStream)
        {
            stream = sslStream;

            if (logger?.IsEnabled(LogLevel.Debug) == true)
            {
                logger.Debug(
                    $"SSL connection established [remoteAddress={socket.RemoteEndPoint}]: {sslStream.NegotiatedCipherSuite}");
            }
        }

        // The handshake as a whole is bounded by the configured socket timeout.
        var context = await HandshakeAsync(stream, endPoint.EndPoint, configuration)
            .WaitAsync(configuration.SocketTimeout)
            .ConfigureAwait(false);

        if (logger?.IsEnabled(LogLevel.Debug) == true)
        {
            logger.Debug($"Handshake succeeded [remoteAddress={socket.RemoteEndPoint}]: {context}.");
        }

        return new ClientSocket(stream, configuration, context, assignmentChangeCallback, logger);
    }
    catch (Exception e)
    {
        // Exception goes first, like in every other logger call of this class, so that it is passed
        // to the logger as the exception and not as an extra message argument.
        logger?.Warn(e, $"Connection failed before or during handshake [remoteAddress={endPoint.EndPoint}]: {e.Message}.");

        if (e.GetBaseException() is TimeoutException)
        {
            Metrics.HandshakesFailedTimeout.Add(1);
        }
        else
        {
            Metrics.HandshakesFailed.Add(1);
        }

        // ReSharper disable once MethodHasAsyncOverload
        socket.Dispose();

        if (connected)
        {
            Metrics.ConnectionsActiveDecrement();
        }

        throw new IgniteClientConnectionException(
            ErrorGroups.Client.Connection,
            "Failed to connect to endpoint: " + endPoint.EndPoint,
            e);
    }
}
/// <summary>
/// Performs an in-out operation: registers a pending request, starts sending it and returns the task
/// that completes when the matching response is handled or the request fails.
/// </summary>
/// <param name="clientOp">Client op code.</param>
/// <param name="request">Request data.</param>
/// <returns>Response data.</returns>
/// <exception cref="IgniteClientConnectionException">Thrown synchronously when the socket is already closed or disposed.</exception>
public Task<PooledBuffer> DoOutInOpAsync(ClientOp clientOp, PooledArrayBuffer? request = null)
{
    if (_exception is { } closeCause)
    {
        throw new IgniteClientConnectionException(
            ErrorGroups.Client.Connection,
            "Socket is closed due to an error, examine inner exception for details.",
            closeCause);
    }

    if (_disposeTokenSource.IsCancellationRequested)
    {
        throw new IgniteClientConnectionException(
            ErrorGroups.Client.Connection,
            "Socket is disposed.",
            new ObjectDisposedException(nameof(ClientSocket)));
    }

    var id = Interlocked.Increment(ref _requestId);
    var pending = new TaskCompletionSource<PooledBuffer>();

    // Register before sending, so that the response handler can find the request by id.
    _requests[id] = pending;
    Metrics.RequestsActiveIncrement();

    // The continuation runs only when sending did not complete successfully.
    SendRequestAsync(request, clientOp, id)
        .AsTask()
        .ContinueWith(
            OnSendNotCompleted,
            pending,
            CancellationToken.None,
            TaskContinuationOptions.NotOnRanToCompletion,
            TaskScheduler.Default);

    return pending.Task;

    void OnSendNotCompleted(Task sendTask, object? state)
    {
        var source = (TaskCompletionSource<PooledBuffer>)state!;
        var failure = sendTask.Exception;

        if (sendTask.IsCanceled || failure?.GetBaseException() is OperationCanceledException or ObjectDisposedException)
        {
            // Canceled task means Dispose was called.
            source.TrySetException(
                new IgniteClientConnectionException(ErrorGroups.Client.Connection, "Connection closed."));
        }
        else if (failure != null)
        {
            source.TrySetException(failure);
        }

        // Metrics are adjusted only by whoever actually removes the request from the map.
        if (_requests.TryRemove(id, out _))
        {
            Metrics.RequestsFailed.Add(1);
            Metrics.RequestsActiveDecrement();
        }
    }
}
/// <inheritdoc/>
public void Dispose() => Dispose(null); // Null cause: the socket is closed by the user, not by an error.
/// <summary>
/// Performs the handshake exchange: sends the magic bytes and the handshake request,
/// then validates the magic bytes sent back by the server and reads the handshake response.
/// </summary>
/// <param name="stream">Network stream.</param>
/// <param name="endPoint">Endpoint.</param>
/// <param name="configuration">Configuration.</param>
/// <returns>Connection context built from the handshake response.</returns>
private static async Task<ConnectionContext> HandshakeAsync(
    Stream stream,
    IPEndPoint endPoint,
    IgniteClientConfiguration configuration)
{
    // Outgoing part.
    await stream.WriteAsync(ProtoCommon.MagicBytes).ConfigureAwait(false);
    await WriteHandshakeAsync(stream, CurrentProtocolVersion, configuration).ConfigureAwait(false);
    await stream.FlushAsync().ConfigureAwait(false);

    // Incoming part.
    await CheckMagicBytesAsync(stream).ConfigureAwait(false);

    var sizeBuffer = new byte[4];
    using var handshakeResponse = await ReadResponseAsync(stream, sizeBuffer, CancellationToken.None).ConfigureAwait(false);

    return ReadHandshakeResponse(handshakeResponse.GetReader(), endPoint, GetSslInfo(stream));
}
/// <summary>
/// Reads as many bytes as there are in <c>ProtoCommon.MagicBytes</c> from the stream and verifies that they match.
/// </summary>
/// <param name="stream">Stream to read from.</param>
/// <returns>A task that completes when the magic bytes have been read and verified.</returns>
/// <exception cref="IgniteClientConnectionException">The received bytes differ from the expected magic bytes.</exception>
private static async ValueTask CheckMagicBytesAsync(Stream stream)
{
    var expectedLength = ProtoCommon.MagicBytes.Length;
    var responseMagic = ByteArrayPool.Rent(expectedLength);

    try
    {
        await ReceiveBytesAsync(stream, responseMagic, expectedLength, CancellationToken.None).ConfigureAwait(false);

        for (var i = 0; i < expectedLength; i++)
        {
            if (responseMagic[i] != ProtoCommon.MagicBytes[i])
            {
                // Format only the bytes that were actually received: a pooled array is not guaranteed
                // to be exactly the requested length, and anything past the received range is stale data.
                throw new IgniteClientConnectionException(
                    ErrorGroups.Client.Protocol,
                    "Invalid magic bytes returned from the server: " +
                    BitConverter.ToString(responseMagic, 0, expectedLength));
            }
        }
    }
    finally
    {
        ByteArrayPool.Return(responseMagic);
    }
}
/// <summary>
/// Parses the handshake response: server version, optional error, idle timeout and cluster node details.
/// </summary>
/// <param name="reader">Reader positioned at the start of the handshake response.</param>
/// <param name="endPoint">Endpoint stored in the resulting cluster node.</param>
/// <param name="sslInfo">SSL info stored in the resulting context, if any.</param>
/// <returns>Connection context.</returns>
private static ConnectionContext ReadHandshakeResponse(MsgPackReader reader, IPEndPoint endPoint, ISslInfo? sslInfo)
{
    var major = reader.ReadInt16();
    var minor = reader.ReadInt16();
    var patch = reader.ReadInt16();
    var serverVer = new ClientProtocolVersion(major, minor, patch);

    if (serverVer != CurrentProtocolVersion)
    {
        throw new IgniteClientConnectionException(ErrorGroups.Client.Protocol, "Unexpected server version: " + serverVer);
    }

    if (ReadError(ref reader) is { } handshakeError)
    {
        throw handshakeError;
    }

    var idleTimeoutMs = reader.ReadInt64();
    var nodeId = reader.ReadString();
    var nodeName = reader.ReadString();
    var clusterId = reader.ReadGuid();

    // The two trailing sections are not consumed by the client.
    reader.Skip(); // Features.
    reader.Skip(); // Extensions.

    var idleTimeout = TimeSpan.FromMilliseconds(idleTimeoutMs);
    var node = new ClusterNode(nodeId, nodeName, endPoint);

    return new ConnectionContext(serverVer, idleTimeout, node, clusterId, sslInfo);
}
/// <summary>
/// Reads an optional error from the reader.
/// </summary>
/// <param name="reader">Reader positioned at the error section.</param>
/// <returns>Mapped exception, or null when the error section is nil.</returns>
private static IgniteException? ReadError(ref MsgPackReader reader)
{
    if (reader.TryReadNil())
    {
        // Nil marker: no error.
        return null;
    }

    Guid traceId;

    if (reader.TryReadNil())
    {
        // The server sent no trace id: generate one locally.
        traceId = Guid.NewGuid();
    }
    else
    {
        traceId = reader.ReadGuid();
    }

    // Fallback code used when the server sent no error code.
    const int fallbackCode = 65537;
    int code;

    if (reader.TryReadNil())
    {
        code = fallbackCode;
    }
    else
    {
        code = reader.ReadInt32();
    }

    string className = reader.ReadString();
    string? message = reader.ReadStringNullable();
    string? javaStackTrace = reader.ReadStringNullable();

    // TODO IGNITE-19838 Retry outdated schema error
    reader.Skip(); // Error extensions.

    return ExceptionMapper.GetException(traceId, code, className, message, javaStackTrace);
}
/// <summary>
/// Reads one message: the size first, then exactly that many bytes into a rented array.
/// </summary>
/// <param name="stream">Stream to read from.</param>
/// <param name="messageSizeBytes">Scratch buffer used to read the message size.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Pooled buffer with the message payload.</returns>
private static async ValueTask<PooledBuffer> ReadResponseAsync(
    Stream stream,
    byte[] messageSizeBytes,
    CancellationToken cancellationToken)
{
    var payloadSize = await ReadMessageSizeAsync(stream, messageSizeBytes, cancellationToken).ConfigureAwait(false);
    var rented = ByteArrayPool.Rent(payloadSize);
    var wrapped = false;

    try
    {
        await ReceiveBytesAsync(stream, rented, payloadSize, cancellationToken).ConfigureAwait(false);

        var result = new PooledBuffer(rented, 0, payloadSize);
        wrapped = true;

        return result;
    }
    finally
    {
        // On any failure the array goes straight back to the pool; on success the returned buffer holds it.
        if (!wrapped)
        {
            ByteArrayPool.Return(rented);
        }
    }
}
/// <summary>
/// Reads the 4-byte message size from the stream.
/// </summary>
/// <param name="stream">Stream to read from.</param>
/// <param name="buffer">Scratch buffer, at least 4 bytes long.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>Message size.</returns>
private static async Task<int> ReadMessageSizeAsync(
    Stream stream,
    byte[] buffer,
    CancellationToken cancellationToken)
{
    const int messageSizeByteCount = sizeof(int);

    Debug.Assert(buffer.Length >= messageSizeByteCount, "buffer.Length >= messageSizeByteCount");

    await ReceiveBytesAsync(stream, buffer, messageSizeByteCount, cancellationToken).ConfigureAwait(false);

    var messageSize = ReadMessageSize(buffer);

    return messageSize;
}
/// <summary>
/// Reads exactly <paramref name="size"/> bytes from the stream into the start of <paramref name="buffer"/>.
/// </summary>
/// <param name="stream">Stream to read from.</param>
/// <param name="buffer">Target buffer.</param>
/// <param name="size">Number of bytes to read.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>A task that completes when all requested bytes have been read.</returns>
/// <exception cref="IgniteClientConnectionException">The stream returned zero bytes before the requested size was read.</exception>
private static async Task ReceiveBytesAsync(
    Stream stream,
    byte[] buffer,
    int size,
    CancellationToken cancellationToken)
{
    var remaining = size;

    while (remaining > 0)
    {
        // A single read may return fewer bytes than requested, so keep reading into the unfilled tail.
        var target = buffer.AsMemory(size - remaining, remaining);
        var chunk = await stream.ReadAsync(target, cancellationToken).ConfigureAwait(false);

        if (chunk == 0)
        {
            // Disconnected.
            throw new IgniteClientConnectionException(
                ErrorGroups.Client.Protocol,
                "Connection lost (failed to read data from socket)",
                new SocketException((int) SocketError.ConnectionAborted));
        }

        remaining -= chunk;
        Metrics.BytesReceived.Add(chunk);
    }
}
/// <summary>
/// Serializes the handshake request and writes it to the stream, preceded by its 4-byte size.
/// </summary>
/// <param name="stream">Stream to write to.</param>
/// <param name="version">Protocol version to send.</param>
/// <param name="configuration">Configuration.</param>
/// <returns>A task that completes when the handshake request has been written.</returns>
private static async ValueTask WriteHandshakeAsync(
    Stream stream,
    ClientProtocolVersion version,
    IgniteClientConfiguration configuration)
{
    const int sizeFieldLength = 4;

    using var bufferWriter = new PooledArrayBuffer(prefixSize: ProtoCommon.MessagePrefixSize);
    WriteHandshake(bufferWriter.MessageWriter, version, configuration);

    var written = bufferWriter.GetWrittenMemory();
    var payloadSize = written.Length - ProtoCommon.MessagePrefixSize;

    // Prepend size: use the last 4 bytes of the reserved prefix area, right before the payload.
    var frame = written[(ProtoCommon.MessagePrefixSize - sizeFieldLength)..];
    WriteMessageSize(frame, payloadSize);

    await stream.WriteAsync(frame).ConfigureAwait(false);
    Metrics.BytesSent.Add(frame.Length);
}
/// <summary>
/// Writes the handshake request body: version, client type, features and extensions.
/// </summary>
/// <param name="w">Writer.</param>
/// <param name="version">Protocol version.</param>
/// <param name="configuration">Configuration; its authenticator, when set, is written into the extensions map.</param>
private static void WriteHandshake(MsgPackWriter w, ClientProtocolVersion version, IgniteClientConfiguration configuration)
{
    // Version.
    w.Write(version.Major);
    w.Write(version.Minor);
    w.Write(version.Patch);

    w.Write(ClientType); // Client type: general purpose.
    w.WriteBinaryHeader(0); // Features.

    if (configuration.Authenticator is not { } authenticator)
    {
        w.WriteMapHeader(0); // Extensions.
        return;
    }

    // Three extension entries: authentication type, identity and secret.
    w.WriteMapHeader(3); // Extensions.

    w.Write(HandshakeExtensions.AuthenticationType);
    w.Write(authenticator.Type);

    w.Write(HandshakeExtensions.AuthenticationIdentity);
    w.Write((string?)authenticator.Identity);

    w.Write(HandshakeExtensions.AuthenticationSecret);
    w.Write((string?)authenticator.Secret);
}
/// <summary>
/// Writes the message size as a big-endian 32-bit integer into the first four bytes of the target.
/// </summary>
/// <param name="target">Target memory, at least four bytes long.</param>
/// <param name="size">Message size.</param>
private static void WriteMessageSize(Memory<byte> target, int size)
{
    var destination = target.Span;

    BinaryPrimitives.WriteInt32BigEndian(destination, size);
}
/// <summary>
/// Reads the message size as a big-endian 32-bit integer from the first four bytes of the span.
/// </summary>
/// <param name="responseLenBytes">Span holding the size bytes.</param>
/// <returns>Message size.</returns>
private static int ReadMessageSize(Span<byte> responseLenBytes)
{
    return BinaryPrimitives.ReadInt32BigEndian(responseLenBytes);
}
/// <summary>
/// Chooses the effective heartbeat interval from the configured value and the server idle timeout.
/// </summary>
/// <param name="configuredInterval">Configured heartbeat interval; must be greater than zero.</param>
/// <param name="serverIdleTimeout">Server idle timeout; zero or negative is treated as "not set".</param>
/// <param name="logger">Logger.</param>
/// <returns>
/// The configured interval when the idle timeout is not set or the configured interval is shorter than
/// max(IdleTimeout / 3, 500ms); otherwise that recommended value.
/// </returns>
/// <exception cref="IgniteClientException">The configured interval is zero or negative.</exception>
private static TimeSpan GetHeartbeatInterval(TimeSpan configuredInterval, TimeSpan serverIdleTimeout, IIgniteLogger? logger)
{
    const string settingName =
        nameof(IgniteClientConfiguration) + "." + nameof(IgniteClientConfiguration.HeartbeatInterval);

    if (configuredInterval <= TimeSpan.Zero)
    {
        throw new IgniteClientException(
            ErrorGroups.Client.Configuration,
            $"{settingName} should be greater than zero.");
    }

    if (serverIdleTimeout <= TimeSpan.Zero)
    {
        logger?.Info($"Server-side IdleTimeout is not set, using configured {settingName}: {configuredInterval}");

        return configuredInterval;
    }

    // A third of the idle timeout, but never less than the minimum.
    var oneThirdOfIdleTimeout = serverIdleTimeout / 3;
    var recommendedHeartbeatInterval = oneThirdOfIdleTimeout < MinRecommendedHeartbeatInterval
        ? MinRecommendedHeartbeatInterval
        : oneThirdOfIdleTimeout;

    if (configuredInterval >= recommendedHeartbeatInterval)
    {
        logger?.Warn(
            $"Server-side IdleTimeout is {serverIdleTimeout}, configured {settingName} " +
            $"is {configuredInterval}, which is longer than recommended IdleTimeout / 3. " +
            $"Overriding heartbeat interval with max(IdleTimeout / 3, 500ms): {recommendedHeartbeatInterval}");

        return recommendedHeartbeatInterval;
    }

    logger?.Info($"Server-side IdleTimeout is {serverIdleTimeout}, using configured {settingName}: {configuredInterval}");

    return configuredInterval;
}
/// <summary>
/// Builds SSL info from the stream when it is an <see cref="SslStream"/>.
/// </summary>
/// <param name="stream">Stream.</param>
/// <returns>SSL info, or null when the stream is not an <see cref="SslStream"/>.</returns>
private static ISslInfo? GetSslInfo(Stream stream)
{
    if (stream is not SslStream secured)
    {
        return null;
    }

    return new SslInfo(
        secured.TargetHostName,
        secured.NegotiatedCipherSuite.ToString(),
        secured.IsMutuallyAuthenticated,
        secured.LocalCertificate,
        secured.RemoteCertificate,
        secured.SslProtocol);
}
/// <summary>
/// Writes a single request to the stream: a 4-byte big-endian message size, the op code, the request id,
/// and the request body when one is provided. Writes are serialized by <see cref="_sendLock"/>.
/// </summary>
/// <param name="request">Request body; null for requests that consist of the prefix only.</param>
/// <param name="op">Operation code.</param>
/// <param name="requestId">Request id written into the prefix.</param>
/// <returns>A task that completes when the request has been written to the stream.</returns>
private async ValueTask SendRequestAsync(PooledArrayBuffer? request, ClientOp op, long requestId)
{
// Reset heartbeat timer - don't send heartbeats when connection is active anyway.
_heartbeatTimer.Change(dueTime: _heartbeatInterval, period: TimeSpan.FromMilliseconds(-1));
if (_logger?.IsEnabled(LogLevel.Trace) == true)
{
_logger.Trace($"Sending request [op={op}, remoteAddress={ConnectionContext.ClusterNode.Address}, requestId={requestId}]");
}
// The wait is cancelled when the socket is disposed.
await _sendLock.WaitAsync(_disposeTokenSource.Token).ConfigureAwait(false);
try
{
// The first 4 bytes of the shared prefix buffer are reserved for the message size;
// op code and request id are written right after them. prefixSize accumulates their combined length.
var prefixMem = _prefixBuffer.AsMemory()[4..];
var prefixSize = MsgPackWriter.WriteUnsigned(prefixMem.Span, (ulong)op);
prefixSize += MsgPackWriter.WriteUnsigned(prefixMem[prefixSize..].Span, (ulong)requestId);
if (request != null)
{
var requestBuf = request.GetWrittenMemory();
// Message size = op code + request id + body.
// NOTE(review): the subtraction assumes requestBuf starts with MessagePrefixSize reserved bytes - confirm against PooledArrayBuffer.
WriteMessageSize(_prefixBuffer, prefixSize + requestBuf.Length - ProtoCommon.MessagePrefixSize);
// Size field plus the op code / request id bytes that were actually written.
var prefixBytes = _prefixBuffer.AsMemory()[..(prefixSize + 4)];
// Start offset chosen so that the copied prefix ends exactly at MessagePrefixSize.
var requestBufStart = ProtoCommon.MessagePrefixSize - prefixBytes.Length;
var requestBufWithPrefix = requestBuf.Slice(requestBufStart);
// Copy prefix to request buf to avoid extra WriteAsync call for the prefix.
prefixBytes.CopyTo(requestBufWithPrefix);
await _stream.WriteAsync(requestBufWithPrefix, _disposeTokenSource.Token).ConfigureAwait(false);
Metrics.BytesSent.Add(requestBufWithPrefix.Length);
}
else
{
// Request without body, send only the prefix.
WriteMessageSize(_prefixBuffer, prefixSize);
var prefixBytes = _prefixBuffer.AsMemory()[..(prefixSize + 4)];
await _stream.WriteAsync(prefixBytes, _disposeTokenSource.Token).ConfigureAwait(false);
Metrics.BytesSent.Add(prefixBytes.Length);
}
Metrics.RequestsSent.Add(1);
}
finally
{
// Always release, so that a failed write does not block later senders.
_sendLock.Release();
}
}
/// <summary>
/// Background receive loop: reads messages from the stream one by one and hands each of them
/// to <see cref="HandleResponse"/> on the thread pool until the token is cancelled or reading fails.
/// </summary>
/// <param name="cancellationToken">Token that is cancelled when this socket is disposed.</param>
/// <returns>A task that completes when the loop exits.</returns>
[SuppressMessage(
    "Microsoft.Design",
    "CA1031:DoNotCatchGeneralExceptionTypes",
    Justification = "Any exception in receive loop should be handled.")]
private async Task RunReceiveLoop(CancellationToken cancellationToken)
{
    // Reuse the same array for all responses.
    var messageSizeBytes = new byte[4];

    try
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            PooledBuffer response = await ReadResponseAsync(_stream, messageSizeBytes, cancellationToken).ConfigureAwait(false);

            // Invoke response handler in another thread to continue the receive loop.
            // Response buffer should be disposed by the task handler.
            ThreadPool.QueueUserWorkItem(r => HandleResponse((PooledBuffer)r!), response);
        }
    }
    catch (Exception e)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            // The socket is already disposed (or is being disposed): the failed read is a consequence of that,
            // not a new error. Dispose would return immediately in this state, so the only thing skipped here
            // is a misleading error log entry on every regular close.
            return;
        }

        var message = "Exception while reading from socket, connection closed: " + e.Message;

        _logger?.Error(e, message);
        Dispose(new IgniteClientConnectionException(ErrorGroups.Client.Connection, message, e));
    }
}
/// <summary>
/// Handles a single message from the server: finds the pending request by id and completes it
/// with the result or with the error carried by the response.
/// </summary>
/// <param name="response">
/// Message read by the receive loop. The buffer is disposed here unless it is successfully handed over
/// to the caller awaiting the request, who then becomes responsible for disposing it.
/// </param>
/// <remarks>
/// This method runs as a thread pool work item, so no exception may escape it: any failure fails the
/// pending request (if it was already taken out of the map) and closes the socket.
/// </remarks>
[SuppressMessage(
    "Microsoft.Design",
    "CA1031:DoNotCatchGeneralExceptionTypes",
    Justification = "Runs on the thread pool: any exception should close the socket instead of escaping.")]
private void HandleResponse(PooledBuffer response)
{
    // Set once the request matching this response has been removed from the pending map.
    TaskCompletionSource<PooledBuffer>? taskCompletionSource = null;

    // True only when the awaiting caller has received the buffer.
    var bufferHandedOver = false;

    try
    {
        var reader = response.GetReader();
        var responseType = (ServerMessageType)reader.ReadInt32();

        if (responseType != ServerMessageType.Response)
        {
            // Notifications are not used for now.
            return;
        }

        var requestId = reader.ReadInt64();

        if (!_requests.TryRemove(requestId, out taskCompletionSource))
        {
            var message = $"Unexpected response ID ({requestId}) received from the server " +
                          $"[remoteAddress={ConnectionContext.ClusterNode.Address}], closing the socket.";

            _logger?.Error(message);
            Dispose(new IgniteClientConnectionException(ErrorGroups.Client.Protocol, message));

            return;
        }

        Metrics.RequestsActiveDecrement();

        var flags = (ResponseFlags)reader.ReadInt32();

        if (flags.HasFlag(ResponseFlags.PartitionAssignmentChanged))
        {
            if (_logger?.IsEnabled(LogLevel.Info) == true)
            {
                _logger.Info(
                    $"Partition assignment change notification received [remoteAddress={ConnectionContext.ClusterNode.Address}]");
            }

            _assignmentChangeCallback(this);
        }

        // TODO IGNITE-20056 .NET: Thin 3.0: Track observable timestamp
        _ = reader.ReadInt64();

        var exception = ReadError(ref reader);

        if (exception != null)
        {
            Metrics.RequestsFailed.Add(1);

            // Try*: the send-failure continuation holds the same completion source and may have completed it already.
            taskCompletionSource.TrySetException(exception);

            return;
        }

        var resultBuffer = response.Slice(reader.Consumed);

        Metrics.RequestsCompleted.Add(1);

        // When the source is already completed nobody receives the buffer, and it is disposed below.
        bufferHandedOver = taskCompletionSource.TrySetResult(resultBuffer);
    }
    catch (Exception e)
    {
        var failureMessage = "Exception while handling response, connection closed: " + e.Message;
        _logger?.Error(e, failureMessage);

        var error = new IgniteClientConnectionException(ErrorGroups.Client.Connection, failureMessage, e);

        // The request is no longer in the pending map, so Dispose below cannot fail it: do it here.
        if (taskCompletionSource != null && taskCompletionSource.TrySetException(error))
        {
            Metrics.RequestsFailed.Add(1);
        }

        Dispose(error);
    }
    finally
    {
        if (!bufferHandedOver)
        {
            response.Dispose();
        }
    }
}
/// <summary>
/// Sends heartbeat message and waits for the response for at most the socket timeout.
/// Any failure, including the timeout, disposes this socket with that failure as the cause.
/// </summary>
/// <returns>A task that completes when the heartbeat round-trip has finished or failed.</returns>
[SuppressMessage(
    "Microsoft.Design",
    "CA1031:DoNotCatchGeneralExceptionTypes",
    Justification = "Any heartbeat exception should cause this instance to be disposed with an error.")]
private async Task SendHeartbeatAsync()
{
    try
    {
        // The response content is not needed, but the returned buffer belongs to the caller and has to be
        // disposed like any other response; otherwise every heartbeat leaves a buffer undisposed.
        // NOTE(review): assumes PooledBuffer.Dispose releases the rented array, as the other call sites suggest.
        using var response = await DoOutInOpAsync(ClientOp.Heartbeat).WaitAsync(_socketTimeout).ConfigureAwait(false);
    }
    catch (Exception e)
    {
        _logger?.Error(e, "Heartbeat failed: " + e.Message);

        Dispose(e);
    }
}
/// <summary>
/// Disposes this socket and completes active requests with the specified exception.
/// Only the first call has an effect; later calls return immediately.
/// </summary>
/// <param name="ex">Exception that caused this socket to close. Null when socket is closed by the user.</param>
private void Dispose(Exception? ex)
{
lock (_disposeLock)
{
// Already disposed: nothing to do.
if (_disposeTokenSource.IsCancellationRequested)
{
return;
}
// Marks the socket as disposed and cancels the operations that observe the dispose token
// (send lock wait, stream writes, receive loop).
_disposeTokenSource.Cancel();
if (ex != null)
{
_logger?.Warn(ex, $"Connection closed [remoteAddress={ConnectionContext.ClusterNode.Address}]: " + ex.Message);
// Timeouts are counted separately from other connection losses.
if (ex.GetBaseException() is TimeoutException)
{
Metrics.ConnectionsLostTimeout.Add(1);
}
else
{
Metrics.ConnectionsLost.Add(1);
}
}
else if (_logger?.IsEnabled(LogLevel.Debug) == true)
{
_logger.Debug($"Connection closed [remoteAddress={ConnectionContext.ClusterNode.Address}]");
}
_heartbeatTimer.Dispose();
// Remember the cause so that later DoOutInOpAsync calls can report it; stays null when closed by the user.
_exception = ex;
_stream.Dispose();
// Pending requests are always failed: with the cause, or with a generic error when closed by the user.
ex ??= new IgniteClientConnectionException(ErrorGroups.Client.Connection, "Connection closed.");
// Repeat until the map is observed empty: requests are registered without taking the dispose lock,
// so a single snapshot of the keys may miss entries added concurrently.
while (!_requests.IsEmpty)
{
foreach (var reqId in _requests.Keys.ToArray())
{
// Metrics are adjusted only for the requests that were actually removed here.
if (_requests.TryRemove(reqId, out var req))
{
req.TrySetException(ex);
Metrics.RequestsActiveDecrement();
}
}
}
Metrics.ConnectionsActiveDecrement();
}
}
}
}