modules/platforms/dotnet/Apache.Ignite/Internal/Proto/BinaryTuple/BinaryTupleBuilder.cs (1,023 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Apache.Ignite.Internal.Proto.BinaryTuple
{
using System;
using System.Buffers;
using System.Buffers.Binary;
using System.Diagnostics;
using System.Numerics;
using System.Runtime.InteropServices;
using Buffers;
using Ignite.Sql;
using NodaTime;
using Table;
/// <summary>
/// Binary tuple builder.
/// </summary>
internal ref struct BinaryTupleBuilder
{
/** Number of elements in the tuple. */
private readonly int _numElements;
/** Size of an offset table entry. */
private readonly int _entrySize;
/** Position of the varlen offset table. */
private readonly int _entryBase;
/** Starting position of variable-length values. */
private readonly int _valueBase;
/** Colocation column index provider. When not null, used to compute colocation hash on the fly. */
private readonly IHashedColumnIndexProvider? _hashedColumnsPredicate;
/** Buffer for tuple content. */
private readonly PooledArrayBuffer _buffer;
/** Current element. */
private int _elementIndex;
/// <summary>
/// Initializes a new instance of the <see cref="BinaryTupleBuilder"/> struct.
/// </summary>
/// <param name="numElements">Capacity.</param>
/// <param name="totalValueSize">Total value size, -1 when unknown.</param>
/// <param name="hashedColumnsPredicate">A predicate that returns true for colocation column indexes.
/// Pass null when colocation hash is not needed.</param>
public BinaryTupleBuilder(
int numElements,
int totalValueSize = -1,
IHashedColumnIndexProvider? hashedColumnsPredicate = null)
{
Debug.Assert(numElements >= 0, "numElements >= 0");
_numElements = numElements;
_hashedColumnsPredicate = hashedColumnsPredicate;
_buffer = new();
_elementIndex = 0;
// Reserve buffer for individual hash codes.
_entryBase = _hashedColumnsPredicate != null
? BinaryTupleCommon.HeaderSize + _hashedColumnsPredicate.HashedColumnCount * 4
: BinaryTupleCommon.HeaderSize;
_entrySize = totalValueSize < 0
? 4
: BinaryTupleCommon.FlagsToEntrySize(BinaryTupleCommon.ValueSizeToFlags(totalValueSize));
_valueBase = _entryBase + _entrySize * numElements;
_buffer.GetSpan(sizeHint: _valueBase)[.._valueBase].Clear();
_buffer.Advance(_valueBase);
}
/// <summary>
/// Gets the current element index.
/// </summary>
public int ElementIndex => _elementIndex;
/// <summary>
/// Gets the hash from column values according to specified <see cref="IHashedColumnIndexProvider"/>.
/// </summary>
/// <returns>Column hash according to specified <see cref="IHashedColumnIndexProvider"/>.</returns>
public int GetHash()
{
if (_hashedColumnsPredicate == null)
{
return 0;
}
var hash = 0;
var hashes = GetHashSpan();
for (var i = 0; i < _hashedColumnsPredicate.HashedColumnCount; i++)
{
var colHash = hashes[i];
hash = HashUtils.Combine(hash, colHash);
}
return hash;
}
/// <summary>
/// Appends a null value.
/// </summary>
public void AppendNull()
{
if (GetHashOrder() is { } hashOrder)
{
PutHash(hashOrder, HashUtils.Hash32((sbyte)0));
}
OnWrite();
}
/// <summary>
/// Appends a byte.
/// </summary>
/// <param name="value">Value.</param>
public void AppendByte(sbyte value)
{
if (GetHashOrder() is { } hashOrder)
{
PutHash(hashOrder, HashUtils.Hash32(value));
}
PutByte(value);
OnWrite();
}
/// <summary>
/// Appends a byte.
/// </summary>
/// <param name="value">Value.</param>
public void AppendByteNullable(sbyte? value)
{
if (value == null)
{
AppendNull();
}
else
{
AppendByte(value.Value);
}
}
/// <summary>
/// Appends a bool.
/// </summary>
/// <param name="value">Value.</param>
public void AppendBool(bool value)
{
var v = BinaryTupleCommon.BoolToByte(value);
if (GetHashOrder() is { } hashOrder)
{
PutHash(hashOrder, HashUtils.Hash32(v));
}
PutByte(v);
OnWrite();
}
/// <summary>
/// Appends a bool.
/// </summary>
/// <param name="value">Value.</param>
public void AppendBoolNullable(bool? value)
{
if (value == null)
{
AppendNull();
}
else
{
AppendBool(value.Value);
}
}
/// <summary>
/// Appends a short.
/// </summary>
/// <param name="value">Value.</param>
public void AppendShort(short value)
{
if (GetHashOrder() is { } hashOrder)
{
PutHash(hashOrder, HashUtils.Hash32(value));
}
if (value >= sbyte.MinValue && value <= sbyte.MaxValue)
{
PutByte((sbyte)value);
}
else
{
PutShort(value);
}
OnWrite();
}
/// <summary>
/// Appends a short.
/// </summary>
/// <param name="value">Value.</param>
public void AppendShortNullable(short? value)
{
if (value == null)
{
AppendNull();
}
else
{
AppendShort(value.Value);
}
}
/// <summary>
/// Appends an int.
/// </summary>
/// <param name="value">Value.</param>
public void AppendInt(int value)
{
if (GetHashOrder() is { } hashOrder)
{
PutHash(hashOrder, HashUtils.Hash32(value));
}
if (value >= sbyte.MinValue && value <= sbyte.MaxValue)
{
PutByte((sbyte)value);
}
else if (value >= short.MinValue && value <= short.MaxValue)
{
PutShort((short)value);
}
else
{
PutInt(value);
}
OnWrite();
}
/// <summary>
/// Appends an int.
/// </summary>
/// <param name="value">Value.</param>
public void AppendIntNullable(int? value)
{
if (value == null)
{
AppendNull();
}
else
{
AppendInt(value.Value);
}
}
/// <summary>
/// Appends a long.
/// </summary>
/// <param name="value">Value.</param>
public void AppendLong(long value)
{
if (GetHashOrder() is { } hashOrder)
{
PutHash(hashOrder, HashUtils.Hash32(value));
}
if (value >= sbyte.MinValue && value <= sbyte.MaxValue)
{
PutByte((sbyte)value);
}
else if (value >= short.MinValue && value <= short.MaxValue)
{
PutShort((short)value);
}
else if (value >= int.MinValue && value <= int.MaxValue)
{
PutInt((int)value);
}
else
{
PutLong(value);
}
OnWrite();
}
/// <summary>
/// Appends a long.
/// </summary>
/// <param name="value">Value.</param>
public void AppendLongNullable(long? value)
{
if (value == null)
{
AppendNull();
}
else
{
AppendLong(value.Value);
}
}
/// <summary>
/// Appends a gloat.
/// </summary>
/// <param name="value">Value.</param>
public void AppendFloat(float value)
{
if (GetHashOrder() is { } hashOrder)
{
PutHash(hashOrder, HashUtils.Hash32(value));
}
PutFloat(value);
OnWrite();
}
/// <summary>
/// Appends a gloat.
/// </summary>
/// <param name="value">Value.</param>
public void AppendFloatNullable(float? value)
{
if (value == null)
{
AppendNull();
}
else
{
AppendFloat(value.Value);
}
}
/// <summary>
/// Appends a double.
/// </summary>
/// <param name="value">Value.</param>
public void AppendDouble(double value)
{
if (GetHashOrder() is { } hashOrder)
{
PutHash(hashOrder, HashUtils.Hash32(value));
}
// ReSharper disable once CompareOfFloatsByEqualityOperator
if (value == (float)value)
{
PutFloat((float)value);
}
else
{
PutDouble(value);
}
OnWrite();
}
/// <summary>
/// Appends a double.
/// </summary>
/// <param name="value">Value.</param>
public void AppendDoubleNullable(double? value)
{
if (value == null)
{
AppendNull();
}
else
{
AppendDouble(value.Value);
}
}
/// <summary>
/// Appends a string.
/// </summary>
/// <param name="value">Value.</param>
public void AppendString(string value)
{
PutString(value);
OnWrite();
}
/// <summary>
/// Appends a string.
/// </summary>
/// <param name="value">Value.</param>
public void AppendStringNullable(string? value)
{
if (value == null)
{
AppendNull();
}
else
{
AppendString(value);
}
}
/// <summary>
/// Appends bytes.
/// </summary>
/// <param name="value">Value.</param>
public void AppendBytes(ReadOnlySpan<byte> value)
{
if (GetHashOrder() is { } hashOrder)
{
PutHash(hashOrder, HashUtils.Hash32(value));
}
PutBytes(value);
OnWrite();
}
/// <summary>
/// Appends bytes using <see cref="IBufferWriter{T}"/> directly to the underlying buffer, avoiding extra copying.
/// </summary>
/// <param name="action">Appender action.</param>
/// <param name="arg">Argument.</param>
/// <typeparam name="TArg">Argument type.</typeparam>
public void AppendBytes<TArg>(Action<IBufferWriter<byte>, TArg> action, TArg arg)
{
var oldPos = _buffer.Position;
action(_buffer, arg);
var length = _buffer.Position - oldPos;
if (length == 0)
{
GetSpan(1)[0] = BinaryTupleCommon.VarlenEmptyByte;
OnWrite();
return;
}
var writtenSpan = _buffer.GetWrittenMemory().Span.Slice(oldPos, length);
if (length > 0 && writtenSpan[0] == BinaryTupleCommon.VarlenEmptyByte)
{
// Actual data starts with VarlenEmptyByte - insert another VarlenEmptyByte in the beginning.
var temp = ByteArrayPool.Rent(length);
try
{
// 1. Copy written memory to a separate buffer.
writtenSpan.CopyTo(temp);
// 2. Extend the buffer.
_buffer.GetSpanAndAdvance(1);
// 3. Copy back, skipping existing VarlenEmptyByte at the beginning.
var newWrittenSpan = _buffer.GetWrittenMemory().Span.Slice(oldPos + 1, length);
temp.AsSpan(0, length).CopyTo(newWrittenSpan);
}
finally
{
ByteArrayPool.Return(temp);
}
}
OnWrite();
}
/// <summary>
/// Appends bytes.
/// </summary>
/// <param name="value">Value.</param>
public void AppendBytes(byte[] value) => AppendBytes(value.AsSpan());
/// <summary>
/// Appends bytes.
/// </summary>
/// <param name="value">Value.</param>
public void AppendBytesNullable(byte[]? value)
{
if (value == null)
{
AppendNull();
}
else
{
AppendBytes(value);
}
}
/// <summary>
/// Appends a guid.
/// </summary>
/// <param name="value">Value.</param>
public void AppendGuid(Guid value)
{
var span = GetSpan(16);
UuidSerializer.Write(value, span);
if (GetHashOrder() is { } hashOrder)
{
var lo = BinaryPrimitives.ReadInt64LittleEndian(span[..8]);
var hi = BinaryPrimitives.ReadInt64LittleEndian(span[8..]);
var hash = HashUtils.Hash32(hi, HashUtils.Hash32(lo));
PutHash(hashOrder, hash);
}
OnWrite();
}
/// <summary>
/// Appends a guid.
/// </summary>
/// <param name="value">Value.</param>
public void AppendGuidNullable(Guid? value)
{
if (value == null)
{
AppendNull();
}
else
{
AppendGuid(value.Value);
}
}
/// <summary>
/// Appends a big decimal.
/// </summary>
/// <param name="value">Value.</param>
/// <param name="scale">Decimal scale from schema.</param>
public void AppendBigDecimal(BigDecimal value, int scale)
{
var valueScale = value.Scale;
var unscaledValue = value.UnscaledValue;
if (valueScale > scale)
{
unscaledValue /= BigInteger.Pow(10, valueScale - scale);
valueScale = (short)scale;
}
PutShort(valueScale);
PutNumber(unscaledValue);
}
/// <summary>
/// Appends a nullable big decimal.
/// </summary>
/// <param name="value">Value.</param>
/// <param name="scale">Decimal scale from schema.</param>
public void AppendBigDecimalNullable(BigDecimal? value, int scale)
{
if (value == null)
{
AppendNull();
}
else
{
AppendBigDecimal(value.Value, scale);
}
}
/// <summary>
/// Appends a decimal.
/// </summary>
/// <param name="value">Value.</param>
/// <param name="scale">Decimal scale from schema.</param>
public void AppendDecimal(decimal value, int scale) =>
AppendBigDecimal(new BigDecimal(value), scale);
/// <summary>
/// Appends a nullable decimal.
/// </summary>
/// <param name="value">Value.</param>
/// <param name="scale">Decimal scale from schema.</param>
public void AppendDecimalNullable(decimal? value, int scale) =>
AppendBigDecimalNullable(value == null ? null : new BigDecimal(value.Value), scale);
/// <summary>
/// Appends a date.
/// </summary>
/// <param name="value">Value.</param>
public void AppendDate(LocalDate value)
{
if (GetHashOrder() is { } hashOrder)
{
PutHash(hashOrder, HashUtils.Hash32(value));
}
PutDate(value);
OnWrite();
}
/// <summary>
/// Appends a date.
/// </summary>
/// <param name="value">Value.</param>
public void AppendDateNullable(LocalDate? value)
{
if (value == null)
{
AppendNull();
}
else
{
AppendDate(value.Value);
}
}
/// <summary>
/// Appends a time.
/// </summary>
/// <param name="value">Value.</param>
/// <param name="precision">Precision.</param>
public void AppendTime(LocalTime value, int precision)
{
if (GetHashOrder() is { } hashOrder)
{
PutHash(hashOrder, HashUtils.Hash32(value, precision));
}
PutTime(value, precision);
OnWrite();
}
/// <summary>
/// Appends a time.
/// </summary>
/// <param name="value">Value.</param>
/// <param name="precision">Precision.</param>
public void AppendTimeNullable(LocalTime? value, int precision)
{
if (value == null)
{
AppendNull();
}
else
{
AppendTime(value.Value, precision);
}
}
/// <summary>
/// Appends a date and time.
/// </summary>
/// <param name="value">Value.</param>
/// <param name="precision">Precision.</param>
public void AppendDateTime(LocalDateTime value, int precision)
{
if (GetHashOrder() is { } hashOrder)
{
PutHash(hashOrder, HashUtils.Hash32(value, precision));
}
PutDate(value.Date);
PutTime(value.TimeOfDay, precision);
OnWrite();
}
/// <summary>
/// Appends a date and time.
/// </summary>
/// <param name="value">Value.</param>
/// <param name="precision">Precision.</param>
public void AppendDateTimeNullable(LocalDateTime? value, int precision)
{
if (value == null)
{
AppendNull();
}
else
{
AppendDateTime(value.Value, precision);
}
}
/// <summary>
/// Appends a timestamp (instant).
/// </summary>
/// <param name="value">Value.</param>
/// <param name="precision">Precision.</param>
public void AppendTimestamp(Instant value, int precision)
{
var (seconds, nanos) = PutTimestamp(value, precision);
if (GetHashOrder() is { } hashOrder)
{
var hash = HashUtils.Hash32(nanos, HashUtils.Hash32(seconds));
PutHash(hashOrder, hash);
}
OnWrite();
}
/// <summary>
/// Appends a timestamp (instant).
/// </summary>
/// <param name="value">Value.</param>
/// <param name="precision">Precision.</param>
public void AppendTimestampNullable(Instant? value, int precision)
{
if (value == null)
{
AppendNull();
}
else
{
AppendTimestamp(value.Value, precision);
}
}
/// <summary>
/// Appends a duration.
/// </summary>
/// <param name="value">Value.</param>
public void AppendDuration(Duration value)
{
if (GetHashOrder() is not null)
{
// Colocation keys can't include Duration.
throw new NotSupportedException("Duration hashing is not supported.");
}
PutDuration(value);
OnWrite();
}
/// <summary>
/// Appends a duration.
/// </summary>
/// <param name="value">Value.</param>
public void AppendDurationNullable(Duration? value)
{
if (value == null)
{
AppendNull();
}
else
{
AppendDuration(value.Value);
}
}
/// <summary>
/// Appends a period.
/// </summary>
/// <param name="value">Value.</param>
public void AppendPeriod(Period value)
{
if (GetHashOrder() is not null)
{
// Colocation keys can't include Period.
throw new NotSupportedException("Period hashing is not supported.");
}
PutPeriod(value);
OnWrite();
}
/// <summary>
/// Appends a period.
/// </summary>
/// <param name="value">Value.</param>
public void AppendPeriodNullable(Period? value)
{
if (value == null)
{
AppendNull();
}
else
{
AppendPeriod(value);
}
}
/// <summary>
/// Appends an object.
/// </summary>
/// <param name="value">Value.</param>
/// <param name="colType">Column type.</param>
/// <param name="scale">Decimal scale.</param>
/// <param name="precision">Precision.</param>
public void AppendObject(object? value, ColumnType colType, int scale = 0, int precision = TemporalTypes.MaxTimePrecision)
{
if (value == null)
{
AppendNull();
return;
}
switch (colType)
{
case ColumnType.Int8:
AppendByte((sbyte)value);
break;
case ColumnType.Int16:
AppendShort((short)value);
break;
case ColumnType.Int32:
AppendInt((int)value);
break;
case ColumnType.Int64:
AppendLong((long)value);
break;
case ColumnType.Float:
AppendFloat((float)value);
break;
case ColumnType.Double:
AppendDouble((double)value);
break;
case ColumnType.Uuid:
AppendGuid((Guid)value);
break;
case ColumnType.String:
AppendString((string)value);
break;
case ColumnType.ByteArray:
AppendBytes((byte[])value);
break;
case ColumnType.Decimal:
if (value is decimal dec)
{
AppendDecimal(dec, scale);
}
else
{
AppendBigDecimal((BigDecimal)value, scale);
}
break;
case ColumnType.Date:
AppendDate((LocalDate)value);
break;
case ColumnType.Time:
AppendTime((LocalTime)value, precision);
break;
case ColumnType.Datetime:
AppendDateTime((LocalDateTime)value, precision);
break;
case ColumnType.Timestamp:
AppendTimestamp((Instant)value, precision);
break;
case ColumnType.Boolean:
AppendBool((bool)value);
break;
default:
throw new IgniteClientException(ErrorGroups.Client.Protocol, "Unsupported type: " + colType);
}
}
/// <summary>
/// Appends an object.
/// </summary>
/// <param name="value">Value.</param>
/// <param name="timePrecision">Time precision.</param>
/// <param name="timestampPrecision">Timestamp precision.</param>
public void AppendObjectWithType(
object? value,
int timePrecision = TemporalTypes.MaxTimePrecision,
int timestampPrecision = TemporalTypes.MaxTimePrecision)
{
switch (value)
{
case null:
AppendNull(); // Type.
AppendNull(); // Scale.
AppendNull(); // Value.
break;
case bool b:
AppendTypeAndScale(ColumnType.Boolean);
AppendBool(b);
break;
case int i32:
AppendTypeAndScale(ColumnType.Int32);
AppendInt(i32);
break;
case long i64:
AppendTypeAndScale(ColumnType.Int64);
AppendLong(i64);
break;
case string str:
AppendTypeAndScale(ColumnType.String);
AppendString(str);
break;
case Guid uuid:
AppendTypeAndScale(ColumnType.Uuid);
AppendGuid(uuid);
break;
case sbyte i8:
AppendTypeAndScale(ColumnType.Int8);
AppendByte(i8);
break;
case short i16:
AppendTypeAndScale(ColumnType.Int16);
AppendShort(i16);
break;
case float f32:
AppendTypeAndScale(ColumnType.Float);
AppendFloat(f32);
break;
case double f64:
AppendTypeAndScale(ColumnType.Double);
AppendDouble(f64);
break;
case byte[] bytes:
AppendTypeAndScale(ColumnType.ByteArray);
AppendBytes(bytes);
break;
case decimal dec:
var bigDec0 = new BigDecimal(dec);
AppendTypeAndScale(ColumnType.Decimal, bigDec0.Scale);
AppendBigDecimal(bigDec0, bigDec0.Scale);
break;
case BigDecimal bigDec:
AppendTypeAndScale(ColumnType.Decimal, bigDec.Scale);
AppendBigDecimal(bigDec, bigDec.Scale);
break;
case LocalDate localDate:
AppendTypeAndScale(ColumnType.Date);
AppendDate(localDate);
break;
case LocalTime localTime:
AppendTypeAndScale(ColumnType.Time);
AppendTime(localTime, timePrecision);
break;
case LocalDateTime localDateTime:
AppendTypeAndScale(ColumnType.Datetime);
AppendDateTime(localDateTime, timePrecision);
break;
case Instant instant:
AppendTypeAndScale(ColumnType.Timestamp);
AppendTimestamp(instant, timestampPrecision);
break;
case Period period:
AppendTypeAndScale(ColumnType.Period);
AppendPeriod(period);
break;
case Duration duration:
AppendTypeAndScale(ColumnType.Duration);
AppendDuration(duration);
break;
default:
throw new IgniteClientException(ErrorGroups.Client.Protocol, "Unsupported type: " + value.GetType());
}
}
/// <summary>
/// Appends an object.
/// </summary>
/// <param name="collection">Value.</param>
/// <typeparam name="T">Element type.</typeparam>
public void AppendObjectCollectionWithType<T>(Span<T> collection)
{
var firstValue = collection[0];
switch (firstValue)
{
case bool:
AppendTypeAndSize(ColumnType.Boolean, collection.Length);
foreach (var item in collection)
{
AppendBool((bool)(object)item!);
}
break;
case int:
AppendTypeAndSize(ColumnType.Int32, collection.Length);
foreach (var item in collection)
{
AppendInt((int)(object)item!);
}
break;
case long:
AppendTypeAndSize(ColumnType.Int64, collection.Length);
foreach (var item in collection)
{
AppendLong((long)(object)item!);
}
break;
case string:
AppendTypeAndSize(ColumnType.String, collection.Length);
foreach (var item in collection)
{
AppendString((string)(object)item!);
}
break;
case Guid:
AppendTypeAndSize(ColumnType.Uuid, collection.Length);
foreach (var item in collection)
{
AppendGuid((Guid)(object)item!);
}
break;
case sbyte:
AppendTypeAndSize(ColumnType.Int8, collection.Length);
foreach (var item in collection)
{
AppendByte((sbyte)(object)item!);
}
break;
case short:
AppendTypeAndSize(ColumnType.Int16, collection.Length);
foreach (var item in collection)
{
AppendShort((short)(object)item!);
}
break;
case float:
AppendTypeAndSize(ColumnType.Float, collection.Length);
foreach (var item in collection)
{
AppendFloat((float)(object)item!);
}
break;
case double:
AppendTypeAndSize(ColumnType.Double, collection.Length);
foreach (var item in collection)
{
AppendDouble((double)(object)item!);
}
break;
case (byte[]):
AppendTypeAndSize(ColumnType.ByteArray, collection.Length);
foreach (var item in collection)
{
AppendBytes((byte[])(object)item!);
}
break;
case decimal:
AppendTypeAndSize(ColumnType.Decimal, collection.Length);
foreach (var item in collection)
{
AppendDecimal((decimal)(object)item!, int.MaxValue);
}
break;
case BigDecimal:
AppendTypeAndSize(ColumnType.Decimal, collection.Length);
foreach (var item in collection)
{
AppendBigDecimal((BigDecimal)(object)item!, int.MaxValue);
}
break;
case LocalDate:
AppendTypeAndSize(ColumnType.Date, collection.Length);
foreach (var item in collection)
{
AppendDate((LocalDate)(object)item!);
}
break;
case LocalTime:
AppendTypeAndSize(ColumnType.Time, collection.Length);
foreach (var item in collection)
{
AppendTime((LocalTime)(object)item!, TemporalTypes.MaxTimePrecision);
}
break;
case LocalDateTime:
AppendTypeAndSize(ColumnType.Datetime, collection.Length);
foreach (var item in collection)
{
AppendDateTime((LocalDateTime)(object)item!, TemporalTypes.MaxTimePrecision);
}
break;
case Instant:
AppendTypeAndSize(ColumnType.Timestamp, collection.Length);
foreach (var item in collection)
{
AppendTimestamp((Instant)(object)item!, TemporalTypes.MaxTimePrecision);
}
break;
case Period:
AppendTypeAndSize(ColumnType.Period, collection.Length);
foreach (var item in collection)
{
AppendPeriod((Period)(object)item!);
}
break;
case Duration:
AppendTypeAndSize(ColumnType.Duration, collection.Length);
foreach (var item in collection)
{
AppendDuration((Duration)(object)item!);
}
break;
default:
throw new IgniteClientException(ErrorGroups.Client.Protocol, "Unsupported type: " + firstValue?.GetType());
}
}
/// <summary>
/// Builds the tuple.
/// <para />
/// NOTE: This should be called only once as it messes up with accumulated internal data.
/// </summary>
/// <returns>Resulting memory.</returns>
public Memory<byte> Build()
{
int baseOffset = _entryBase - BinaryTupleCommon.HeaderSize;
int offset = baseOffset;
int valueSize = _buffer.Position - _valueBase;
byte flags = BinaryTupleCommon.ValueSizeToFlags(valueSize);
int desiredEntrySize = BinaryTupleCommon.FlagsToEntrySize(flags);
// Shrink the offset table if needed.
if (desiredEntrySize != _entrySize)
{
if (desiredEntrySize > _entrySize)
{
throw new InvalidOperationException("Offset entry overflow in binary tuple builder");
}
Debug.Assert(_entrySize == 4 || _entrySize == 2, "_entrySize == 4 || _entrySize == 2");
Debug.Assert(desiredEntrySize == 2 || desiredEntrySize == 1, "desiredEntrySize == 2 || desiredEntrySize == 1");
int getIndex = _valueBase;
int putIndex = _valueBase;
while (getIndex > _entryBase)
{
getIndex -= _entrySize;
putIndex -= desiredEntrySize;
var value = _entrySize == 4
? _buffer.ReadInt(getIndex)
: _buffer.ReadShort(getIndex);
if (desiredEntrySize == 1)
{
_buffer.WriteByte((byte)value, putIndex);
}
else
{
_buffer.WriteShort((short)value, putIndex);
}
}
offset = baseOffset + (_entrySize - desiredEntrySize) * _numElements;
}
_buffer.WriteByte(flags, offset);
return _buffer.GetWrittenMemory().Slice(offset);
}
/// <summary>
/// Disposes this instance.
/// </summary>
public void Dispose()
{
_buffer.Dispose();
}
private void PutByte(sbyte value) => _buffer.WriteByte(unchecked((byte)value));
private void PutShort(short value) => _buffer.WriteShort(value);
private void PutInt(int value) => _buffer.WriteInt(value);
private void PutLong(long value) => _buffer.WriteLong(value);
private void PutFloat(float value) => PutInt(BitConverter.SingleToInt32Bits(value));
private void PutDouble(double value) => PutLong(BitConverter.DoubleToInt64Bits(value));
private void PutBytes(ReadOnlySpan<byte> bytes)
{
if (bytes.Length == 0)
{
GetSpan(1)[0] = BinaryTupleCommon.VarlenEmptyByte;
}
else if (bytes[0] == BinaryTupleCommon.VarlenEmptyByte)
{
var span = GetSpan(bytes.Length + 1);
span[0] = BinaryTupleCommon.VarlenEmptyByte;
bytes.CopyTo(span[1..]);
}
else
{
bytes.CopyTo(GetSpan(bytes.Length));
}
}
private void PutString(string value)
{
if (value.Length == 0)
{
if (GetHashOrder() is { } hashOrder)
{
PutHash(hashOrder, HashUtils.Hash32(Span<byte>.Empty));
}
_buffer.GetSpan(1)[0] = BinaryTupleCommon.VarlenEmptyByte;
_buffer.Advance(1);
return;
}
var maxByteCount = ProtoCommon.StringEncoding.GetMaxByteCount(value.Length);
var span = _buffer.GetSpan(maxByteCount);
var actualBytes = ProtoCommon.StringEncoding.GetBytes(value, span);
span = span[..actualBytes];
if (GetHashOrder() is { } hashOrder2)
{
PutHash(hashOrder2, HashUtils.Hash32(span));
}
_buffer.Advance(actualBytes);
// UTF-8 encoded strings should not start with 0x80 (character codes larger than 127 have a multi-byte encoding).
// We trust this but verify.
if (span[0] == BinaryTupleCommon.VarlenEmptyByte)
{
throw new InvalidOperationException(
$"Failed to encode a string element: resulting payload starts with invalid {BinaryTupleCommon.VarlenEmptyByte} byte");
}
}
private (long Seconds, int Nanos) PutTimestamp(Instant value, int precision)
{
var (seconds, nanos) = value.ToSecondsAndNanos(precision);
PutLong(seconds);
if (nanos != 0)
{
PutInt(nanos);
}
return (seconds, nanos);
}
private void PutDuration(Duration value)
{
// Logic taken from
// https://github.com/nodatime/nodatime.serialization/blob/main/src/NodaTime.Serialization.Protobuf/NodaExtensions.cs#L42
// (Apache License).
long days = value.Days;
long nanoOfDay = value.NanosecondOfDay;
long secondOfDay = nanoOfDay / NodaConstants.NanosecondsPerSecond;
long seconds = days * NodaConstants.SecondsPerDay + secondOfDay;
int nanos = value.SubsecondNanoseconds;
PutLong(seconds);
if (nanos != 0)
{
PutInt(nanos);
}
}
private void PutPeriod(Period value)
{
if (value.HasTimeComponent)
{
throw new NotSupportedException("Period with time component is not supported.");
}
if (value.Weeks != 0)
{
throw new NotSupportedException("Period with weeks component is not supported.");
}
int years = value.Years;
int months = value.Months;
int days = value.Days;
if (years is >= sbyte.MinValue and <= sbyte.MaxValue &&
months is >= sbyte.MinValue and <= sbyte.MaxValue &&
days is >= sbyte.MinValue and <= sbyte.MaxValue)
{
PutByte((sbyte) years);
PutByte((sbyte) months);
PutByte((sbyte) days);
}
else if (years is >= short.MinValue and <= short.MaxValue &&
months is >= short.MinValue and <= short.MaxValue &&
days is >= short.MinValue and <= short.MaxValue)
{
PutShort((short) years);
PutShort((short) months);
PutShort((short) days);
}
else
{
PutInt(years);
PutInt(months);
PutInt(days);
}
}
private void PutTime(LocalTime value, int precision)
{
long hour = value.Hour;
long minute = value.Minute;
long second = value.Second;
long nanos = TemporalTypes.NormalizeNanos(value.NanosecondOfSecond, precision);
if ((nanos % 1000) != 0)
{
long time = (hour << 42) | (minute << 36) | (second << 30) | nanos;
PutInt((int)time);
PutShort((short)(time >> 32));
}
else if ((nanos % 1000000) != 0)
{
long time = (hour << 32) | (minute << 26) | (second << 20) | (nanos / 1000);
PutInt((int)time);
PutByte((sbyte)(time >> 32));
}
else
{
long time = (hour << 22) | (minute << 16) | (second << 10) | (nanos / 1000000);
PutInt((int)time);
}
}
private void PutDate(LocalDate value)
{
int year = value.Year;
int month = value.Month;
int day = value.Day;
int date = (year << 9) | (month << 5) | day;
// Write int32 as 3 bytes, preserving sign.
Span<byte> buf = stackalloc byte[4];
BinaryPrimitives.WriteInt32LittleEndian(buf, date << 8);
buf[1..].CopyTo(GetSpan(3));
}
private void AppendTypeAndScale(ColumnType type, int scale = 0)
{
AppendInt((int)type);
AppendInt(scale);
}
private void AppendTypeAndSize(ColumnType type, int size)
{
AppendInt((int)type);
AppendInt(size);
}
private void PutNumber(BigInteger value)
{
var size = value.GetByteCount();
var destination = GetSpan(size);
var success = value.TryWriteBytes(destination, out int written, isBigEndian: true);
if (GetHashOrder() is { } hashOrder)
{
PutHash(hashOrder, HashUtils.Hash32(destination[..written]));
}
Debug.Assert(success, "success");
Debug.Assert(written == size, "written == size");
OnWrite();
}
private void OnWrite()
{
Debug.Assert(_elementIndex < _numElements, "_elementIndex < _numElements");
int offset = _buffer.Position - _valueBase;
switch (_entrySize)
{
case 1:
_buffer.WriteByte((byte)offset, _entryBase + _elementIndex);
break;
case 2:
_buffer.WriteShort((short)offset, _entryBase + _elementIndex * 2);
break;
case 4:
_buffer.WriteInt(offset, _entryBase + _elementIndex * 4);
break;
default:
throw new InvalidOperationException("Tuple entry size is invalid.");
}
_elementIndex++;
}
private Span<byte> GetSpan(int size)
{
var span = _buffer.GetSpan(size)[..size];
_buffer.Advance(size);
return span;
}
private int? GetHashOrder() => _hashedColumnsPredicate?.HashedColumnOrder(_elementIndex) switch
{
null or < 0 => null,
{ } order => order
};
private void PutHash(int index, int hash)
{
Debug.Assert(_hashedColumnsPredicate != null, "_hashedColumnsPredicate != null");
Debug.Assert(index >= 0, "index >= 0");
Debug.Assert(index < _hashedColumnsPredicate.HashedColumnCount, "index < _hashedColumnsPredicate.HashedColumnCount");
GetHashSpan()[index] = hash;
}
private Span<int> GetHashSpan() => MemoryMarshal.Cast<byte, int>(_buffer.GetWrittenMemory().Span);
}
}