code/KustoCopyConsole/Kusto/DbCommandClient.cs (379 lines of code) (raw):
using Kusto.Data.Common;
using KustoCopyConsole.Concurrency;
using KustoCopyConsole.Kusto.Data;
using System.Collections.Immutable;
using System.Data;
namespace KustoCopyConsole.Kusto
{
/// <summary>
/// Issues Kusto control commands against a single database.
/// Every command is submitted through the shared <c>PriorityExecutionQueue</c>
/// with the caller-supplied <see cref="KustoPriority"/>.
/// </summary>
/// <remarks>
/// The <c>CancellationToken</c> accepted by each method is checked when the queue
/// runs the work item. It is not forwarded to the Kusto call itself.
/// </remarks>
internal class DbCommandClient
{
    private readonly ICslAdminProvider _provider;
    private readonly PriorityExecutionQueue<KustoPriority> _commandQueue;

    /// <summary>Creates a client bound to one database.</summary>
    /// <param name="provider">Kusto admin provider used to run control commands.</param>
    /// <param name="commandQueue">Queue through which every command is scheduled.</param>
    /// <param name="databaseName">Database every command is executed against.</param>
    public DbCommandClient(
        ICslAdminProvider provider,
        PriorityExecutionQueue<KustoPriority> commandQueue,
        string databaseName)
    {
        _provider = provider;
        _commandQueue = commandQueue;
        DatabaseName = databaseName;
    }

    /// <summary>Database every command of this client is executed against.</summary>
    public string DatabaseName { get; }

    /// <summary>
    /// Returns the <c>Total</c> of the <c>DataExport</c> resource reported by
    /// <c>.show capacity</c>.
    /// </summary>
    /// <returns>The first row's <c>Total</c>, narrowed from <c>long</c> to <c>int</c>.</returns>
    /// <exception cref="InvalidOperationException">No row was returned.</exception>
    /// <exception cref="OperationCanceledException"><paramref name="ct"/> was cancelled.</exception>
    public async Task<int> ShowExportCapacityAsync(
        KustoPriority priority,
        CancellationToken ct)
    {
        return await _commandQueue.RequestRunAsync(
            priority,
            async () =>
            {
                ct.ThrowIfCancellationRequested();

                var commandText = @$"
.show capacity
| where Resource == 'DataExport'
| project Total";
                using var reader = await _provider.ExecuteControlCommandAsync(
                    DatabaseName,
                    commandText);
                var result = reader
                    .ToEnumerable(r => (long)r[0])
                    .First();

                return (int)result;
            });
    }

    /// <summary>
    /// Returns the status of the given operations via <c>.show operations(...)</c>.
    /// </summary>
    /// <param name="operationIds">Operation ids, joined verbatim into the command.</param>
    /// <returns>One status per returned row. Empty, with no round trip, when no id is given.</returns>
    public async Task<IImmutableList<ExportOperationStatus>> ShowOperationsAsync(
        KustoPriority priority,
        IEnumerable<string> operationIds,
        CancellationToken ct)
    {
        // Materialize once: the ids are tested for emptiness here and joined later
        // inside the deferred lambda, so a lazy sequence would be enumerated twice.
        var operationIdList = operationIds.ToImmutableArray();

        if (operationIdList.IsEmpty)
        {
            return ImmutableArray<ExportOperationStatus>.Empty;
        }

        return await _commandQueue.RequestRunAsync(
            priority,
            async () =>
            {
                ct.ThrowIfCancellationRequested();

                var operationIdsText = string.Join(", ", operationIdList);
                var commandText = @$".show operations({operationIdsText})
| project OperationId, Duration, State, Status, ShouldRetry";
                using var reader = await _provider.ExecuteControlCommandAsync(
                    DatabaseName,
                    commandText);
                var result = reader
                    .ToEnumerable(r => new ExportOperationStatus(
                        ((Guid)r["OperationId"]).ToString(),
                        (TimeSpan)r["Duration"],
                        (string)r["State"],
                        (string)r["Status"],
                        // ShouldRetry is read as a signed byte and converted to bool.
                        Convert.ToBoolean((SByte)r["ShouldRetry"])))
                    .ToImmutableArray();

                return result;
            });
    }

    /// <summary>
    /// Starts an asynchronous parquet export of one block of <paramref name="tableName"/>.
    /// </summary>
    /// <remarks>
    /// Rows are restricted to the cursor window and to
    /// <c>ingestion_time() between (start .. end)</c>. The cursor window is
    /// <c>cursor_after(cursorStart)</c> (skipped when <paramref name="cursorStart"/>
    /// is blank) through <c>cursor_before_or_at(cursorEnd)</c>.
    /// Bounds are passed as query parameters. The table name and
    /// <paramref name="kqlQuery"/> are interpolated into the command text as-is.
    /// </remarks>
    /// <param name="storageRootUris">Export destinations, each emitted as an <c>h'...'</c> literal.</param>
    /// <param name="kqlQuery">Optional KQL appended after the (filtered) table reference.</param>
    /// <returns>The export operation id, taken from the first column of the first row.</returns>
    public async Task<string> ExportBlockAsync(
        KustoPriority priority,
        IEnumerable<Uri> storageRootUris,
        string tableName,
        string? kqlQuery,
        string cursorStart,
        string cursorEnd,
        DateTime ingestionTimeStart,
        DateTime ingestionTimeEnd,
        CancellationToken ct)
    {
        return await _commandQueue.RequestRunAsync(
            priority,
            async () =>
            {
                ct.ThrowIfCancellationRequested();

                const string CURSOR_START_PARAM = "CursorStart";
                const string CURSOR_END_PARAM = "CursorEnd";
                const string INGESTION_TIME_START_PARAM = "IngestionTimeStart";
                const string INGESTION_TIME_END_PARAM = "IngestionTimeEnd";

                var rootListText = string.Join(", ", storageRootUris.Select(u => $"h'{u}'"));
                // Without a start cursor the block starts at the beginning of the table.
                var cursorStartFilter = string.IsNullOrWhiteSpace(cursorStart)
                    ? string.Empty
                    : $"| where cursor_after({CURSOR_START_PARAM})";
                // The `let` shadows the table name with its filtered view, so that
                // kqlQuery (appended after the table reference) only sees this block.
                var commandText = @$"
.export async compressed to parquet (
{rootListText}
)
with (
namePrefix=""export"",
persistDetails=true,
parquetDatetimePrecision=""microsecond"",
distribution=""per_node""
) <|
declare query_parameters(
{CURSOR_START_PARAM}:string,
{CURSOR_END_PARAM}:string,
{INGESTION_TIME_START_PARAM}:datetime,
{INGESTION_TIME_END_PARAM}:datetime);
let ['{tableName}'] = ['{tableName}']
{cursorStartFilter}
| where cursor_before_or_at({CURSOR_END_PARAM})
| where ingestion_time() between ({INGESTION_TIME_START_PARAM}.. {INGESTION_TIME_END_PARAM});
['{tableName}']
{kqlQuery}
";
                var properties = new ClientRequestProperties();

                properties.SetParameter(CURSOR_START_PARAM, cursorStart);
                properties.SetParameter(CURSOR_END_PARAM, cursorEnd);
                properties.SetParameter(INGESTION_TIME_START_PARAM, ingestionTimeStart);
                properties.SetParameter(INGESTION_TIME_END_PARAM, ingestionTimeEnd);

                using var reader = await _provider.ExecuteControlCommandAsync(
                    DatabaseName,
                    commandText,
                    properties);
                var operationId = reader
                    .ToEnumerable(r => (Guid)r[0])
                    .First();

                return operationId.ToString();
            });
    }

    /// <summary>
    /// Returns <c>MinCreatedOn</c> for each of the given extents of
    /// <paramref name="tableName"/>.
    /// </summary>
    /// <param name="extentIds">Extent ids, joined verbatim into the command.</param>
    /// <returns>One entry per returned row. Empty, with no round trip, when no id is given.</returns>
    public async Task<IImmutableList<ExtentDate>> GetExtentDatesAsync(
        KustoPriority priority,
        string tableName,
        IEnumerable<string> extentIds,
        CancellationToken ct)
    {
        var extentIdList = extentIds.ToImmutableArray();

        // Nothing to look up: avoid sending an empty `extents ()` list to the cluster.
        if (extentIdList.IsEmpty)
        {
            return ImmutableArray<ExtentDate>.Empty;
        }

        return await _commandQueue.RequestRunAsync(
            priority,
            async () =>
            {
                ct.ThrowIfCancellationRequested();

                var extentList = string.Join(", ", extentIdList);
                var commandText = @$"
.show table ['{tableName}'] extents ({extentList})
| project ExtentId=tostring(ExtentId), MinCreatedOn";
                var properties = new ClientRequestProperties();
                using var reader = await _provider.ExecuteControlCommandAsync(
                    DatabaseName,
                    commandText,
                    properties);
                var result = reader
                    .ToEnumerable(r => new ExtentDate(
                        (string)(r[0]),
                        (DateTime)(r[1])))
                    .ToImmutableArray();

                return result;
            });
    }

    /// <summary>
    /// Returns the rows of <c>.show operation &lt;id&gt; details</c>.
    /// Each row is read as (URI string, long, long).
    /// </summary>
    /// <param name="operationId">Operation id, interpolated verbatim into the command.</param>
    public async Task<IImmutableList<ExportDetail>> ShowExportDetailsAsync(
        KustoPriority priority,
        string operationId,
        CancellationToken ct)
    {
        return await _commandQueue.RequestRunAsync(
            priority,
            async () =>
            {
                ct.ThrowIfCancellationRequested();

                var commandText = @$"
.show operation {operationId} details
";
                var properties = new ClientRequestProperties();
                using var reader = await _provider.ExecuteControlCommandAsync(
                    DatabaseName,
                    commandText,
                    properties);
                var result = reader
                    .ToEnumerable(r => new ExportDetail(
                        new Uri((string)(r[0])),
                        (long)(r[1]),
                        (long)(r[2])))
                    .ToImmutableArray();

                return result;
            });
    }

    #region Table Management
    /// <summary>Drops <paramref name="tableName"/> using <c>.drop table ... ifexists</c>.</summary>
    public async Task DropTableIfExistsAsync(
        KustoPriority priority,
        string tableName,
        CancellationToken ct)
    {
        await _commandQueue.RequestRunAsync(
            priority,
            async () =>
            {
                ct.ThrowIfCancellationRequested();

                var commandText = @$"
.drop table ['{tableName}'] ifexists
";
                var properties = new ClientRequestProperties();

                // The result set is not used, but the reader is disposable and
                // must still be released.
                using var reader = await _provider.ExecuteControlCommandAsync(
                    DatabaseName,
                    commandText,
                    properties);
            });
    }

    /// <summary>
    /// Creates <paramref name="tempTableName"/> based on <paramref name="tableName"/>
    /// (in folder <c>kc</c>) and adjusts its policies.
    /// </summary>
    /// <remarks>
    /// The script does the following:
    /// <list type="bullet">
    /// <item>Deletes the extent_tags_retention and partitioning policies.</item>
    /// <item>Sets ingestion batching.</item>
    /// <item>Disables merge and rebuild.</item>
    /// <item>Enables restricted view access.</item>
    /// </list>
    /// It runs with <c>ContinueOnErrors=true</c>, so each command's result row is
    /// inspected afterwards.
    /// </remarks>
    /// <exception cref="CopyException">A script command's result was not <c>Completed</c>.</exception>
    public async Task CreateTempTableAsync(
        KustoPriority priority,
        string tableName,
        string tempTableName,
        CancellationToken ct)
    {
        await _commandQueue.RequestRunAsync(
            priority,
            async () =>
            {
                ct.ThrowIfCancellationRequested();

                var commandText = @$"
.execute database script with (ContinueOnErrors=true) <|
.create table ['{tempTableName}'] based-on ['{tableName}'] with (folder=""kc"")
.delete table ['{tempTableName}'] policy extent_tags_retention
.alter table ['{tempTableName}'] policy ingestionbatching
```
{{
""MaximumBatchingTimeSpan"" : ""00:00:15"",
""MaximumNumberOfItems"" : 2000,
""MaximumRawDataSizeMB"": 2048
}}
```
.alter-merge table ['{tempTableName}'] policy merge
```
{{
""AllowRebuild"": false,
""AllowMerge"": false
}}
```
.delete table ['{tempTableName}'] policy partitioning
.alter table ['{tempTableName}'] policy restricted_view_access true
";
                var properties = new ClientRequestProperties();
                using var reader = await _provider.ExecuteControlCommandAsync(
                    DatabaseName,
                    commandText,
                    properties);
                // ContinueOnErrors means a failing command does not throw:
                // report the first one that did not complete.
                var failure = reader
                    .ToEnumerable(r => new
                    {
                        OperationId = (Guid)(r[0]),
                        CommandType = (string)(r[1]),
                        CommandText = (string)(r[2]),
                        Result = (string)(r[3]),
                        Reason = (string)(r[4])
                    })
                    .FirstOrDefault(o => o.Result != "Completed");

                if (failure != null)
                {
                    throw new CopyException(
                        $"Command failed in operation '{failure.OperationId}': "
                        + $"{failure.CommandType} / '{failure.CommandText}' / {failure.Result} "
                        + $"'{failure.Reason}'",
                        false);
                }
            });
    }
    #endregion

    /// <summary>
    /// Returns the id, tags and row count of every extent in <paramref name="tempTableName"/>.
    /// </summary>
    public async Task<IImmutableList<ExtentRowCount>> GetExtentRowCountsAsync(
        KustoPriority priority,
        string tempTableName,
        CancellationToken ct)
    {
        return await _commandQueue.RequestRunAsync(
            priority,
            async () =>
            {
                ct.ThrowIfCancellationRequested();

                var commandText = @$"
.show table ['{tempTableName}'] extents
| project ExtentId, RowCount, Tags
";
                var properties = new ClientRequestProperties();
                using var reader = await _provider.ExecuteControlCommandAsync(
                    DatabaseName,
                    commandText,
                    properties);
                var result = reader
                    .ToEnumerable(r => new ExtentRowCount(
                        ((Guid)r["ExtentId"]).ToString(),
                        (string)r["Tags"],
                        (long)r["RowCount"]))
                    .ToImmutableArray();

                return result;
            });
    }

    /// <summary>
    /// Moves the given extents from <paramref name="tempTableName"/> to
    /// <paramref name="tableName"/> with <c>setNewIngestionTime=true</c>.
    /// </summary>
    /// <returns>
    /// Number of result rows returned by the move. Zero, with no round trip,
    /// when no extent id is given.
    /// </returns>
    /// <exception cref="CopyException">A result row carried a non-blank <c>Details</c> value.</exception>
    public async Task<int> MoveExtentsAsync(
        KustoPriority priority,
        string tempTableName,
        string tableName,
        IEnumerable<string> extentIds,
        CancellationToken ct)
    {
        var extentIdList = extentIds.ToImmutableArray();

        // Nothing to move: avoid sending an empty `()` extent list to the cluster.
        if (extentIdList.IsEmpty)
        {
            return 0;
        }

        return await _commandQueue.RequestRunAsync(
            priority,
            async () =>
            {
                ct.ThrowIfCancellationRequested();

                var extentIdTextList = string.Join(", ", extentIdList);
                var commandText = @$"
.move extents from table ['{tempTableName}'] to table ['{tableName}']
with (setNewIngestionTime=true)
({extentIdTextList})
";
                var properties = new ClientRequestProperties();
                using var reader = await _provider.ExecuteControlCommandAsync(
                    DatabaseName,
                    commandText,
                    properties);
                var results = reader
                    .ToEnumerable(r => new
                    {
                        OriginalExtentId = (string)(r[0]),
                        ResultExtentId = (string)(r[1]),
                        Details = r[2].ToString()
                    })
                    .ToImmutableArray();
                var singleDetail = results
                    .Select(r => r.Details)
                    .FirstOrDefault(d => !string.IsNullOrWhiteSpace(d));

                if (singleDetail != null)
                {
                    throw new CopyException($"Move extent failure: '{singleDetail}'", true);
                }

                return results.Length;
            });
    }

    /// <summary>
    /// Drops the given extent tags from <paramref name="tableName"/> using
    /// <c>.drop table ... extent tags (...)</c>.
    /// </summary>
    /// <param name="tags">Tags, each wrapped in single quotes verbatim (not escaped).</param>
    /// <returns>
    /// Number of result rows returned by the command. Zero, with no round trip,
    /// when no tag is given.
    /// </returns>
    /// <exception cref="CopyException">A result row carried a non-blank <c>Details</c> value.</exception>
    public async Task<int> CleanExtentTagsAsync(
        KustoPriority priority,
        string tableName,
        IEnumerable<string> tags,
        CancellationToken ct)
    {
        var tagList = tags.ToImmutableArray();

        // Nothing to drop: avoid sending an empty `()` tag list to the cluster.
        if (tagList.IsEmpty)
        {
            return 0;
        }

        return await _commandQueue.RequestRunAsync(
            priority,
            async () =>
            {
                ct.ThrowIfCancellationRequested();

                var tagListText = string.Join(", ", tagList.Select(t => $"'{t}'"));
                var commandText = @$"
.drop table ['{tableName}'] extent tags
({tagListText})
";
                var properties = new ClientRequestProperties();
                using var reader = await _provider.ExecuteControlCommandAsync(
                    DatabaseName,
                    commandText,
                    properties);
                var results = reader
                    .ToEnumerable(r => new
                    {
                        OriginalExtentId = (string)(r[0]),
                        ResultExtentId = (string)(r[1]),
                        ResultExtentTags = (string)(r[2]),
                        Details = r[3].ToString()
                    })
                    .ToImmutableArray();
                var singleDetail = results
                    .Select(r => r.Details)
                    .FirstOrDefault(d => !string.IsNullOrWhiteSpace(d));

                if (singleDetail != null)
                {
                    throw new CopyException($"Clean extent failure: '{singleDetail}'", true);
                }

                return results.Length;
            });
    }
}
}