code/KustoCopyConsole/Kusto/IngestClient.cs (86 lines of code) (raw):
using Kusto.Data.Common;
using Kusto.Ingest;
using KustoCopyConsole.Concurrency;
using KustoCopyConsole.Kusto.Data;
using System.Collections.Immutable;
namespace KustoCopyConsole.Kusto
{
/// <summary>
/// Queues parquet blobs for ingestion into one Kusto table and interprets the
/// status of a previously queued ingestion.
/// </summary>
internal class IngestClient
{
    /// <summary>
    /// Statuses that <see cref="FetchIngestionFailureAsync"/> treats as a failure
    /// (unless the failure is flagged as transient).
    /// </summary>
    private static readonly IImmutableList<Status> FAILED_STATUS = [
        Status.Skipped,
        Status.Failed,
        Status.PartiallySucceeded];

    private readonly IKustoQueuedIngestClient _ingestProvider;
    private readonly PriorityExecutionQueue<KustoPriority> _queue;
    private readonly string _database;
    private readonly string _table;

    /// <summary>Creates a client bound to a single database / table pair.</summary>
    /// <param name="ingestProvider">Queued ingest client used to submit blobs.</param>
    /// <param name="queue">Execution queue through which every ingest call is run.</param>
    /// <param name="database">Target database name.</param>
    /// <param name="table">Target table name.</param>
    public IngestClient(
        IKustoQueuedIngestClient ingestProvider,
        PriorityExecutionQueue<KustoPriority> queue,
        string database,
        string table)
    {
        _ingestProvider = ingestProvider;
        _queue = queue;
        _database = database;
        _table = table;
    }

    /// <summary>
    /// Queues the ingestion of a parquet blob into the configured table.
    /// </summary>
    /// <param name="priority">Priority handed to the execution queue.</param>
    /// <param name="blobPath">URI of the blob to ingest.</param>
    /// <param name="extentTag">Tag attached to the ingestion through <c>AdditionalTags</c>.</param>
    /// <param name="creationTime">
    /// When not <c>null</c>, sent as the <c>creationTime</c> additional property in
    /// round-trip ("o") format.
    /// </param>
    /// <param name="ct">Token observed before queueing and again right before ingesting.</param>
    /// <returns>The ingestion result serialized by <c>IngestionResultSerializer</c>.</returns>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="ct"/> was cancelled before the ingestion was submitted.
    /// </exception>
    public async Task<string> QueueBlobAsync(
        KustoPriority priority,
        Uri blobPath,
        string extentTag,
        DateTime? creationTime,
        CancellationToken ct)
    {
        // The token is not forwarded to the calls below, so it is honored explicitly.
        ct.ThrowIfCancellationRequested();

        var properties = new KustoQueuedIngestionProperties(_database, _table)
        {
            Format = DataSourceFormat.parquet,
            AdditionalTags = new[] { extentTag },
            // Report both outcomes through the table report method so the status
            // can be read back from the serialized result later on.
            ReportLevel = IngestionReportLevel.FailuresAndSuccesses,
            ReportMethod = IngestionReportMethod.Table
        };

        if (creationTime is DateTime time)
        {
            properties.AdditionalProperties.Add("creationTime", time.ToString("o"));
        }

        return await _queue.RequestRunAsync(
            priority,
            async () =>
            {
                // The delegate runs when the queue schedules it; re-check the token
                // so a request cancelled in the meantime is not submitted.
                ct.ThrowIfCancellationRequested();

                var ingestionResult = await _ingestProvider.IngestFromStorageAsync(
                    blobPath.ToString(),
                    properties);

                return IngestionResultSerializer.Serialize(ingestionResult);
            });
    }

    /// <summary>
    /// Reads the status of a previously queued ingestion and reports a failure
    /// when it ended in a failed status that is not transient.
    /// </summary>
    /// <param name="serializedQueuedResult">
    /// Serialized ingestion result, as returned by <see cref="QueueBlobAsync"/>.
    /// </param>
    /// <returns>
    /// The status and details of the failure, or <c>null</c> when the status is
    /// not a failure status or the failure is transient.
    /// </returns>
    /// <exception cref="InvalidOperationException">
    /// The result does not hold exactly one status.
    /// </exception>
    public async Task<IngestionFailureDetail?> FetchIngestionFailureAsync(
        string serializedQueuedResult)
    {
        var ingestionResult = IngestionResultSerializer.Deserialize(serializedQueuedResult);
        // Materialize once so the underlying collection is enumerated a single
        // time instead of once per Count() / First() call.
        var statuses = ingestionResult.GetIngestionStatusCollection().ToArray();

        // NOTE(review): nothing here is truly asynchronous; this await is presumably
        // only present to keep the method `async` without a compiler warning.
        await Task.CompletedTask;

        if (statuses.Length != 1)
        {
            throw new InvalidOperationException(
                $"Status count was expected to be 1 but is {statuses.Length}");
        }

        var status = statuses[0];
        var isPermanentFailure = FAILED_STATUS.Contains(status.Status)
            && status.FailureStatus != FailureStatus.Transient;

        if (!isPermanentFailure)
        {
            return null;
        }

        return new IngestionFailureDetail(
            status.Status.ToString(),
            status.Details);
    }
}
}