code/KustoCopyConsole/Kusto/DbQueryClient.cs (152 lines of code) (raw):

using Kusto.Data.Common;
using KustoCopyConsole.Concurrency;
using KustoCopyConsole.Kusto.Data;
using System.Collections.Immutable;
using System.Data;

namespace KustoCopyConsole.Kusto
{
    /// <summary>
    /// Runs read-only queries against a single database through an
    /// <see cref="ICslQueryProvider"/>.  Every query is submitted through the
    /// priority execution queue received at construction.
    /// </summary>
    internal class DbQueryClient
    {
        private static readonly ClientRequestProperties EMPTY_PROPERTIES =
            new ClientRequestProperties();

        private readonly ICslQueryProvider _provider;
        private readonly PriorityExecutionQueue<KustoPriority> _queue;
        private readonly string _databaseName;

        /// <summary>Creates a client bound to one database.</summary>
        /// <param name="provider">Provider used to execute the queries.</param>
        /// <param name="queue">Queue through which every query is requested.</param>
        /// <param name="databaseName">Database all queries are executed against.</param>
        public DbQueryClient(
            ICslQueryProvider provider,
            PriorityExecutionQueue<KustoPriority> queue,
            string databaseName)
        {
            _provider = provider;
            _queue = queue;
            _databaseName = databaseName;
        }

        /// <summary>
        /// Executes <c>print cursor_current()</c> and returns the first column
        /// of the first row.
        /// </summary>
        /// <param name="priority">Priority handed to the execution queue.</param>
        /// <param name="ct">Cancellation token forwarded to the query execution.</param>
        /// <returns>The value returned by the query.</returns>
        public async Task<string> GetCurrentCursorAsync(
            KustoPriority priority,
            CancellationToken ct)
        {
            return await _queue.RequestRunAsync(
                priority,
                async () =>
                {
                    var query = "print cursor_current()";

                    //  The reader is disposed once the row has been read
                    using var reader = await _provider.ExecuteQueryAsync(
                        _databaseName,
                        query,
                        EMPTY_PROPERTIES,
                        ct);
                    var cursor = reader
                        .ToEnumerable(r => (string)r[0])
                        .FirstOrDefault();

                    //  NOTE(review):  a result without any row would yield null here;
                    //  the print statement is presumed to always return one row
                    return cursor!;
                });
        }

        /// <summary>
        /// Tests whether at least one row of the table (after applying the
        /// optional <paramref name="kqlQuery"/>) has a null <c>ingestion_time()</c>.
        /// </summary>
        /// <param name="priority">Priority handed to the execution queue.</param>
        /// <param name="tableName">Table name, embedded in the query as <c>['name']</c>.</param>
        /// <param name="kqlQuery">
        /// Optional query text appended verbatim right after the table reference.
        /// </param>
        /// <param name="ct">Cancellation token forwarded to the query execution.</param>
        /// <returns><c>true</c> if the query counts at least one such row.</returns>
        public async Task<bool> HasNullIngestionTime(
            KustoPriority priority,
            string tableName,
            string? kqlQuery,
            CancellationToken ct)
        {
            return await _queue.RequestRunAsync(
                priority,
                async () =>
                {
                    //  "take 1" caps the count at 1:  only existence matters
                    var query = @$"
let BaseData = ['{tableName}']
{kqlQuery}
;
BaseData
| where isnull(ingestion_time())
| take 1
| count
";
                    var properties = new ClientRequestProperties();

                    //  The reader is disposed once the count has been read
                    using var reader = await _provider.ExecuteQueryAsync(
                        _databaseName,
                        query,
                        properties,
                        ct);
                    var result = reader
                        .ToEnumerable(r => (long)r[0])
                        .First();
                    var hasNull = result != 0;

                    return hasNull;
                });
        }

        /// <summary>
        /// Computes how records are distributed over ingestion time and extents
        /// within a cursor window.
        /// </summary>
        /// <remarks>
        /// The query keeps the first <paramref name="maxStatCount"/> distinct
        /// (ingestion time, extent id) pairs in ascending ingestion time order,
        /// takes the highest ingestion time among them as an upper bound, then
        /// counts rows per (ingestion time, extent id) up to that bound and
        /// merges consecutive rows sharing the same extent id into one record.
        /// An all-zero extent id is returned as an empty string.
        /// </remarks>
        /// <param name="priority">Priority handed to the execution queue.</param>
        /// <param name="tableName">Table name, embedded in the query as <c>['name']</c>.</param>
        /// <param name="kqlQuery">
        /// Optional query text appended verbatim right after the table reference.
        /// </param>
        /// <param name="cursorStart">
        /// Lower cursor bound; the <c>cursor_after</c> filter is skipped when this
        /// is null, empty or white space.
        /// </param>
        /// <param name="cursorEnd">Upper cursor bound (<c>cursor_before_or_at</c>).</param>
        /// <param name="ingestionTimeStart">
        /// When set, only rows with an ingestion time strictly greater than this
        /// value are considered.
        /// </param>
        /// <param name="maxStatCount">
        /// Number of (ingestion time, extent id) pairs used to compute the upper bound.
        /// </param>
        /// <param name="ct">Cancellation token forwarded to the query execution.</param>
        /// <returns>One <see cref="RecordDistribution"/> per row returned by the query.</returns>
        public async Task<IImmutableList<RecordDistribution>> GetRecordDistributionAsync(
            KustoPriority priority,
            string tableName,
            string? kqlQuery,
            string cursorStart,
            string cursorEnd,
            DateTime? ingestionTimeStart,
            int maxStatCount,
            CancellationToken ct)
        {
            return await _queue.RequestRunAsync(
                priority,
                async () =>
                {
                    const string CURSOR_START_PARAM = "CursorStart";
                    const string CURSOR_END_PARAM = "CursorEnd";
                    const string INGESTION_TIME_START_PARAM = "IngestionTimeStart";

                    //  Optional filters are left out of the query text when not needed
                    var cursorStartFilter = string.IsNullOrWhiteSpace(cursorStart)
                        ? string.Empty
                        : $"| where cursor_after({CURSOR_START_PARAM})";
                    var ingestionTimeStartFilter = ingestionTimeStart is null
                        ? string.Empty
                        : $"| where ingestion_time()>todatetime({INGESTION_TIME_START_PARAM})";
                    var query = @$"
declare query_parameters(
    {CURSOR_START_PARAM}:string,
    {CURSOR_END_PARAM}:string,
    {INGESTION_TIME_START_PARAM}:datetime=datetime(null));
let ['{tableName}'] = ['{tableName}']
    {cursorStartFilter}
    | where cursor_before_or_at({CURSOR_END_PARAM})
    {ingestionTimeStartFilter};
let BaseData = ['{tableName}']
{kqlQuery}
;
// Cut the ingestion time away
let MaxIngestionTime = toscalar(
    BaseData
    | summarize by IngestionTime=ingestion_time(), ExtentId=tostring(extent_id())
    | top {maxStatCount} by IngestionTime asc
    | summarize max(IngestionTime));
// Reuse the cut-away value in case the maxStatCount cut in the middle of a constant ingestion time sequence
BaseData
| summarize RowCount=count() by IngestionTime=ingestion_time(), ExtentId=extent_id()
| where IngestionTime <= MaxIngestionTime
| order by IngestionTime asc
| extend Rank=row_rank_dense(ExtentId)
| summarize IngestionTimeStart=min(IngestionTime), IngestionTimeEnd=max(IngestionTime), ExtentId=take_any(ExtentId), RowCount=sum(RowCount) by Rank
| project-away Rank
| extend ExtentId=iif(ExtentId==guid('00000000-0000-0000-0000-000000000000'), '', tostring(ExtentId))
";
                    var properties = new ClientRequestProperties();

                    properties.SetParameter(CURSOR_START_PARAM, cursorStart);
                    properties.SetParameter(CURSOR_END_PARAM, cursorEnd);
                    //  Left unset, the parameter falls back to its declared default
                    if (ingestionTimeStart is not null)
                    {
                        properties.SetParameter(
                            INGESTION_TIME_START_PARAM,
                            ingestionTimeStart.Value);
                    }

                    //  Rows are materialized by ToImmutableArray before the reader
                    //  is disposed at the end of this scope
                    using var reader = await _provider.ExecuteQueryAsync(
                        _databaseName,
                        query,
                        properties,
                        ct);
                    var result = reader
                        .ToEnumerable(r => new RecordDistribution(
                            (DateTime)(r["IngestionTimeStart"]),
                            (DateTime)(r["IngestionTimeEnd"]),
                            (string)(r["ExtentId"]),
                            (long)r["RowCount"]))
                        .ToImmutableArray();

                    return result;
                });
        }
    }
}