modules/platforms/dotnet/Apache.Ignite/Sql/IgniteDbDataReader.cs (366 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Apache.Ignite.Sql;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Data;
using System.Data.Common;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;
using Internal.Buffers;
using Internal.Common;
using Internal.Proto;
using Internal.Proto.BinaryTuple;
using Internal.Sql;
using NodaTime;
/// <summary>
/// Reads a forward-only stream of rows from an Ignite result set.
/// </summary>
[SuppressMessage("Design", "CA1010:Generic interface should also be implemented", Justification = "Generic IEnumerable is not applicable.")]
[SuppressMessage("Usage", "CA2215:Dispose methods should call base class dispose", Justification = "Base class dispose is empty.")]
public sealed class IgniteDbDataReader : DbDataReader, IDbColumnSchemaGenerator
{
private static readonly Task<bool> TrueTask = Task.FromResult(true);
private readonly ResultSet<object> _resultSet;
private readonly IAsyncEnumerator<PooledBuffer> _pageEnumerator;
private int _pageRowCount = -1;
private int _pageRowIndex = -1;
private int _pageRowOffset = -1;
private int _pageRowSize = -1;
private ReadOnlyCollection<DbColumn>? _schema;
/// <summary>
/// Initializes a new instance of the <see cref="IgniteDbDataReader"/> class.
/// </summary>
/// <param name="resultSet">Result set.</param>
internal IgniteDbDataReader(ResultSet<object> resultSet)
{
Debug.Assert(resultSet.HasRowSet, "_resultSet.HasRowSet");
_resultSet = resultSet;
_pageEnumerator = _resultSet.EnumeratePagesInternal().GetAsyncEnumerator();
}
/// <inheritdoc/>
public override int FieldCount => Metadata.Columns.Count;
/// <inheritdoc/>
public override int RecordsAffected => checked((int)_resultSet.AffectedRows);
/// <inheritdoc/>
public override bool HasRows => _resultSet.HasRows;
/// <inheritdoc/>
public override bool IsClosed => _resultSet.IsDisposed;
/// <summary>
/// Gets a value indicating the depth of nesting for the current row. Always zero in Ignite.
/// </summary>
/// <returns>The level of nesting.</returns>
public override int Depth => 0;
/// <summary>
/// Gets Ignite-specific result set metadata.
/// </summary>
public IResultSetMetadata Metadata => _resultSet.Metadata!;
/// <inheritdoc/>
public override object this[int ordinal] => GetValue(ordinal);
/// <inheritdoc/>
[SuppressMessage(
"Design",
"CA1065:Do not raise exceptions in unexpected locations",
Justification = "Indexer must raise an exception on invalid column name.")]
public override object this[string name] =>
Metadata.IndexOf(name) is var index and >= 0
? GetValue(index)
: throw new InvalidOperationException($"Column '{name}' is not present in this reader.");
/// <inheritdoc />
public override bool GetBoolean(int ordinal) => GetReader(ordinal, typeof(bool)).GetByteAsBool(ordinal);
/// <inheritdoc/>
public override byte GetByte(int ordinal) => Metadata.Columns[ordinal] switch
{
var c when c.Type.IsAnyInt() => unchecked((byte)GetReader().GetByte(ordinal)),
var c => throw GetInvalidColumnTypeException(typeof(byte), c)
};
/// <inheritdoc/>
public override long GetBytes(int ordinal, long dataOffset, byte[]? buffer, int bufferOffset, int length)
{
if (dataOffset is < 0 or > int.MaxValue)
{
throw new ArgumentOutOfRangeException(
nameof(dataOffset),
dataOffset,
$"{nameof(dataOffset)} must be between {0} and {int.MaxValue}");
}
if (buffer != null && (bufferOffset < 0 || bufferOffset >= buffer.Length + 1))
{
throw new ArgumentOutOfRangeException($"{nameof(bufferOffset)} must be between {0} and {(buffer.Length)}");
}
if (buffer != null && (length < 0 || length > buffer.Length - bufferOffset))
{
throw new ArgumentOutOfRangeException($"{nameof(length)} must be between {0} and {buffer.Length - bufferOffset}");
}
var span = GetReader(ordinal, typeof(byte[])).GetBytesSpan(ordinal);
if (buffer == null)
{
return span.Length;
}
var slice = span.Slice(checked((int)dataOffset), length);
slice.CopyTo(buffer);
return slice.Length;
}
/// <inheritdoc/>
public override char GetChar(int ordinal) => throw new NotSupportedException("char data type is not supported");
/// <inheritdoc/>
public override long GetChars(int ordinal, long dataOffset, char[]? buffer, int bufferOffset, int length)
{
if (dataOffset is < 0 or > int.MaxValue)
{
throw new ArgumentOutOfRangeException(
nameof(dataOffset),
dataOffset,
$"{nameof(dataOffset)} must be between {0} and {int.MaxValue}");
}
if (buffer != null && (bufferOffset < 0 || bufferOffset >= buffer.Length + 1))
{
throw new ArgumentOutOfRangeException($"{nameof(bufferOffset)} must be between {0} and {(buffer.Length)}");
}
if (buffer != null && (length < 0 || length > buffer.Length - bufferOffset))
{
throw new ArgumentOutOfRangeException($"{nameof(length)} must be between {0} and {buffer.Length - bufferOffset}");
}
var span = GetReader(ordinal, typeof(string)).GetBytesSpan(ordinal);
if (buffer == null)
{
return ProtoCommon.StringEncoding.GetCharCount(span);
}
return ProtoCommon.StringEncoding.GetChars(
span.Slice(checked((int)dataOffset)),
buffer.AsSpan().Slice(bufferOffset, length));
}
/// <inheritdoc/>
public override string GetDataTypeName(int ordinal) => Metadata.Columns[ordinal].Type.ToSqlTypeName();
/// <inheritdoc/>
public override DateTime GetDateTime(int ordinal)
{
var column = Metadata.Columns[ordinal];
// ReSharper disable once SwitchExpressionHandlesSomeKnownEnumValuesWithExceptionInDefault
return column.Type switch
{
ColumnType.Date => GetReader().GetDate(ordinal).ToDateTimeUnspecified(),
ColumnType.Datetime => GetReader().GetDateTime(ordinal).ToDateTimeUnspecified(),
ColumnType.Timestamp => GetReader().GetTimestamp(ordinal).ToDateTimeUtc(),
_ => throw GetInvalidColumnTypeException(typeof(DateTime), column)
};
}
/// <inheritdoc/>
public override decimal GetDecimal(int ordinal)
{
var column = Metadata.Columns[ordinal];
ValidateColumnType(typeof(decimal), column);
return GetReader().GetDecimal(ordinal, column.Scale);
}
/// <inheritdoc/>
public override double GetDouble(int ordinal) => Metadata.Columns[ordinal] switch
{
var c when c.Type.IsAnyFloat() => GetReader().GetDouble(ordinal),
var c => throw GetInvalidColumnTypeException(typeof(double), c)
};
/// <inheritdoc/>
public override float GetFloat(int ordinal) => Metadata.Columns[ordinal] switch
{
var c when c.Type.IsAnyFloat() => GetReader().GetFloat(ordinal),
var c => throw GetInvalidColumnTypeException(typeof(float), c)
};
/// <inheritdoc/>
public override Guid GetGuid(int ordinal) => GetReader(ordinal, typeof(Guid)).GetGuid(ordinal);
/// <inheritdoc/>
public override short GetInt16(int ordinal) => Metadata.Columns[ordinal] switch
{
var c when c.Type.IsAnyInt() => GetReader().GetShort(ordinal),
var c => throw GetInvalidColumnTypeException(typeof(short), c)
};
/// <inheritdoc/>
public override int GetInt32(int ordinal) => Metadata.Columns[ordinal] switch
{
var c when c.Type.IsAnyInt() => GetReader().GetInt(ordinal),
var c => throw GetInvalidColumnTypeException(typeof(int), c)
};
/// <inheritdoc/>
public override long GetInt64(int ordinal) => Metadata.Columns[ordinal] switch
{
var c when c.Type.IsAnyInt() => GetReader().GetLong(ordinal),
var c => throw GetInvalidColumnTypeException(typeof(long), c)
};
/// <inheritdoc/>
public override string GetName(int ordinal) => Metadata.Columns[ordinal].Name;
/// <inheritdoc/>
public override int GetOrdinal(string name) => Metadata.IndexOf(name);
/// <inheritdoc/>
public override string GetString(int ordinal) => GetReader(ordinal, typeof(string)).GetString(ordinal);
/// <inheritdoc/>
public override object GetValue(int ordinal)
{
var reader = GetReader();
return Sql.ReadColumnValue(ref reader, Metadata.Columns[ordinal], ordinal)!;
}
/// <inheritdoc/>
public override int GetValues(object[] values)
{
IgniteArgumentCheck.NotNull(values, nameof(values));
var cols = Metadata.Columns;
var count = Math.Min(values.Length, cols.Count);
var reader = GetReader();
for (int i = 0; i < count; i++)
{
values[i] = Sql.ReadColumnValue(ref reader, cols[i], i)!;
}
return count;
}
/// <inheritdoc/>
public override bool IsDBNull(int ordinal) => GetReader().IsNull(ordinal);
/// <inheritdoc/>
public override bool NextResult() => false;
/// <inheritdoc/>
public override bool Read() => ReadNextRowInCurrentPage() || FetchNextPage().GetAwaiter().GetResult();
/// <inheritdoc/>
public override Task<bool> ReadAsync(CancellationToken cancellationToken) => ReadNextRowInCurrentPage() ? TrueTask : FetchNextPage();
/// <inheritdoc/>
public override IEnumerator GetEnumerator() => new DbEnumerator(this);
/// <inheritdoc/>
public ReadOnlyCollection<DbColumn> GetColumnSchema()
{
if (_schema == null)
{
var schema = new List<DbColumn>(FieldCount);
for (var i = 0; i < Metadata.Columns.Count; i++)
{
schema.Add(new IgniteDbColumn(Metadata.Columns[i], i));
}
_schema = schema.AsReadOnly();
}
return _schema;
}
/// <inheritdoc/>
public override DataTable GetSchemaTable()
{
var table = new DataTable("SchemaTable");
table.Columns.Add(SchemaTableColumn.ColumnName, typeof(string));
table.Columns.Add(SchemaTableColumn.ColumnOrdinal, typeof(int));
table.Columns.Add(SchemaTableColumn.ColumnSize, typeof(int));
table.Columns.Add(SchemaTableColumn.NumericPrecision, typeof(int));
table.Columns.Add(SchemaTableColumn.NumericScale, typeof(int));
table.Columns.Add(SchemaTableColumn.IsUnique, typeof(bool));
table.Columns.Add(SchemaTableColumn.IsKey, typeof(bool));
table.Columns.Add(SchemaTableColumn.BaseColumnName, typeof(string));
table.Columns.Add(SchemaTableColumn.BaseSchemaName, typeof(string));
table.Columns.Add(SchemaTableColumn.BaseTableName, typeof(string));
table.Columns.Add(SchemaTableColumn.DataType, typeof(Type));
table.Columns.Add(SchemaTableColumn.AllowDBNull, typeof(bool));
table.Columns.Add(SchemaTableColumn.ProviderType, typeof(int));
table.Columns.Add(SchemaTableColumn.IsAliased, typeof(bool));
table.Columns.Add(SchemaTableColumn.IsExpression, typeof(bool));
table.Columns.Add(SchemaTableColumn.IsLong, typeof(bool));
foreach (var column in GetColumnSchema())
{
var row = table.NewRow();
row[SchemaTableColumn.ColumnName] = column.ColumnName;
row[SchemaTableColumn.ColumnOrdinal] = column.ColumnOrdinal ?? -1;
row[SchemaTableColumn.ColumnSize] = column.ColumnSize ?? -1;
row[SchemaTableColumn.NumericPrecision] = column.NumericPrecision ?? 0;
row[SchemaTableColumn.NumericScale] = column.NumericScale ?? 0;
row[SchemaTableColumn.IsUnique] = column.IsUnique == true;
row[SchemaTableColumn.IsKey] = column.IsKey == true;
row[SchemaTableColumn.BaseColumnName] = column.BaseColumnName;
row[SchemaTableColumn.BaseSchemaName] = column.BaseSchemaName;
row[SchemaTableColumn.BaseTableName] = column.BaseTableName;
row[SchemaTableColumn.DataType] = column.DataType;
row[SchemaTableColumn.AllowDBNull] = column.AllowDBNull == null ? DBNull.Value : column.AllowDBNull.Value;
row[SchemaTableColumn.ProviderType] = (int)((IgniteDbColumn)column).ColumnMetadata.Type;
row[SchemaTableColumn.IsAliased] = column.IsAliased == true;
row[SchemaTableColumn.IsExpression] = column.IsExpression == true;
row[SchemaTableColumn.IsLong] = column.IsLong == true;
table.Rows.Add(row);
}
return table;
}
/// <inheritdoc/>
public override async ValueTask DisposeAsync()
{
await _pageEnumerator.DisposeAsync().ConfigureAwait(false);
await _resultSet.DisposeAsync().ConfigureAwait(false);
}
/// <inheritdoc/>
public override void Close() => Dispose();
/// <inheritdoc/>
public override Task CloseAsync() => DisposeAsync().AsTask();
/// <inheritdoc/>
public override T GetFieldValue<T>(int ordinal)
{
if (typeof(T) == typeof(string))
{
return (T)(object)GetString(ordinal);
}
if (typeof(T) == typeof(int))
{
return (T)(object)GetInt32(ordinal);
}
if (typeof(T) == typeof(long))
{
return (T)(object)GetInt64(ordinal);
}
if (typeof(T) == typeof(short))
{
return (T)(object)GetInt16(ordinal);
}
if (typeof(T) == typeof(float))
{
return (T)(object)GetFloat(ordinal);
}
if (typeof(T) == typeof(double))
{
return (T)(object)GetDouble(ordinal);
}
if (typeof(T) == typeof(decimal))
{
return (T)(object)GetDecimal(ordinal);
}
if (typeof(T) == typeof(byte))
{
return (T)(object)GetByte(ordinal);
}
if (typeof(T) == typeof(byte[]))
{
return (T)(object)GetReader(ordinal, typeof(byte[])).GetBytes(ordinal);
}
if (typeof(T) == typeof(LocalTime))
{
return (T)(object)GetReader(ordinal, typeof(LocalTime)).GetTime(ordinal);
}
if (typeof(T) == typeof(LocalDate))
{
return (T)(object)GetReader(ordinal, typeof(LocalDate)).GetDate(ordinal);
}
if (typeof(T) == typeof(LocalDateTime))
{
return (T)(object)GetReader(ordinal, typeof(LocalDateTime)).GetDateTime(ordinal);
}
if (typeof(T) == typeof(DateTime))
{
return (T)(object)GetDateTime(ordinal);
}
if (typeof(T) == typeof(Instant))
{
return (T)(object)GetReader(ordinal, typeof(Instant)).GetTimestamp(ordinal);
}
throw GetInvalidColumnTypeException(typeof(T), Metadata.Columns[ordinal]);
}
/// <inheritdoc/>
public override Type GetFieldType(int ordinal) => Metadata.Columns[ordinal].Type.ToClrType();
/// <inheritdoc/>
public override string ToString() =>
new IgniteToStringBuilder(GetType())
.Append(FieldCount)
.Append(RecordsAffected)
.Append(HasRows)
.Append(IsClosed)
.Append(Metadata)
.Build();
/// <inheritdoc/>
protected override void Dispose(bool disposing) => DisposeAsync().AsTask().GetAwaiter().GetResult();
private static void ValidateColumnType(Type type, IColumnMetadata column)
{
if (column.Type != type.ToColumnType())
{
throw GetInvalidColumnTypeException(type, column);
}
}
private static InvalidCastException GetInvalidColumnTypeException(Type type, IColumnMetadata column) =>
new($"Column {column.Name} of type {column.Type.ToSqlTypeName()} ({column.Type.ToClrType()}) can not be cast to {type}.");
private BinaryTupleReader GetReader(int ordinal, Type type)
{
var column = Metadata.Columns[ordinal];
ValidateColumnType(type, column);
return GetReader();
}
private BinaryTupleReader GetReader()
{
if (_pageRowCount < 0)
{
throw new InvalidOperationException(
$"No data exists for the row/column. Reading has not started. Call {nameof(ReadAsync)} or {nameof(Read)}.");
}
var reader = _pageEnumerator.Current.GetReader(_pageRowOffset);
var tupleSpan = reader.ReadBinary();
return new BinaryTupleReader(tupleSpan, FieldCount);
}
private bool ReadNextRowInCurrentPage()
{
if (_pageRowCount <= 0 || _pageRowIndex >= _pageRowCount - 1)
{
return false;
}
_pageRowIndex++;
_pageRowOffset += _pageRowSize;
var reader = _pageEnumerator.Current.GetReader(_pageRowOffset);
_pageRowSize = reader.ReadBinaryHeader() + reader.Consumed;
return true;
}
private async Task<bool> FetchNextPage()
{
if (!await _pageEnumerator.MoveNextAsync().ConfigureAwait(false))
{
return false;
}
return ReadFirstRowInCurrentPage();
bool ReadFirstRowInCurrentPage()
{
var reader = _pageEnumerator.Current.GetReader();
_pageRowCount = reader.ReadArrayHeader();
_pageRowOffset = reader.Consumed;
_pageRowIndex = 0;
_pageRowSize = (_pageRowCount > 0 ? reader.ReadBinaryHeader() : 0) + reader.Consumed - _pageRowOffset;
return _pageRowCount > 0;
}
}
}