modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs (242 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ namespace Apache.Ignite.Internal.Table { using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Threading; using System.Threading.Tasks; using Buffers; using Common; using Ignite.Sql; using Ignite.Table; using Ignite.Transactions; using Log; using Proto; using Proto.MsgPack; using Serialization; using Sql; /// <summary> /// Table API. /// </summary> internal sealed class Table : ITable { /** Unknown schema version. */ private const int UnknownSchemaVersion = -1; /** Socket. */ private readonly ClientFailoverSocket _socket; /** SQL. */ private readonly Sql _sql; /** Schemas. */ private readonly ConcurrentDictionary<int, Task<Schema>> _schemas = new(); /** Cached record views. */ private readonly ConcurrentDictionary<Type, object> _recordViews = new(); /** */ private readonly object _latestSchemaLock = new(); /** */ private readonly IIgniteLogger? _logger; /** */ private readonly SemaphoreSlim _partitionAssignmentSemaphore = new(1); /** */ private volatile int _latestSchemaVersion = UnknownSchemaVersion; /** */ private volatile int _partitionAssignmentVersion = -1; /** */ private volatile string[]? _partitionAssignment; /// <summary> /// Initializes a new instance of the <see cref="Table"/> class. /// </summary> /// <param name="name">Table name.</param> /// <param name="id">Table id.</param> /// <param name="socket">Socket.</param> /// <param name="sql">SQL.</param> public Table(string name, int id, ClientFailoverSocket socket, Sql sql) { _socket = socket; _sql = sql; Name = name; Id = id; _logger = socket.Configuration.Logger.GetLogger(GetType()); RecordBinaryView = new RecordView<IIgniteTuple>( this, new RecordSerializer<IIgniteTuple>(this, TupleSerializerHandler.Instance), _sql); // RecordView and KeyValueView are symmetric and perform the same operations on the protocol level. // Only serialization is different - KeyValueView splits records into two parts. // Therefore, KeyValueView below simply delegates to RecordView<KvPair>, // and SerializerHandler writes KV pair as a single record and reads back record as two parts. var pairSerializer = new RecordSerializer<KvPair<IIgniteTuple, IIgniteTuple>>(this, TuplePairSerializerHandler.Instance); KeyValueBinaryView = new KeyValueView<IIgniteTuple, IIgniteTuple>( new RecordView<KvPair<IIgniteTuple, IIgniteTuple>>(this, pairSerializer, _sql)); } /// <inheritdoc/> public string Name { get; } /// <inheritdoc/> public IRecordView<IIgniteTuple> RecordBinaryView { get; } /// <inheritdoc/> public IKeyValueView<IIgniteTuple, IIgniteTuple> KeyValueBinaryView { get; } /// <summary> /// Gets the associated socket. /// </summary> internal ClientFailoverSocket Socket => _socket; /// <summary> /// Gets the table id. /// </summary> internal int Id { get; } /// <inheritdoc/> public IRecordView<T> GetRecordView<T>() where T : notnull => GetRecordViewInternal<T>(); /// <inheritdoc/> public IKeyValueView<TK, TV> GetKeyValueView<TK, TV>() where TK : notnull where TV : notnull => new KeyValueView<TK, TV>(GetRecordViewInternal<KvPair<TK, TV>>()); /// <inheritdoc/> public override string ToString() => new IgniteToStringBuilder(GetType()) .Append(Name) .Append(Id) .Build(); /// <summary> /// Gets the record view for the specified type. /// </summary> /// <typeparam name="T">Record type.</typeparam> /// <returns>Record view.</returns> internal RecordView<T> GetRecordViewInternal<T>() where T : notnull { // ReSharper disable once HeapView.CanAvoidClosure (generics prevent this) return (RecordView<T>)_recordViews.GetOrAdd( typeof(T), _ => new RecordView<T>(this, new RecordSerializer<T>(this, new ObjectSerializerHandler<T>()), _sql)); } /// <summary> /// Reads the schema. /// </summary> /// <param name="buf">Buffer.</param> /// <returns>Schema or null.</returns> internal Task<Schema> ReadSchemaAsync(PooledBuffer buf) { var version = buf.GetReader().ReadInt32(); return GetCachedSchemaAsync(version); } /// <summary> /// Gets the latest schema. /// </summary> /// <returns>Schema.</returns> internal Task<Schema> GetLatestSchemaAsync() { // _latestSchemaVersion can be -1 (unknown) or a valid version. // In case of unknown version, we request latest from the server and cache it with -1 key // to avoid duplicate requests for latest schema. return GetCachedSchemaAsync(_latestSchemaVersion); } /// <summary> /// Gets the preferred node by colocation hash. /// </summary> /// <param name="colocationHash">Colocation hash.</param> /// <param name="transaction">Transaction.</param> /// <returns>Preferred node.</returns> internal async ValueTask<PreferredNode> GetPreferredNode(int colocationHash, ITransaction? transaction) { if (transaction != null) { return default; } var assignment = await GetPartitionAssignmentAsync().ConfigureAwait(false); if (assignment == null || assignment.Length == 0) { // Happens on table drop. return default; } var partition = Math.Abs(colocationHash % assignment.Length); var nodeConsistentId = assignment[partition]; return PreferredNode.FromName(nodeConsistentId); } /// <summary> /// Gets the partition assignment. /// </summary> /// <returns>Partition assignment.</returns> internal async ValueTask<string[]?> GetPartitionAssignmentAsync() { var socketVer = _socket.PartitionAssignmentVersion; var assignment = _partitionAssignment; // Async double-checked locking. Assignment changes rarely, so we avoid the lock if possible. if (_partitionAssignmentVersion == socketVer && assignment != null) { return assignment; } await _partitionAssignmentSemaphore.WaitAsync().ConfigureAwait(false); try { socketVer = _socket.PartitionAssignmentVersion; assignment = _partitionAssignment; if (_partitionAssignmentVersion == socketVer && assignment != null) { return assignment; } assignment = await LoadPartitionAssignmentAsync().ConfigureAwait(false); _partitionAssignment = assignment; _partitionAssignmentVersion = socketVer; return assignment; } finally { _partitionAssignmentSemaphore.Release(); } } private Task<Schema> GetCachedSchemaAsync(int version) { var task = GetOrAdd(); if (!task.IsFaulted) { return task; } // Do not return failed task. Remove it from the cache and try again. _schemas.TryRemove(new KeyValuePair<int, Task<Schema>>(version, task)); return GetOrAdd(); Task<Schema> GetOrAdd() => _schemas.GetOrAdd(version, static (ver, tbl) => tbl.LoadSchemaAsync(ver), this); } /// <summary> /// Loads the schema. /// </summary> /// <param name="version">Version.</param> /// <returns>Schema.</returns> private async Task<Schema> LoadSchemaAsync(int version) { using var writer = ProtoCommon.GetMessageWriter(); Write(); using var resBuf = await _socket.DoOutInOpAsync(ClientOp.SchemasGet, writer).ConfigureAwait(false); return Read(); void Write() { var w = writer.MessageWriter; w.Write(Id); if (version == UnknownSchemaVersion) { w.WriteNil(); } else { w.WriteArrayHeader(1); w.Write(version); } } Schema Read() { var r = resBuf.GetReader(); var schemaCount = r.ReadMapHeader(); if (schemaCount == 0) { throw new IgniteClientException(ErrorGroups.Client.Protocol, "Schema not found: " + version); } Schema last = null!; for (var i = 0; i < schemaCount; i++) { last = ReadSchema(ref r); } // Store all schemas in the map, and return last. return last; } } /// <summary> /// Reads the schema. /// </summary> /// <param name="r">Reader.</param> /// <returns>Schema.</returns> private Schema ReadSchema(ref MsgPackReader r) { var schemaVersion = r.ReadInt32(); var columnCount = r.ReadArrayHeader(); var keyColumnCount = 0; var colocationColumnCount = 0; var columns = new Column[columnCount]; for (var i = 0; i < columnCount; i++) { var propertyCount = r.ReadArrayHeader(); const int expectedCount = 7; Debug.Assert(propertyCount >= expectedCount, "propertyCount >= " + expectedCount); var name = r.ReadString(); var type = r.ReadInt32(); var isKey = r.ReadBoolean(); var isNullable = r.ReadBoolean(); var colocationIndex = r.ReadInt32(); var scale = r.ReadInt32(); var precision = r.ReadInt32(); r.Skip(propertyCount - expectedCount); var column = new Column(name, (ColumnType)type, isNullable, isKey, colocationIndex, i, scale, precision); columns[i] = column; if (isKey) { keyColumnCount++; } if (colocationIndex >= 0) { colocationColumnCount++; } } var schema = new Schema( Version: schemaVersion, TableId: Id, KeyColumnCount: keyColumnCount, ColocationColumnCount: colocationColumnCount, Columns: columns); _schemas[schemaVersion] = Task.FromResult(schema); if (_logger?.IsEnabled(LogLevel.Debug) == true) { _logger.Debug($"Schema loaded [tableId={Id}, schemaVersion={schema.Version}]"); } lock (_latestSchemaLock) { if (schemaVersion > _latestSchemaVersion) { _latestSchemaVersion = schemaVersion; } } return schema; } /// <summary> /// Loads the partition assignment. /// </summary> /// <returns>Partition assignment.</returns> private async Task<string[]?> LoadPartitionAssignmentAsync() { using var writer = ProtoCommon.GetMessageWriter(); writer.MessageWriter.Write(Id); using var resBuf = await _socket.DoOutInOpAsync(ClientOp.PartitionAssignmentGet, writer).ConfigureAwait(false); return Read(); string[]? Read() { var r = resBuf.GetReader(); var count = r.ReadArrayHeader(); if (count == 0) { return null; } var res = new string[count]; for (int i = 0; i < count; i++) { res[i] = r.ReadString(); } return res; } } } }