in code/KustoCopyConsole/Runner/AwaitIngestRunner.cs [151:191]
/// <summary>
/// Checks the queued block with the smallest <c>Updated</c> value in the given
/// iteration for ingestion failures and, on the first failure found, logs a warning
/// and calls <c>ReturnToPlanned</c> on that block.
/// </summary>
/// <remarks>
/// Only one block (the oldest queued one) is inspected per call, and the inspection
/// stops at the first blob reporting a failure.
/// </remarks>
/// <param name="iterationCache">Cached iteration whose blocks are inspected.</param>
/// <param name="ct">
/// Cancellation token. It is not referenced by this method body; it is kept so the
/// signature stays unchanged for callers.
/// </param>
/// <returns>A task that completes once the failure check is done.</returns>
private async Task IterationFailureDetectionAsync(
    IterationCache iterationCache,
    CancellationToken ct)
{
    // Materialize once: the filter is otherwise re-evaluated by every consumer
    // (emptiness check + ArgMin), and both must observe the same set of blocks.
    var queuedBlocks = iterationCache
        .BlockMap
        .Values
        .Where(b => b.RowItem.State == BlockState.Queued)
        .ToArray();

    if (queuedBlocks.Length == 0)
    {
        // Nothing is queued for ingestion, hence nothing can have failed.
        return;
    }

    var activityItem = RowItemGateway.InMemoryCache
        .ActivityMap[iterationCache.RowItem.ActivityName]
        .RowItem;
    var ingestClient = DbClientFactory.GetIngestClient(
        activityItem.DestinationTable.ClusterUri,
        activityItem.DestinationTable.DatabaseName,
        iterationCache.TempTable!.TempTableName);
    var oldestBlock = queuedBlocks
        .ArgMin(b => b.RowItem.Updated);

    foreach (var urlItem in oldestBlock.UrlMap.Values.Select(u => u.RowItem))
    {
        var failure = await ingestClient.FetchIngestionFailureAsync(
            urlItem.SerializedQueuedResult);

        if (failure != null)
        {
            TraceWarning(
                $"Warning! Ingestion failed with status '{failure.Status}' " +
                $"and detail '{failure.Details}' for blob {urlItem.Url} in block " +
                $"{oldestBlock.RowItem.BlockId}, iteration " +
                $"{oldestBlock.RowItem.IterationId}, activity " +
                $"{oldestBlock.RowItem.ActivityName} ; block will be re-exported");
            ReturnToPlanned(oldestBlock);

            // One failed blob is enough to send the whole block back; stop probing.
            return;
        }
    }
}