modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs (470 lines of code) (raw):

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace Apache.Ignite.Internal
{
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Diagnostics.CodeAnalysis;
    using System.IO;
    using System.Linq;
    using System.Net;
    using System.Net.Sockets;
    using System.Threading;
    using System.Threading.Tasks;
    using Buffers;
    using Common;
    using Ignite.Network;
    using Microsoft.Extensions.Logging;
    using Network;
    using Proto;
    using Transactions;

    /// <summary>
    /// Client socket wrapper with reconnect/failover functionality.
    /// <para />
    /// Holds one <see cref="SocketEndpoint"/> per configured IP/port combination, picks a socket for every operation
    /// (preferred node first, then round-robin over the endpoints), and retries failed operations
    /// according to the configured <see cref="IRetryPolicy"/>.
    /// </summary>
    internal sealed class ClientFailoverSocket : IDisposable, IClientSocketEventListener
    {
        /** Key in Exception.Data under which the failed endpoint is stored (used to resolve the metrics context). */
        private const string ExceptionDataEndpoint = "Endpoint";

        /** Current global endpoint index for Round-robin. */
        private static long _globalEndPointIndex;

        /** Logger. */
        private readonly ILogger _logger;

        /** Endpoints with corresponding hosts - from configuration. */
        private readonly IReadOnlyList<SocketEndpoint> _endpoints;

        /** Cluster node unique name to endpoint map. */
        private readonly ConcurrentDictionary<string, SocketEndpoint> _endpointsByName = new();

        /** Socket connection lock. */
        [SuppressMessage(
            "Microsoft.Design",
            "CA2213:DisposableFieldsShouldBeDisposed",
            Justification = "WaitHandle is not used in SemaphoreSlim, no need to dispose.")]
        private readonly SemaphoreSlim _socketLock = new(1);

        /** Disposed flag. */
        private volatile bool _disposed;

        /** Local topology assignment version. Instead of using event handlers to notify all tables about assignment change,
         * the table will compare its version with channel version to detect an update. */
        private long _assignmentTimestamp;

        /** Cluster id from the first handshake. */
        private Guid? _clusterId;

        /** Local index for round-robin balancing within this FailoverSocket. */
        private long _endPointIndex = Interlocked.Increment(ref _globalEndPointIndex);

        /** Observable timestamp. */
        private long _observableTimestamp;

        /// <summary>
        /// Initializes a new instance of the <see cref="ClientFailoverSocket"/> class.
        /// </summary>
        /// <param name="configuration">Client configuration.</param>
        /// <param name="logger">Logger.</param>
        /// <exception cref="IgniteClientException">When the configured endpoint list is empty.</exception>
        private ClientFailoverSocket(IgniteClientConfigurationInternal configuration, ILogger logger)
        {
            if (configuration.Configuration.Endpoints.Count == 0)
            {
                throw new IgniteClientException(
                    ErrorGroups.Client.Configuration,
                    $"Invalid {nameof(IgniteClientConfiguration)}: {nameof(IgniteClientConfiguration.Endpoints)} is empty. Nowhere to connect.");
            }

            // The logger must be assigned before resolving endpoints: GetIps logs resolution failures.
            _logger = logger;
            _endpoints = GetIpEndPoints(configuration.Configuration).ToList();

            Configuration = configuration;
        }

        /// <summary>
        /// Gets the configuration.
        /// </summary>
        public IgniteClientConfigurationInternal Configuration { get; }

        /// <summary>
        /// Gets the partition assignment timestamp.
        /// </summary>
        public long PartitionAssignmentTimestamp => Interlocked.Read(ref _assignmentTimestamp);

        /// <summary>
        /// Gets the observable timestamp.
        /// </summary>
        public long ObservableTimestamp => Interlocked.Read(ref _observableTimestamp);

        /// <summary>
        /// Gets the client ID.
        /// </summary>
        public Guid ClientId { get; } = Guid.NewGuid();

        /// <summary>
        /// Gets a value indicating whether the socket is disposed.
        /// </summary>
        public bool IsDisposed => _disposed;

        /// <summary>
        /// Connects the socket.
        /// </summary>
        /// <param name="configuration">Client configuration.</param>
        /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
        public static async Task<ClientFailoverSocket> ConnectAsync(IgniteClientConfigurationInternal configuration)
        {
            var logger = configuration.Configuration.LoggerFactory.CreateLogger<ClientFailoverSocket>();
            logger.LogClientStartInfo(VersionUtils.InformationalVersion);

            var socket = new ClientFailoverSocket(configuration, logger);

            // The first connection is established eagerly, so that connection errors surface to the caller.
            await socket.GetNextSocketAsync().ConfigureAwait(false);

            // Because this call is not awaited, execution of the current method continues before the call is completed.
            // Secondary connections are established in the background.
            _ = socket.ConnectAllSockets();

            return socket;
        }

        /// <summary>
        /// Resets global endpoint index. For testing purposes only (to make behavior deterministic).
        /// </summary>
        public static void ResetGlobalEndpointIndex() => _globalEndPointIndex = 0;

        /// <summary>
        /// Performs an in-out operation.
        /// </summary>
        /// <param name="clientOp">Client op code.</param>
        /// <param name="request">Request data.</param>
        /// <param name="preferredNode">Preferred node.</param>
        /// <param name="expectNotifications">Whether to expect notifications as a result of the operation.</param>
        /// <returns>Response data.</returns>
        public async Task<PooledBuffer> DoOutInOpAsync(
            ClientOp clientOp,
            PooledArrayBuffer? request = null,
            PreferredNode preferredNode = default,
            bool expectNotifications = false)
        {
            var (buffer, _) = await DoOutInOpAndGetSocketAsync(
                    clientOp,
                    tx: null,
                    request,
                    preferredNode,
                    retryPolicyOverride: null,
                    expectNotifications)
                .ConfigureAwait(false);

            return buffer;
        }

        /// <summary>
        /// Performs an in-out operation.
        /// </summary>
        /// <param name="clientOp">Client op code.</param>
        /// <param name="tx">Transaction.</param>
        /// <param name="request">Request data.</param>
        /// <param name="preferredNode">Preferred node.</param>
        /// <param name="retryPolicyOverride">Retry policy.</param>
        /// <param name="expectNotifications">Whether to expect notifications as a result of the operation.</param>
        /// <returns>Response data and socket.</returns>
        /// <exception cref="IgniteClientException">When <paramref name="tx"/> belongs to a different failover socket.</exception>
        public async Task<(PooledBuffer Buffer, ClientSocket Socket)> DoOutInOpAndGetSocketAsync(
            ClientOp clientOp,
            Transaction? tx = null,
            PooledArrayBuffer? request = null,
            PreferredNode preferredNode = default,
            IRetryPolicy? retryPolicyOverride = null,
            bool expectNotifications = false)
        {
            if (tx != null)
            {
                if (tx.FailoverSocket != this)
                {
                    throw new IgniteClientException(ErrorGroups.Client.Connection, "Specified transaction belongs to a different IgniteClient instance.");
                }

                // Use tx-specific socket without retry and failover.
                var buffer = await tx.Socket.DoOutInOpAsync(clientOp, request, expectNotifications).ConfigureAwait(false);
                return (buffer, tx.Socket);
            }

            // State is passed through the arg tuple so that both lambdas can stay static (no closure allocation).
            return await DoWithRetryAsync(
                    (clientOp, request, expectNotifications),
                    static (_, arg) => arg.clientOp,
                    async static (socket, arg) =>
                    {
                        var res = await socket.DoOutInOpAsync(arg.clientOp, arg.request, arg.expectNotifications).ConfigureAwait(false);

                        return (Buffer: res, Socket: socket);
                    },
                    preferredNode,
                    retryPolicyOverride)
                .ConfigureAwait(false);
        }

        /// <summary>
        /// Performs a socket operation with retry and reconnect.
        /// </summary>
        /// <param name="arg">Func argument.</param>
        /// <param name="opFunc">Client op func.</param>
        /// <param name="func">Result func.</param>
        /// <param name="preferredNode">Preferred node.</param>
        /// <param name="retryPolicyOverride">Retry policy.</param>
        /// <typeparam name="T">Result type.</typeparam>
        /// <typeparam name="TArg">Arg type.</typeparam>
        /// <returns>Result.</returns>
        public async Task<T> DoWithRetryAsync<T, TArg>(
            TArg arg,
            Func<ClientSocket?, TArg, ClientOp> opFunc,
            Func<ClientSocket, TArg, Task<T>> func,
            PreferredNode preferredNode = default,
            IRetryPolicy? retryPolicyOverride = null)
        {
            var attempt = 0;
            List<Exception>? errors = null;

            while (true)
            {
                ClientSocket? socket = null;

                try
                {
                    socket = await GetSocketAsync(preferredNode).ConfigureAwait(false);

                    return await func(socket, arg).ConfigureAwait(false);
                }
                catch (Exception e)
                {
                    // Preferred node connection may not be available, do not use it after first failure.
                    preferredNode = default;

                    // When the socket could not be obtained, fall back to the endpoint recorded in the exception data.
                    MetricsContext? metricsContext =
                        socket?.MetricsContext
                        ?? (e.Data[ExceptionDataEndpoint] as SocketEndpoint)?.MetricsContext
                        ?? (e.InnerException?.Data[ExceptionDataEndpoint] as SocketEndpoint)?.MetricsContext;

                    IRetryPolicy retryPolicy = retryPolicyOverride ?? Configuration.Configuration.RetryPolicy;

                    if (!HandleOpError(e, opFunc(socket, arg), ref attempt, ref errors, retryPolicy, metricsContext))
                    {
                        throw;
                    }
                }
            }
        }

        /// <inheritdoc/>
        public void Dispose()
        {
            // Take the connection lock so that no new socket is being established while existing ones are disposed.
            _socketLock.Wait();

            try
            {
                if (_disposed)
                {
                    return;
                }

                _disposed = true;

                foreach (var endpoint in _endpoints)
                {
                    endpoint.Socket?.Dispose();
                }
            }
            finally
            {
                _socketLock.Release();
            }
        }

        /// <summary>
        /// Gets active connections.
        /// </summary>
        /// <returns>Active connections.</returns>
        public IList<IConnectionInfo> GetConnections()
        {
            var res = new List<IConnectionInfo>(_endpoints.Count);

            foreach (var endpoint in _endpoints)
            {
                if (endpoint.Socket is { IsDisposed: false, ConnectionContext: { } ctx })
                {
                    res.Add(new ConnectionInfo(ctx.ClusterNode, ctx.SslInfo));
                }
            }

            return res;
        }

        /// <inheritdoc/>
        void IClientSocketEventListener.OnAssignmentChanged(long timestamp)
        {
            // Atomically update the assignment timestamp to max(timestamp, current).
            while (true)
            {
                var oldTimestamp = Interlocked.Read(ref _assignmentTimestamp);
                if (oldTimestamp >= timestamp)
                {
                    return;
                }

                if (Interlocked.CompareExchange(ref _assignmentTimestamp, value: timestamp, comparand: oldTimestamp) == oldTimestamp)
                {
                    return;
                }
            }
        }

        /// <inheritdoc/>
        void IClientSocketEventListener.OnObservableTimestampChanged(long timestamp)
        {
            // Atomically update the observable timestamp to max(newTs, curTs).
            while (true)
            {
                var current = Interlocked.Read(ref _observableTimestamp);
                if (current >= timestamp)
                {
                    return;
                }

                if (Interlocked.CompareExchange(ref _observableTimestamp, timestamp, current) == current)
                {
                    return;
                }
            }
        }

        /// <summary>
        /// Gets active sockets.
        /// </summary>
        /// <returns>Active sockets.</returns>
        internal IEnumerable<ClientSocket> GetSockets()
        {
            var res = new List<ClientSocket>(_endpoints.Count);

            foreach (var endpoint in _endpoints)
            {
                if (endpoint.Socket is { IsDisposed: false })
                {
                    res.Add(endpoint.Socket);
                }
            }

            return res;
        }

        /// <summary>
        /// Maps a round-robin counter value to a valid index in the endpoint list.
        /// <para />
        /// Uses unsigned arithmetic instead of <see cref="Math.Abs(int)"/>: the counter is a truncated, ever-growing value,
        /// so <paramref name="startIdx"/> + <paramref name="offset"/> can be any <see cref="int"/>, including
        /// <see cref="int.MinValue"/>, for which <see cref="Math.Abs(int)"/> throws <see cref="OverflowException"/>.
        /// For non-negative sums the result is the plain remainder.
        /// </summary>
        /// <param name="startIdx">Starting counter value.</param>
        /// <param name="offset">Offset from the starting value.</param>
        /// <param name="count">Number of endpoints; must be positive.</param>
        /// <returns>Index in range [0, <paramref name="count"/>).</returns>
        private static int GetEndpointIndex(int startIdx, int offset, int count) =>
            unchecked((int)((uint)(startIdx + offset) % (uint)count));

        /// <summary>
        /// Gets a socket. Reconnects if necessary.
        /// </summary>
        /// <param name="preferredNode">Preferred node.</param>
        /// <returns>Client socket.</returns>
        [SuppressMessage(
            "Microsoft.Design",
            "CA1031:DoNotCatchGeneralExceptionTypes",
            Justification = "Any connection exception should be handled.")]
        private async ValueTask<ClientSocket> GetSocketAsync(PreferredNode preferredNode = default)
        {
            ThrowIfDisposed();

            // 1. Preferred node connection.
            if (preferredNode.Name != null && _endpointsByName.TryGetValue(preferredNode.Name, out var endpoint))
            {
                try
                {
                    return await ConnectAsync(endpoint).ConfigureAwait(false);
                }
                catch (Exception e)
                {
                    // Preferred node is an optimization only: log and fall through to the other strategies.
                    _logger.LogFailedToConnectPreferredNodeDebug(preferredNode.Name, e.Message);
                }
            }

            // 2. Round-robin connection.
            if (GetNextSocketWithoutReconnect() is { } nextSocket)
            {
                return nextSocket;
            }

            // 3. Default connection.
            return await GetNextSocketAsync().ConfigureAwait(false);
        }

        /// <summary>
        /// Tries to connect to every configured endpoint, logging (not throwing) failures.
        /// Repeats with <c>ReconnectInterval</c> delay until disposed; runs only once when the interval is not positive.
        /// </summary>
        /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
        [SuppressMessage(
            "Microsoft.Design",
            "CA1031:DoNotCatchGeneralExceptionTypes",
            Justification = "Secondary connection errors can be ignored.")]
        private async Task ConnectAllSockets()
        {
            var tasks = new List<Task>(_endpoints.Count);

            while (!_disposed)
            {
                tasks.Clear();

                foreach (var endpoint in _endpoints)
                {
                    try
                    {
                        var connectTask = ConnectAsync(endpoint);

                        if (connectTask.IsCompleted)
                        {
                            // Completed synchronously - nothing to wait for.
                            continue;
                        }

                        tasks.Add(connectTask.AsTask());
                    }
                    catch (Exception e)
                    {
                        _logger.LogErrorWhileEstablishingSecondaryConnectionsWarn(e, e.Message);
                    }
                }

                if (_logger.IsEnabled(LogLevel.Debug))
                {
                    _logger.LogTryingToEstablishSecondaryConnectionsDebug(tasks.Count);
                }

                // Await every task separately instead of using WhenAll to capture exceptions and avoid extra allocations.
                int failed = 0;

                foreach (var task in tasks)
                {
                    try
                    {
                        await task.ConfigureAwait(false);
                    }
                    catch (Exception e)
                    {
                        _logger.LogErrorWhileEstablishingSecondaryConnectionsWarn(e, e.Message);
                        failed++;
                    }
                }

                if (_logger.IsEnabled(LogLevel.Debug))
                {
                    _logger.LogSecondaryConnectionsEstablishedDebug(tasks.Count - failed, failed);
                }

                if (Configuration.Configuration.ReconnectInterval <= TimeSpan.Zero)
                {
                    // Interval is zero - periodic reconnect is disabled.
                    return;
                }

                await Task.Delay(Configuration.Configuration.ReconnectInterval).ConfigureAwait(false);
            }
        }

        /// <summary>
        /// Throws if disposed.
        /// </summary>
        private void ThrowIfDisposed() => ObjectDisposedException.ThrowIf(_disposed, this);

        /// <summary>
        /// Gets the next connected socket, or connects a new one.
        /// </summary>
        /// <returns>Client socket.</returns>
        /// <exception cref="AggregateException">
        /// When no endpoint could be connected; contains the connection error of every attempted endpoint.
        /// </exception>
        [SuppressMessage("Maintainability", "CA1508:Avoid dead conditional code", Justification = "False positive")]
        private async ValueTask<ClientSocket> GetNextSocketAsync()
        {
            List<Exception>? errors = null;
            var startIdx = unchecked((int) Interlocked.Increment(ref _endPointIndex));

            for (var i = 0; i < _endpoints.Count; i++)
            {
                var idx = GetEndpointIndex(startIdx, i, _endpoints.Count);
                var endPoint = _endpoints[idx];

                if (endPoint.Socket is { IsDisposed: false })
                {
                    return endPoint.Socket;
                }

                try
                {
                    return await ConnectAsync(endPoint).ConfigureAwait(false);
                }
                catch (IgniteClientConnectionException e) when (e.GetBaseException() is SocketException or IOException)
                {
                    errors ??= new List<Exception>();

                    // Remember the endpoint, so that the retry logic can resolve the metrics context.
                    e.Data[ExceptionDataEndpoint] = endPoint;
                    errors.Add(e);
                }
            }

            // errors is null when there are no endpoints at all (e.g. host resolved to an empty address list):
            // pass an empty list, because AggregateException does not accept null inner exceptions.
            throw new AggregateException(
                "Failed to establish Ignite thin client connection, examine inner exceptions for details.",
                errors ?? new List<Exception>());
        }

        /// <summary>
        /// Gets the next connected socket, without establishing new connections.
        /// </summary>
        /// <returns>Connected socket, or <c>null</c> when there are no active connections.</returns>
        private ClientSocket? GetNextSocketWithoutReconnect()
        {
            var startIdx = unchecked((int) Interlocked.Increment(ref _endPointIndex));

            for (var i = 0; i < _endpoints.Count; i++)
            {
                var idx = GetEndpointIndex(startIdx, i, _endpoints.Count);
                var endPoint = _endpoints[idx];

                if (endPoint.Socket is { IsDisposed: false })
                {
                    return endPoint.Socket;
                }
            }

            return null;
        }

        /// <summary>
        /// Connects to the given endpoint.
        /// </summary>
        /// <param name="endpoint">Endpoint to connect to.</param>
        /// <returns>Existing socket of the endpoint when it is not disposed; otherwise, a newly connected socket.</returns>
        /// <exception cref="IgniteClientConnectionException">
        /// When the cluster ID of the new connection does not match the ID from the first handshake.
        /// </exception>
        private async ValueTask<ClientSocket> ConnectAsync(SocketEndpoint endpoint)
        {
            // Fast path without locking.
            if (endpoint.Socket?.IsDisposed == false)
            {
                return endpoint.Socket;
            }

            await _socketLock.WaitAsync().ConfigureAwait(false);

            try
            {
                // Check again under the lock: the socket could have been connected while waiting.
                if (endpoint.Socket?.IsDisposed == false)
                {
                    return endpoint.Socket;
                }

                var socket = await ClientSocket.ConnectAsync(endpoint, Configuration, this).ConfigureAwait(false);

                if (_clusterId == null)
                {
                    _clusterId = socket.ConnectionContext.ClusterId;
                }
                else if (!socket.ConnectionContext.ClusterIds.Contains(_clusterId.Value))
                {
                    socket.Dispose();

                    throw new IgniteClientConnectionException(
                        ErrorGroups.Client.ClusterIdMismatch,
                        $"Cluster ID mismatch: expected={_clusterId}, actual={socket.ConnectionContext.ClusterIds.StringJoin()}");
                }

                endpoint.Socket = socket;

                _endpointsByName[socket.ConnectionContext.ClusterNode.Name] = endpoint;

                return socket;
            }
            finally
            {
                _socketLock.Release();
            }
        }

        /// <summary>
        /// Gets the endpoints: all combinations of IP addresses and ports according to configuration.
        /// </summary>
        /// <param name="cfg">Client configuration.</param>
        /// <returns>Socket endpoints (lazily enumerated).</returns>
        private IEnumerable<SocketEndpoint> GetIpEndPoints(IgniteClientConfiguration cfg)
        {
            // Metric collection tools expect numbers and strings, don't pass Guid.
            var clientId = ClientId.ToString();

            foreach (var e in Endpoint.GetEndpoints(cfg))
            {
                var host = e.Host;
                Debug.Assert(host != null, "host != null"); // Checked by GetEndpoints.

                foreach (var ip in GetIps(host))
                {
                    yield return new SocketEndpoint(new IPEndPoint(ip, e.Port), host, clientId);
                }
            }
        }

        /// <summary>
        /// Gets IP address list from a given host.
        /// When host is an IP already - parses it. Otherwise, resolves DNS name to IPs.
        /// </summary>
        /// <param name="host">Host name or IP address string.</param>
        /// <param name="suppressExceptions">
        /// When <c>true</c>, a <see cref="SocketException"/> is logged and an empty sequence is returned instead of rethrowing.
        /// </param>
        /// <returns>IP addresses of the host.</returns>
        private IEnumerable<IPAddress> GetIps(string host, bool suppressExceptions = false)
        {
            try
            {
                // GetHostEntry accepts IPs, but TryParse is a more efficient shortcut.
                return IPAddress.TryParse(host, out var ip)
                    ? new[] { ip }
                    : Dns.GetHostEntry(host).AddressList;
            }
            catch (SocketException e)
            {
                _logger.LogFailedToParseHostDebug(e, host, e.Message);

                if (suppressExceptions)
                {
                    return Enumerable.Empty<IPAddress>();
                }

                throw;
            }
        }

        /// <summary>
        /// Gets a value indicating whether a failed operation should be retried.
        /// </summary>
        /// <param name="exception">Exception that caused the operation to fail.</param>
        /// <param name="op">Operation code.</param>
        /// <param name="attempt">Current attempt.</param>
        /// <param name="retryPolicy">Retry policy.</param>
        /// <returns>
        /// <c>true</c> if the operation should be retried on another connection, <c>false</c> otherwise.
        /// </returns>
        private bool ShouldRetry(Exception exception, ClientOp op, int attempt, IRetryPolicy? retryPolicy)
        {
            // Walk the InnerException chain looking for a connection error.
            var e = exception;

            while (e != null && !IsConnectionError(e))
            {
                e = e.InnerException;
            }

            if (e == null)
            {
                // Only retry connection errors.
                return false;
            }

            if (retryPolicy is null or RetryNonePolicy)
            {
                return false;
            }

            var publicOpType = op.ToPublicOperationType();

            if (publicOpType == null)
            {
                // System operation.
                return true;
            }

            var ctx = new RetryPolicyContext(new(Configuration.Configuration), publicOpType.Value, attempt, exception);

            return retryPolicy.ShouldRetry(ctx);

            static bool IsConnectionError(Exception e) =>
                e is SocketException
                    or IOException
                    or IgniteClientConnectionException { Code: ErrorGroups.Client.Connection };
        }

        /// <summary>
        /// Handles operation error.
        /// </summary>
        /// <param name="exception">Error.</param>
        /// <param name="op">Operation code.</param>
        /// <param name="attempt">Current attempt.</param>
        /// <param name="errors">Previous errors.</param>
        /// <param name="retryPolicy">Retry policy.</param>
        /// <param name="metricsContext">Metrics context.</param>
        /// <returns>True if the error was handled, false otherwise.</returns>
        /// <exception cref="IgniteClientConnectionException">
        /// When the operation should not be retried anymore and there were previous failed attempts;
        /// wraps all collected errors in an <see cref="AggregateException"/>.
        /// </exception>
        [SuppressMessage("Microsoft.Design", "CA1002:DoNotExposeGenericLists", Justification = "Private.")]
        private bool HandleOpError(
            Exception exception,
            ClientOp op,
            ref int attempt,
            ref List<Exception>? errors,
            IRetryPolicy? retryPolicy,
            MetricsContext? metricsContext)
        {
            if (!ShouldRetry(exception, op, attempt, retryPolicy))
            {
                if (_logger.IsEnabled(LogLevel.Debug))
                {
                    _logger.LogRetryingOperationDebug("Not retrying", (int)op, op, attempt, exception.Message);
                }

                if (errors == null)
                {
                    // First attempt failed and there are no retries: the caller rethrows the original exception.
                    return false;
                }

                errors.Add(exception);
                var inner = new AggregateException(errors);

                throw new IgniteClientConnectionException(
                    ErrorGroups.Client.Connection,
                    $"Operation {op} failed after {attempt} retries, examine InnerException for details.",
                    inner);
            }

            if (_logger.IsEnabled(LogLevel.Debug))
            {
                _logger.LogRetryingOperationDebug("Retrying", (int)op, op, attempt, exception.Message);
            }

            Metrics.RequestsRetried.Add(1, metricsContext?.Tags ?? Array.Empty<KeyValuePair<string, object?>>());
            Debug.Assert(metricsContext != null, "metricsContext != null");

            if (errors == null)
            {
                errors = new List<Exception> { exception };
            }
            else
            {
                errors.Add(exception);
            }

            attempt++;

            return true;
        }
    }
}