in code/KustoCopyConsole/Runner/IterationCompletingRunner.cs [44:81]
/// <summary>
/// Moves every eligible iteration to <c>IterationState.Completed</c>.
/// An iteration is eligible when its activity is not completed, the iteration
/// itself is in the <c>Planned</c> state and none of its blocks is in a state
/// other than <c>BlockState.ExtentMoved</c> (an iteration without blocks qualifies).
/// When the iteration has a temp table in the <c>Created</c> state, that table is
/// dropped and the iteration's staging directory is deleted before the state change.
/// </summary>
/// <param name="ct">
/// Cancellation token forwarded to the temp table drop and the staging directory deletion.
/// </param>
private async Task CompleteIterationsAsync(CancellationToken ct)
{
    //  Snapshot the candidates before looping:  the loop body appends to the gateway
    //  and awaits between items, so the (otherwise deferred) query must not be
    //  re-evaluated lazily against a cache that may have been updated in between.
    var completingIterations = RowItemGateway.InMemoryCache
        .ActivityMap
        .Values
        .Where(a => a.RowItem.State != ActivityState.Completed)
        .SelectMany(a => a.IterationMap.Values)
        //  The iteration is planned, hence all its blocks are in place
        .Where(i => i.RowItem.State == IterationState.Planned)
        //  All blocks in the iteration are moved (trivially true when there is no block)
        .Where(i => i.BlockMap.Values.All(b => b.RowItem.State == BlockState.ExtentMoved))
        .ToList();

    foreach (var iteration in completingIterations)
    {
        var tempTable = iteration.TempTable;

        //  Clean-up only applies when a temp table was actually created
        if (tempTable != null && tempTable.State == TempTableState.Created)
        {
            var tableId = RowItemGateway.InMemoryCache
                .ActivityMap[iteration.RowItem.ActivityName]
                .RowItem
                .DestinationTable;
            var dbClient = DbClientFactory.GetDbCommandClient(
                tableId.ClusterUri,
                tableId.DatabaseName);
            var iterationKey = iteration.RowItem.GetIterationKey();

            await dbClient.DropTableIfExistsAsync(
                new KustoPriority(iterationKey),
                tempTable.TempTableName,
                ct);
            await StagingBlobUriProvider.DeleteStagingDirectoryAsync(iterationKey, ct);
        }

        //  Persist the state transition only once the clean-up (if any) has succeeded
        var newIteration = iteration.RowItem.ChangeState(IterationState.Completed);

        RowItemGateway.Append(newIteration);
    }
}