src/Elastic.Transport/Components/NodePool/StaticNodePool.cs (106 lines of code) (raw):

// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Elastic.Transport.Diagnostics.Auditing;
using Elastic.Transport.Extensions;

namespace Elastic.Transport;

/// <summary>
/// A node pool that disables <see cref="SupportsReseeding"/> which in turn disallows the <see cref="ITransport{TConfiguration}"/> to enable sniffing to
/// discover the current cluster's list of active nodes.
/// <para>Therefore the nodes you supply are the list of known nodes throughout its lifetime, hence static</para>
/// </summary>
public class StaticNodePool : NodePool
{
	/// <summary>
	/// Every time <see cref="CreateView"/> is enumerated it picks the initial starting point from this cursor.
	/// After which it uses a local cursor to commence the enumeration. This makes <see cref="CreateView"/> deterministic
	/// even if multiple threads enumerate over multiple lazy collections returned by <see cref="CreateView"/>.
	/// <para>The value is incremented with <see cref="Interlocked.Increment(ref int)"/> and therefore wraps around to
	/// <see cref="int.MinValue"/> on overflow; consumers must not assume it is non-negative.</para>
	/// </summary>
	protected int GlobalCursor = -1;

	/// <summary> Optional scoring function; when set, nodes are ordered by descending score instead of randomized. </summary>
	private readonly Func<Node, float> _nodeScorer;

	/// <inheritdoc cref="StaticNodePool"/>
	/// <param name="uris">The addresses of the nodes that make up this pool, each is wrapped in a <see cref="Node"/></param>
	/// <param name="randomize">Whether the supplied order should be shuffled once during construction</param>
	/// <exception cref="ArgumentNullException"><paramref name="uris"/> is <c>null</c></exception>
	public StaticNodePool(IEnumerable<Uri> uris, bool randomize = true)
		: this((uris ?? throw new ArgumentNullException(nameof(uris))).Select(uri => new Node(uri)), randomize, null) { }

	/// <inheritdoc cref="StaticNodePool"/>
	/// <param name="nodes">The nodes that make up this pool</param>
	/// <param name="randomize">Whether the supplied order should be shuffled once during construction</param>
	public StaticNodePool(IEnumerable<Node> nodes, bool randomize = true)
		: this(nodes, randomize, null) { }

	/// <inheritdoc cref="StaticNodePool"/>
	/// <param name="nodes">The nodes that make up this pool</param>
	/// <param name="randomize">Whether the supplied order should be shuffled once during construction</param>
	/// <param name="randomizeSeed">An explicit seed for <see cref="Random"/>, only honoured when <paramref name="randomize"/> is <c>true</c></param>
	protected StaticNodePool(IEnumerable<Node> nodes, bool randomize, int? randomizeSeed = null)
	{
		Randomize = randomize;
		Random = !randomize || !randomizeSeed.HasValue
			? new Random()
			: new Random(randomizeSeed.Value);

		Initialize(nodes);
	}

	//this constructor is protected because nodeScorer only makes sense on subclasses that support reseeding otherwise just manually sort `nodes` before instantiating.
	/// <inheritdoc cref="StaticNodePool"/>
	/// <param name="nodes">The nodes that make up this pool</param>
	/// <param name="nodeScorer">When provided, nodes are ordered by descending score</param>
	protected StaticNodePool(IEnumerable<Node> nodes, Func<Node, float> nodeScorer = null)
	{
		_nodeScorer = nodeScorer;
		// Randomize stays false on this path so SortNodes never consults Random,
		// it is still assigned so that subclasses reading the property never observe null.
		Random = new Random();

		Initialize(nodes);
	}

	/// <summary>
	/// Validates <paramref name="nodes"/> (not null, not empty, a single URI scheme), records whether the pool uses https
	/// and stores the sorted, de-duplicated (by <see cref="Node.Uri"/>) result in <see cref="InternalNodes"/>.
	/// </summary>
	/// <exception cref="ArgumentNullException"><paramref name="nodes"/> is <c>null</c></exception>
	/// <exception cref="ArgumentException">The nodes do not all share the same URI scheme</exception>
	private void Initialize(IEnumerable<Node> nodes)
	{
		var nodesProvided = nodes?.ToList() ?? throw new ArgumentNullException(nameof(nodes));
		nodesProvided.ThrowIfEmpty(nameof(nodes));

		string scheme = null;
		foreach (var node in nodesProvided)
		{
			if (scheme == null)
			{
				// The first node decides the scheme every other node has to agree with
				scheme = node.Uri.Scheme;
				UsingSsl = scheme == "https";
			}
			else if (scheme != node.Uri.Scheme)
				// TODO - Diagnostic event here
				throw new ArgumentException("Trying to instantiate a node pool with mixed URI Schemes");
		}

		InternalNodes = SortNodes(nodesProvided)
			.DistinctByCustom(n => n.Uri)
			.ToList();
	}

	/// <inheritdoc />
	public override DateTimeOffset? LastUpdate { get; protected set; }

	/// <inheritdoc />
	public override int MaxRetries => InternalNodes.Count - 1;

	/// <inheritdoc />
	public override IReadOnlyCollection<Node> Nodes => InternalNodes;

	/// <inheritdoc />
	public override bool SupportsPinging => true;

	/// <inheritdoc />
	public override bool SupportsReseeding => false;

	/// <inheritdoc />
	public override bool UsingSsl { get; protected set; }

	/// <summary>
	/// A window into <see cref="InternalNodes"/> that only selects the nodes considered alive at the time of calling
	/// this property. Taking into account <see cref="DateTimeProvider.Now"/> and <see cref="Node.DeadUntil"/>
	/// </summary>
	protected IReadOnlyList<Node> AliveNodes
	{
		get
		{
			var now = DateTimeProvider.Now();
			return InternalNodes
				.Where(n => n.IsAlive || n.DeadUntil <= now)
				.ToList();
		}
	}

	/// <summary>
	/// The list of nodes we are operating over. This is protected so that subclasses that DO implement <see cref="SupportsReseeding"/>
	/// can update this list. It's up to subclasses to make this thread safe.
	/// </summary>
	protected List<Node> InternalNodes { get; set; }

	/// <summary>
	/// If <see cref="Randomize"/> is set sub classes that support reseeding will have to use this instance since it might be based on an
	/// explicit seed passed into the constructor.
	/// </summary>
	// ReSharper disable once MemberCanBePrivate.Global
	protected Random Random { get; }

	/// <summary> Whether the nodes order should be randomized after sniffing </summary>
	// ReSharper disable once MemberCanBePrivate.Global
	protected bool Randomize { get; }

	/// <summary>
	/// Creates a view of all the live nodes with changing starting positions that wraps over on each call
	/// e.g Thread A might get 1,2,3,4,5 and thread B will get 2,3,4,5,1.
	/// if there are no live nodes yields a different dead node to try once
	/// </summary>
	/// <param name="auditor">Receives audit events for resurrected nodes, may be <c>null</c></param>
	public override IEnumerable<Node> CreateView(Auditor? auditor)
	{
		var nodes = AliveNodes;

		// Interlocked.Increment wraps to int.MinValue on overflow and C#'s % keeps the sign of the dividend,
		// clearing the sign bit keeps every index derived from the cursor non-negative.
		var globalCursor = Interlocked.Increment(ref GlobalCursor) & int.MaxValue;

		if (nodes.Count == 0)
		{
			//could not find a suitable node retrying on first node off globalCursor
			yield return RetryInternalNodes(globalCursor, auditor);

			yield break;
		}

		var localCursor = globalCursor % nodes.Count;
		foreach (var aliveNode in SelectAliveNodes(localCursor, nodes, auditor))
			yield return aliveNode;
	}

	/// <inheritdoc />
	public override void Reseed(IEnumerable<Node> nodes) { } //ignored

	/// <summary>
	/// If no active nodes are found this method can be used by subclasses to reactivate the next node based on
	/// <paramref name="globalCursor"/>
	/// </summary>
	/// <param name="globalCursor">The cursor used to pick the node; negative values (e.g. after the cursor overflowed) are mapped onto a valid index</param>
	/// <param name="auditor">Trace action to document the fact all nodes were dead and we are resurrecting one</param>
	/// <returns>The node that was flagged as resurrected</returns>
	protected Node RetryInternalNodes(int globalCursor, Auditor? auditor = null)
	{
		auditor?.Emit(AuditEvent.AllNodesDead);

		// Read the property once so the index and the count refer to the same list instance
		var candidates = InternalNodes;
		// Clearing the sign bit is a no-op for non-negative cursors and prevents a negative index otherwise
		var node = candidates[(globalCursor & int.MaxValue) % candidates.Count];
		node.IsResurrected = true;
		auditor?.Emit(AuditEvent.Resurrection, node);

		return node;
	}

	/// <summary>
	/// Lazy enumerate <paramref name="aliveNodes"/> based on the local <paramref name="cursor"/>. Enumeration will start from <paramref name="cursor"/>
	/// and loop around the end and stop before hitting <paramref name="cursor"/> again. This ensures all nodes are attempted.
	/// </summary>
	/// <param name="cursor">The starting point into <paramref name="aliveNodes"/> from which to start. Values outside of
	/// the bounds of <paramref name="aliveNodes"/> (including negative ones) are wrapped into range.</param>
	/// <param name="aliveNodes">The nodes to enumerate over</param>
	/// <param name="auditor">Trace action to notify if a resurrection occurred</param>
	protected static IEnumerable<Node> SelectAliveNodes(int cursor, IReadOnlyList<Node> aliveNodes, Auditor? auditor = null)
	{
		// Nothing to yield, bail out before the modulo below can divide by zero
		if (aliveNodes.Count == 0)
			yield break;

		// Wrap the starting point into [0, Count): % can yield a negative remainder for a negative cursor
		cursor %= aliveNodes.Count;
		if (cursor < 0)
			cursor += aliveNodes.Count;

		// ReSharper disable once ForCanBeConvertedToForeach
		for (var attempts = 0; attempts < aliveNodes.Count; attempts++)
		{
			var node = aliveNodes[cursor];
			cursor = (cursor + 1) % aliveNodes.Count;
			//if this node is not alive or no longer dead mark it as resurrected
			if (!node.IsAlive)
			{
				auditor?.Emit(AuditEvent.Resurrection, node);
				node.IsResurrected = true;
			}

			yield return node;
		}
	}

	/// <summary>
	/// Provides the default sort order for <see cref="CreateView"/> this takes into account whether a subclass injected a custom <see cref="Node"/> scorer
	/// and if not whether <see cref="Randomize"/> is set
	/// </summary>
	/// <param name="nodes">The nodes to order</param>
	protected IOrderedEnumerable<Node> SortNodes(IEnumerable<Node> nodes) =>
		_nodeScorer != null
			? nodes.OrderByDescending(_nodeScorer)
			: nodes.OrderBy(n => Randomize ? Random.Next() : 1);
}