modules/platforms/dotnet/Apache.Ignite/Internal/ClientFailoverSocket.cs (377 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Apache.Ignite.Internal
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Buffers;
using Ignite.Network;
using Log;
using Network;
using Proto;
using Transactions;
/// <summary>
/// Client socket wrapper with reconnect/failover functionality.
/// </summary>
internal sealed class ClientFailoverSocket : IDisposable
{
/** Current global endpoint index for Round-robin. */
private static long _globalEndPointIndex;
/** Logger. */
private readonly IIgniteLogger? _logger;
/** Endpoints with corresponding hosts - from configuration. */
private readonly IReadOnlyList<SocketEndpoint> _endpoints;
/** Cluster node unique name to endpoint map. */
private readonly ConcurrentDictionary<string, SocketEndpoint> _endpointsByName = new();
/** Socket connection lock. */
[SuppressMessage(
"Microsoft.Design",
"CA2213:DisposableFieldsShouldBeDisposed",
Justification = "WaitHandle is not used in SemaphoreSlim, no need to dispose.")]
private readonly SemaphoreSlim _socketLock = new(1);
/** Last connected socket. Used to track partition assignment updates. */
private volatile ClientSocket? _lastConnectedSocket;
/** Disposed flag. */
private volatile bool _disposed;
/** Local topology assignment version. Instead of using event handlers to notify all tables about assignment change,
* the table will compare its version with channel version to detect an update. */
private int _assignmentVersion;
/** Cluster id from the first handshake. */
private Guid? _clusterId;
/** Local index for round-robin balancing within this FailoverSocket. */
private long _endPointIndex = Interlocked.Increment(ref _globalEndPointIndex);
/// <summary>
/// Initializes a new instance of the <see cref="ClientFailoverSocket"/> class.
/// </summary>
/// <param name="configuration">Client configuration.</param>
private ClientFailoverSocket(IgniteClientConfiguration configuration)
{
if (configuration.Endpoints.Count == 0)
{
throw new IgniteClientException(
ErrorGroups.Client.Configuration,
$"Invalid {nameof(IgniteClientConfiguration)}: {nameof(IgniteClientConfiguration.Endpoints)} is empty. Nowhere to connect.");
}
_logger = configuration.Logger.GetLogger(GetType());
_endpoints = GetIpEndPoints(configuration).ToList();
Configuration = new(configuration); // Defensive copy.
}
/// <summary>
/// Gets the configuration.
/// </summary>
public IgniteClientConfiguration Configuration { get; }
/// <summary>
/// Gets the partition assignment version.
/// </summary>
public int PartitionAssignmentVersion => Interlocked.CompareExchange(ref _assignmentVersion, -1, -1);
/// <summary>
/// Connects the socket.
/// </summary>
/// <param name="configuration">Client configuration.</param>
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
public static async Task<ClientFailoverSocket> ConnectAsync(IgniteClientConfiguration configuration)
{
var socket = new ClientFailoverSocket(configuration);
var logger = configuration.Logger.GetLogger(typeof(ClientFailoverSocket));
logger?.Info("Ignite.NET client version " + VersionUtils.GetInformationalVersion() + " is starting");
await socket.GetNextSocketAsync().ConfigureAwait(false);
// Because this call is not awaited, execution of the current method continues before the call is completed.
// Secondary connections are established in the background.
_ = socket.ConnectAllSockets();
return socket;
}
/// <summary>
/// Resets global endpoint index. For testing purposes only (to make behavior deterministic).
/// </summary>
public static void ResetGlobalEndpointIndex() => _globalEndPointIndex = 0;
/// <summary>
/// Performs an in-out operation.
/// </summary>
/// <param name="clientOp">Client op code.</param>
/// <param name="request">Request data.</param>
/// <param name="preferredNode">Preferred node.</param>
/// <returns>Response data and socket.</returns>
public async Task<PooledBuffer> DoOutInOpAsync(
ClientOp clientOp,
PooledArrayBuffer? request = null,
PreferredNode preferredNode = default)
{
var (buffer, _) = await DoOutInOpAndGetSocketAsync(clientOp, tx: null, request, preferredNode).ConfigureAwait(false);
return buffer;
}
/// <summary>
/// Performs an in-out operation.
/// </summary>
/// <param name="clientOp">Client op code.</param>
/// <param name="tx">Transaction.</param>
/// <param name="request">Request data.</param>
/// <param name="preferredNode">Preferred node.</param>
/// <param name="retryPolicyOverride">Retry policy.</param>
/// <returns>Response data and socket.</returns>
public async Task<(PooledBuffer Buffer, ClientSocket Socket)> DoOutInOpAndGetSocketAsync(
ClientOp clientOp,
Transaction? tx = null,
PooledArrayBuffer? request = null,
PreferredNode preferredNode = default,
IRetryPolicy? retryPolicyOverride = null)
{
if (tx != null)
{
if (tx.FailoverSocket != this)
{
throw new IgniteClientException(ErrorGroups.Client.Connection, "Specified transaction belongs to a different IgniteClient instance.");
}
// Use tx-specific socket without retry and failover.
var buffer = await tx.Socket.DoOutInOpAsync(clientOp, request).ConfigureAwait(false);
return (buffer, tx.Socket);
}
var attempt = 0;
List<Exception>? errors = null;
while (true)
{
try
{
var socket = await GetSocketAsync(preferredNode).ConfigureAwait(false);
var buffer = await socket.DoOutInOpAsync(clientOp, request).ConfigureAwait(false);
return (buffer, socket);
}
catch (Exception e)
{
// Preferred node connection may not be available, do not use it after first failure.
preferredNode = default;
if (!HandleOpError(e, clientOp, ref attempt, ref errors, retryPolicyOverride ?? Configuration.RetryPolicy))
{
throw;
}
}
}
}
/// <inheritdoc/>
public void Dispose()
{
_socketLock.Wait();
try
{
if (_disposed)
{
return;
}
_disposed = true;
foreach (var endpoint in _endpoints)
{
endpoint.Socket?.Dispose();
}
}
finally
{
_socketLock.Release();
}
}
/// <summary>
/// Gets active connections.
/// </summary>
/// <returns>Active connections.</returns>
public IList<IConnectionInfo> GetConnections()
{
var res = new List<IConnectionInfo>(_endpoints.Count);
foreach (var endpoint in _endpoints)
{
if (endpoint.Socket is { IsDisposed: false, ConnectionContext: { } ctx })
{
res.Add(new ConnectionInfo(ctx.ClusterNode, ctx.SslInfo));
}
}
return res;
}
/// <summary>
/// Gets a socket. Reconnects if necessary.
/// </summary>
/// <param name="preferredNode">Preferred node.</param>
/// <returns>Client socket.</returns>
[SuppressMessage(
"Microsoft.Design",
"CA1031:DoNotCatchGeneralExceptionTypes",
Justification = "Any connection exception should be handled.")]
private async ValueTask<ClientSocket> GetSocketAsync(PreferredNode preferredNode = default)
{
ThrowIfDisposed();
// 1. Preferred node connection.
if (preferredNode != default && _endpointsByName.TryGetValue(preferredNode.Name, out var endpoint))
{
try
{
return await ConnectAsync(endpoint).ConfigureAwait(false);
}
catch (Exception e)
{
_logger?.Warn(e, $"Failed to connect to preferred node [{preferredNode}]: {e.Message}");
}
}
// 2. Round-robin connection.
if (GetNextSocketWithoutReconnect() is { } nextSocket)
{
return nextSocket;
}
// 3. Default connection.
return await GetNextSocketAsync().ConfigureAwait(false);
}
[SuppressMessage(
"Microsoft.Design",
"CA1031:DoNotCatchGeneralExceptionTypes",
Justification = "Secondary connection errors can be ignored.")]
private async Task ConnectAllSockets()
{
var tasks = new List<Task>(_endpoints.Count);
while (!_disposed)
{
try
{
tasks.Clear();
foreach (var endpoint in _endpoints)
{
if (endpoint.Socket?.IsDisposed == false)
{
continue;
}
tasks.Add(ConnectAsync(endpoint).AsTask());
}
_logger?.Debug("Trying to establish secondary connections - awaiting {0} tasks...", tasks.Count);
await Task.WhenAll(tasks).ConfigureAwait(false);
_logger?.Debug("All secondary connections established.");
}
catch (Exception e)
{
_logger?.Warn(e, "Error while trying to establish secondary connections: " + e.Message);
}
if (Configuration.ReconnectInterval <= TimeSpan.Zero)
{
// Interval is zero - periodic reconnect is disabled.
return;
}
await Task.Delay(Configuration.ReconnectInterval).ConfigureAwait(false);
}
}
/// <summary>
/// Throws if disposed.
/// </summary>
private void ThrowIfDisposed()
{
if (_disposed)
{
throw new ObjectDisposedException(nameof(ClientFailoverSocket));
}
}
/// <summary>
/// Gets the next connected socket, or connects a new one.
/// </summary>
[SuppressMessage("Maintainability", "CA1508:Avoid dead conditional code", Justification = "False positive")]
private async ValueTask<ClientSocket> GetNextSocketAsync()
{
List<Exception>? errors = null;
var startIdx = unchecked((int) Interlocked.Increment(ref _endPointIndex));
for (var i = 0; i < _endpoints.Count; i++)
{
var idx = Math.Abs(startIdx + i) % _endpoints.Count;
var endPoint = _endpoints[idx];
if (endPoint.Socket is { IsDisposed: false })
{
return endPoint.Socket;
}
try
{
return await ConnectAsync(endPoint).ConfigureAwait(false);
}
catch (IgniteClientConnectionException e) when (e.GetBaseException() is SocketException or IOException)
{
errors ??= new List<Exception>();
errors.Add(e);
}
}
throw new AggregateException(
"Failed to establish Ignite thin client connection, examine inner exceptions for details.", errors!);
}
/// <summary>
/// Gets the next connected socket, without establishing new connections.
/// </summary>
private ClientSocket? GetNextSocketWithoutReconnect()
{
var startIdx = unchecked((int) Interlocked.Increment(ref _endPointIndex));
for (var i = 0; i < _endpoints.Count; i++)
{
var idx = Math.Abs(startIdx + i) % _endpoints.Count;
var endPoint = _endpoints[idx];
if (endPoint.Socket is { IsDisposed: false })
{
return endPoint.Socket;
}
}
return null;
}
/// <summary>
/// Connects to the given endpoint.
/// </summary>
private async ValueTask<ClientSocket> ConnectAsync(SocketEndpoint endpoint)
{
if (endpoint.Socket?.IsDisposed == false)
{
return endpoint.Socket;
}
await _socketLock.WaitAsync().ConfigureAwait(false);
if (endpoint.Socket?.IsDisposed == false)
{
return endpoint.Socket;
}
try
{
var socket = await ClientSocket.ConnectAsync(endpoint, Configuration, OnAssignmentChanged).ConfigureAwait(false);
if (_clusterId == null)
{
_clusterId = socket.ConnectionContext.ClusterId;
}
else if (_clusterId != socket.ConnectionContext.ClusterId)
{
socket.Dispose();
throw new IgniteClientConnectionException(
ErrorGroups.Client.ClusterIdMismatch,
$"Cluster ID mismatch: expected={_clusterId}, actual={socket.ConnectionContext.ClusterId}");
}
endpoint.Socket = socket;
_endpointsByName[socket.ConnectionContext.ClusterNode.Name] = endpoint;
_lastConnectedSocket = socket;
return socket;
}
finally
{
_socketLock.Release();
}
}
/// <summary>
/// Called when an assignment update is detected.
/// </summary>
/// <param name="clientSocket">Socket.</param>
private void OnAssignmentChanged(ClientSocket clientSocket)
{
// NOTE: Multiple channels will send the same update to us, resulting in multiple cache invalidations.
// This could be solved with a cluster-wide AssignmentVersion, but we don't have that.
// So we only react to updates from the last known good channel. When no user-initiated operations are performed on that
// channel, heartbeat messages will trigger updates.
if (clientSocket == _lastConnectedSocket)
{
Interlocked.Increment(ref _assignmentVersion);
}
}
/// <summary>
/// Gets the endpoints: all combinations of IP addresses and ports according to configuration.
/// </summary>
private IEnumerable<SocketEndpoint> GetIpEndPoints(IgniteClientConfiguration cfg)
{
foreach (var e in Endpoint.GetEndpoints(cfg))
{
var host = e.Host;
Debug.Assert(host != null, "host != null"); // Checked by GetEndpoints.
foreach (var ip in GetIps(host))
{
yield return new SocketEndpoint(new IPEndPoint(ip, e.Port), host);
}
}
}
/// <summary>
/// Gets IP address list from a given host.
/// When host is an IP already - parses it. Otherwise, resolves DNS name to IPs.
/// </summary>
private IEnumerable<IPAddress> GetIps(string host, bool suppressExceptions = false)
{
try
{
// GetHostEntry accepts IPs, but TryParse is a more efficient shortcut.
return IPAddress.TryParse(host, out var ip) ? new[] { ip } : Dns.GetHostEntry(host).AddressList;
}
catch (SocketException e)
{
_logger?.Debug(e, "Failed to parse host: " + host);
if (suppressExceptions)
{
return Enumerable.Empty<IPAddress>();
}
throw;
}
}
/// <summary>
/// Gets a value indicating whether a failed operation should be retried.
/// </summary>
/// <param name="exception">Exception that caused the operation to fail.</param>
/// <param name="op">Operation code.</param>
/// <param name="attempt">Current attempt.</param>
/// <param name="retryPolicy">Retry policy.</param>
/// <returns>
/// <c>true</c> if the operation should be retried on another connection, <c>false</c> otherwise.
/// </returns>
private bool ShouldRetry(Exception exception, ClientOp op, int attempt, IRetryPolicy? retryPolicy)
{
var e = exception;
while (e != null && !(e is SocketException))
{
e = e.InnerException;
}
if (e == null)
{
// Only retry socket exceptions.
return false;
}
if (retryPolicy is null or RetryNonePolicy)
{
return false;
}
var publicOpType = op.ToPublicOperationType();
if (publicOpType == null)
{
// System operation.
return true;
}
var ctx = new RetryPolicyContext(new(Configuration), publicOpType.Value, attempt, exception);
return retryPolicy.ShouldRetry(ctx);
}
/// <summary>
/// Handles operation error.
/// </summary>
/// <param name="exception">Error.</param>
/// <param name="op">Operation code.</param>
/// <param name="attempt">Current attempt.</param>
/// <param name="errors">Previous errors.</param>
/// <param name="retryPolicy">Retry policy.</param>
/// <returns>True if the error was handled, false otherwise.</returns>
[SuppressMessage("Microsoft.Design", "CA1002:DoNotExposeGenericLists", Justification = "Private.")]
private bool HandleOpError(
Exception exception,
ClientOp op,
ref int attempt,
ref List<Exception>? errors,
IRetryPolicy? retryPolicy)
{
if (!ShouldRetry(exception, op, attempt, retryPolicy))
{
if (_logger?.IsEnabled(LogLevel.Debug) == true)
{
_logger.Debug($"Not retrying operation [opCode={(int)op}, opType={op}, attempt={attempt}, lastError={exception}]");
}
if (errors == null)
{
return false;
}
errors.Add(exception);
var inner = new AggregateException(errors);
throw new IgniteClientConnectionException(
ErrorGroups.Client.Connection,
$"Operation {op} failed after {attempt} retries, examine InnerException for details.",
inner);
}
if (_logger?.IsEnabled(LogLevel.Debug) == true)
{
_logger.Debug($"Retrying operation [opCode={(int)op}, opType={op}, attempt={attempt}, lastError={exception}]");
}
Metrics.RequestsRetried.Add(1);
if (errors == null)
{
errors = new List<Exception> { exception };
}
else
{
errors.Add(exception);
}
attempt++;
return true;
}
}
}