modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs (222 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ namespace Apache.Ignite.Internal.Compute { using System; using System.Buffers.Binary; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; using System.Linq; using System.Threading.Tasks; using Buffers; using Common; using Ignite.Compute; using Ignite.Network; using Ignite.Table; using Proto; using Proto.MsgPack; using Table; using Table.Serialization; /// <summary> /// Compute API. /// </summary> internal sealed class Compute : ICompute { /** Socket. */ private readonly ClientFailoverSocket _socket; /** Tables. */ private readonly Tables _tables; /** Cached tables. */ private readonly ConcurrentDictionary<string, Table> _tableCache = new(); /// <summary> /// Initializes a new instance of the <see cref="Compute"/> class. /// </summary> /// <param name="socket">Socket.</param> /// <param name="tables">Tables.</param> public Compute(ClientFailoverSocket socket, Tables tables) { _socket = socket; _tables = tables; } /// <inheritdoc/> public async Task<T> ExecuteAsync<T>( IEnumerable<IClusterNode> nodes, IEnumerable<DeploymentUnit> units, string jobClassName, params object?[]? args) { IgniteArgumentCheck.NotNull(nodes, nameof(nodes)); IgniteArgumentCheck.NotNull(jobClassName, nameof(jobClassName)); return await ExecuteOnOneNode<T>(GetRandomNode(nodes), units, jobClassName, args).ConfigureAwait(false); } /// <inheritdoc/> public async Task<T> ExecuteColocatedAsync<T>( string tableName, IIgniteTuple key, IEnumerable<DeploymentUnit> units, string jobClassName, params object?[]? args) => await ExecuteColocatedAsync<T, IIgniteTuple>( tableName, key, serializerHandlerFunc: static _ => TupleSerializerHandler.Instance, units, jobClassName, args) .ConfigureAwait(false); /// <inheritdoc/> public async Task<T> ExecuteColocatedAsync<T, TKey>( string tableName, TKey key, IEnumerable<DeploymentUnit> units, string jobClassName, params object?[]? args) where TKey : notnull => await ExecuteColocatedAsync<T, TKey>( tableName, key, serializerHandlerFunc: table => table.GetRecordViewInternal<TKey>().RecordSerializer.Handler, units, jobClassName, args) .ConfigureAwait(false); /// <inheritdoc/> public IDictionary<IClusterNode, Task<T>> BroadcastAsync<T>( IEnumerable<IClusterNode> nodes, IEnumerable<DeploymentUnit> units, string jobClassName, params object?[]? args) { IgniteArgumentCheck.NotNull(nodes, nameof(nodes)); IgniteArgumentCheck.NotNull(jobClassName, nameof(jobClassName)); IgniteArgumentCheck.NotNull(units, nameof(units)); var res = new Dictionary<IClusterNode, Task<T>>(); var units0 = units as ICollection<DeploymentUnit> ?? units.ToList(); // Avoid multiple enumeration. foreach (var node in nodes) { var task = ExecuteOnOneNode<T>(node, units0, jobClassName, args); res[node] = task; } return res; } /// <inheritdoc/> public override string ToString() => IgniteToStringBuilder.Build(GetType()); [SuppressMessage("Security", "CA5394:Do not use insecure randomness", Justification = "Secure random is not required here.")] private static IClusterNode GetRandomNode(IEnumerable<IClusterNode> nodes) { var nodesCol = GetNodesCollection(nodes); IgniteArgumentCheck.Ensure(nodesCol.Count > 0, nameof(nodes), "Nodes can't be empty."); var idx = Random.Shared.Next(0, nodesCol.Count); return nodesCol.ElementAt(idx); } private static ICollection<IClusterNode> GetNodesCollection(IEnumerable<IClusterNode> nodes) => nodes as ICollection<IClusterNode> ?? nodes.ToList(); private static void WriteUnits(IEnumerable<DeploymentUnit> units, PooledArrayBuffer buf) { var w = buf.MessageWriter; if (units.TryGetNonEnumeratedCount(out var count)) { w.WriteArrayHeader(count); foreach (var unit in units) { if (string.IsNullOrEmpty(unit.Name)) { throw new ArgumentException("Deployment unit name can't be null or empty."); } if (string.IsNullOrEmpty(unit.Version)) { throw new ArgumentException("Deployment unit version can't be null or empty."); } w.Write(unit.Name); w.Write(unit.Version); } return; } // Enumerable without known count - enumerate first, write count later. count = 0; var countSpan = buf.GetSpan(5); buf.Advance(5); foreach (var unit in units) { count++; w.Write(unit.Name); w.Write(unit.Version); } countSpan[0] = MsgPackCode.Array32; BinaryPrimitives.WriteInt32BigEndian(countSpan[1..], count); } private async Task<T> ExecuteOnOneNode<T>( IClusterNode node, IEnumerable<DeploymentUnit> units, string jobClassName, object?[]? args) { IgniteArgumentCheck.NotNull(node, nameof(node)); using var writer = ProtoCommon.GetMessageWriter(); Write(); using var res = await _socket.DoOutInOpAsync(ClientOp.ComputeExecute, writer, PreferredNode.FromName(node.Name)) .ConfigureAwait(false); return Read(res); void Write() { var w = writer.MessageWriter; w.Write(node.Name); WriteUnits(units, writer); w.Write(jobClassName); w.WriteObjectCollectionAsBinaryTuple(args); } static T Read(in PooledBuffer buf) { var reader = buf.GetReader(); return (T)reader.ReadObjectFromBinaryTuple()!; } } private async Task<Table> GetTableAsync(string tableName) { if (_tableCache.TryGetValue(tableName, out var cachedTable)) { return cachedTable; } var table = await _tables.GetTableInternalAsync(tableName).ConfigureAwait(false); if (table != null) { _tableCache[tableName] = table; return table; } _tableCache.TryRemove(tableName, out _); throw new IgniteClientException(ErrorGroups.Client.TableIdNotFound, $"Table '{tableName}' does not exist."); } private async Task<T> ExecuteColocatedAsync<T, TKey>( string tableName, TKey key, Func<Table, IRecordSerializerHandler<TKey>> serializerHandlerFunc, IEnumerable<DeploymentUnit> units, string jobClassName, params object?[]? args) where TKey : notnull { IgniteArgumentCheck.NotNull(tableName, nameof(tableName)); IgniteArgumentCheck.NotNull(key, nameof(key)); IgniteArgumentCheck.NotNull(jobClassName, nameof(jobClassName)); var units0 = units as ICollection<DeploymentUnit> ?? units.ToList(); // Avoid multiple enumeration. while (true) { var table = await GetTableAsync(tableName).ConfigureAwait(false); var schema = await table.GetLatestSchemaAsync().ConfigureAwait(false); using var bufferWriter = ProtoCommon.GetMessageWriter(); var colocationHash = Write(bufferWriter, table, schema); var preferredNode = await table.GetPreferredNode(colocationHash, null).ConfigureAwait(false); try { using var res = await _socket.DoOutInOpAsync(ClientOp.ComputeExecuteColocated, bufferWriter, preferredNode) .ConfigureAwait(false); return Read(res); } catch (IgniteException e) when (e.Code == ErrorGroups.Client.TableIdNotFound) { // Table was dropped - remove from cache. // Try again in case a new table with the same name exists. _tableCache.TryRemove(tableName, out _); } } int Write(PooledArrayBuffer bufferWriter, Table table, Schema schema) { var w = bufferWriter.MessageWriter; w.Write(table.Id); w.Write(schema.Version); var serializerHandler = serializerHandlerFunc(table); var colocationHash = serializerHandler.Write(ref w, schema, key, keyOnly: true, computeHash: true); WriteUnits(units0, bufferWriter); w.Write(jobClassName); w.WriteObjectCollectionAsBinaryTuple(args); return colocationHash; } static T Read(in PooledBuffer buf) { var reader = buf.GetReader(); _ = reader.ReadInt32(); return (T)reader.ReadObjectFromBinaryTuple()!; } } } }