in code/KustoCopyConsole/Kusto/DbCommandClient.cs [86:149]
/// <summary>
/// Queues and submits an <c>.export async compressed to parquet</c> command that exports
/// the rows of <paramref name="tableName"/> falling inside the given cursor and
/// ingestion-time window, then returns the identifier of the resulting operation.
/// </summary>
/// <param name="priority">Priority handed to the command queue when scheduling the command.</param>
/// <param name="storageRootUris">
/// Storage roots to export to; each one is rendered as an obfuscated (<c>h'...'</c>) string literal.
/// </param>
/// <param name="tableName">Name of the table to export; it is shadowed by a filtered <c>let</c> view.</param>
/// <param name="kqlQuery">
/// Optional query text appended verbatim after the table reference (<c>null</c> renders as nothing).
/// </param>
/// <param name="cursorStart">
/// Start cursor; when null or white space the <c>cursor_after</c> filter is omitted.
/// </param>
/// <param name="cursorEnd">End cursor used with <c>cursor_before_or_at</c>.</param>
/// <param name="ingestionTimeStart">Lower bound of the <c>ingestion_time()</c> filter.</param>
/// <param name="ingestionTimeEnd">Upper bound of the <c>ingestion_time()</c> filter.</param>
/// <param name="ct">
/// Checked right before the command is sent; it is not forwarded to the queue or the provider.
/// </param>
/// <returns>The operation id (first column of the first result row) formatted as a string.</returns>
/// <exception cref="OperationCanceledException">
/// <paramref name="ct"/> was cancelled before the command was sent.
/// </exception>
/// <exception cref="InvalidOperationException">The command returned no rows.</exception>
public async Task<string> ExportBlockAsync(
    KustoPriority priority,
    IEnumerable<Uri> storageRootUris,
    string tableName,
    string? kqlQuery,
    string cursorStart,
    string cursorEnd,
    DateTime ingestionTimeStart,
    DateTime ingestionTimeEnd,
    CancellationToken ct)
{
    return await _commandQueue.RequestRunAsync(
        priority,
        async () =>
        {
            // The request may have waited in the queue: do not start a server-side
            // export for a caller that has already given up.
            ct.ThrowIfCancellationRequested();

            const string CursorStartParam = "CursorStart";
            const string CursorEndParam = "CursorEnd";
            const string IngestionTimeStartParam = "IngestionTimeStart";
            const string IngestionTimeEndParam = "IngestionTimeEnd";

            var rootListText = string.Join(", ", storageRootUris.Select(u => $"h'{u}'"));
            // Without a start cursor there is no lower cursor bound to apply.
            var cursorStartFilter = string.IsNullOrWhiteSpace(cursorStart)
                ? string.Empty
                : $"| where cursor_after({CursorStartParam})";
            // The lines of the verbatim string are kept flush-left on purpose:
            // everything between the quotes is part of the command text.
            var commandText = @$"
.export async compressed to parquet (
{rootListText}
)
with (
namePrefix=""export"",
persistDetails=true,
parquetDatetimePrecision=""microsecond"",
distribution=""per_node""
) <|
declare query_parameters(
{CursorStartParam}:string,
{CursorEndParam}:string,
{IngestionTimeStartParam}:datetime,
{IngestionTimeEndParam}:datetime);
let ['{tableName}'] = ['{tableName}']
{cursorStartFilter}
| where cursor_before_or_at({CursorEndParam})
| where ingestion_time() between ({IngestionTimeStartParam}.. {IngestionTimeEndParam});
['{tableName}']
{kqlQuery}
";
            // Values travel as query parameters rather than being inlined in the text.
            var properties = new ClientRequestProperties();

            properties.SetParameter(CursorStartParam, cursorStart);
            properties.SetParameter(CursorEndParam, cursorEnd);
            properties.SetParameter(IngestionTimeStartParam, ingestionTimeStart);
            properties.SetParameter(IngestionTimeEndParam, ingestionTimeEnd);

            // Dispose the reader once the operation id has been read so the
            // underlying response is released deterministically.
            using var reader = await _provider.ExecuteControlCommandAsync(
                DatabaseName,
                commandText,
                properties);
            var operationId = reader
                .ToEnumerable(r => (Guid)r[0])
                .First();

            return operationId.ToString();
        });
}