in code/KustoCopyConsole/Runner/QueueIngestRunner.cs [83:132]
/// <summary>
/// Queues every URL of a block for ingestion into the block's temp table, then
/// appends the updated URL items and the updated block item (both in the
/// <c>Queued</c> state) to <c>RowItemGateway</c>.
/// </summary>
/// <remarks>
/// Returns without doing anything when <c>item.TempTable</c> is null.
/// </remarks>
/// <param name="item">
/// Hierarchy giving access to the activity, the block, its URLs and the temp table.
/// </param>
/// <param name="ct">Cancellation token forwarded to each URL queuing call.</param>
private async Task QueueIngestBlockAsync(ActivityFlatHierarchy item, CancellationToken ct)
{
    // Produces the "queued" version of a URL item carrying the serialized queuing result.
    UrlRowItem MarkUrlAsQueued(UrlRowItem url, string serializedQueueResult)
    {
        var newUrl = url.ChangeState(UrlState.Queued);

        newUrl.SerializedQueuedResult = serializedQueueResult;

        return newUrl;
    }

    // It's possible, although unlikely, that the temp table hasn't been created yet.
    // If so, this block is left untouched so it can be processed later.
    var tempTable = item.TempTable;

    if (tempTable is null)
    {
        return;
    }

    var destinationTable = item.Activity.DestinationTable;
    var ingestClient = DbClientFactory.GetIngestClient(
        destinationTable.ClusterUri,
        destinationTable.DatabaseName,
        tempTable.TempTableName);
    // A fresh tag per queuing attempt; it is stored on the block item that is persisted below.
    var blockTag = $"drop-by:kusto-copy|{Guid.NewGuid()}";
    var newBlockItem = item.Block.ChangeState(BlockState.Queued);
    // Materialize once: the URLs are needed for both log lines and for queuing,
    // and the source sequence should not be enumerated repeatedly.
    var urls = item.Urls.ToImmutableArray();

    newBlockItem.BlockTag = blockTag;
    Trace.TraceInformation($"Block {item.Block.GetBlockKey()}: ingest " +
        $"{urls.Length} urls");

    // Start all queuing operations eagerly, keeping each task paired with its URL.
    var queuingTasks = urls
        .Select(u => new
        {
            Url = u,
            Task = QueueIngestUrlAsync(
                ingestClient,
                newBlockItem,
                new Uri(u.Url),
                ct)
        })
        .ToImmutableArray();

    await TaskHelper.WhenAllWithErrors(queuingTasks.Select(o => o.Task));

    // NOTE(review): reading .Result assumes WhenAllWithErrors only returns normally
    // once every task has completed successfully - confirm against TaskHelper.
    var newUrlItems = queuingTasks
        .Select(o => MarkUrlAsQueued(o.Url, o.Task.Result));

    RowItemGateway.Append(newUrlItems);
    RowItemGateway.Append(newBlockItem);
    Trace.TraceInformation($"Block {item.Block.GetBlockKey()}: " +
        $"{urls.Length} urls queued");
}