code/KustoCopyConsole/Runner/TempTableCreatingRunner.cs (129 lines of code) (raw):
using Azure.Core;
using KustoCopyConsole.Entity.InMemory;
using KustoCopyConsole.Entity.RowItems;
using KustoCopyConsole.Entity.RowItems.Keys;
using KustoCopyConsole.Entity.State;
using KustoCopyConsole.JobParameter;
using KustoCopyConsole.Kusto;
using KustoCopyConsole.Storage;
using System.Collections.Immutable;
namespace KustoCopyConsole.Runner
{
internal class TempTableCreatingRunner : RunnerBase
{
public TempTableCreatingRunner(
MainJobParameterization parameterization,
TokenCredential credential,
RowItemGateway rowItemGateway,
DbClientFactory dbClientFactory,
IStagingBlobUriProvider stagingBlobUriProvider)
: base(
parameterization,
credential,
rowItemGateway,
dbClientFactory,
stagingBlobUriProvider,
TimeSpan.FromSeconds(10))
{
}
public async Task RunAsync(CancellationToken ct)
{
var taskMap = new Dictionary<IterationKey, Task>();
while (taskMap.Any() || !AllActivitiesCompleted())
{
var newIterations = RowItemGateway.InMemoryCache.ActivityMap
.Values
.SelectMany(a => a.IterationMap.Values)
.Where(i => i.RowItem.State >= IterationState.Planning)
.Where(i => i.TempTable == null
|| i.TempTable.State == TempTableState.Creating)
.Select(i => new
{
Key = i.RowItem.GetIterationKey(),
Iteration = i
})
.Where(o => !taskMap.ContainsKey(o.Key));
foreach (var o in newIterations)
{
taskMap.Add(o.Key, EnsureTempTableCreatedAsync(o.Iteration, ct));
}
await CleanTaskMapAsync(taskMap);
// Sleep
await SleepAsync(ct);
}
}
private async Task CleanTaskMapAsync(IDictionary<IterationKey, Task> taskMap)
{
foreach (var taskKey in taskMap.Keys.ToImmutableArray())
{
var task = taskMap[taskKey];
if (task.IsCompleted)
{
await task;
taskMap.Remove(taskKey);
}
}
}
private async Task EnsureTempTableCreatedAsync(
IterationCache iteration,
CancellationToken ct)
{
var doesPreExist = iteration.TempTable != null;
if (!doesPreExist)
{
await PrepareTempTableAsync(iteration.RowItem, ct);
iteration = RowItemGateway.InMemoryCache
.ActivityMap[iteration.RowItem.ActivityName]
.IterationMap[iteration.RowItem.IterationId];
}
if (iteration.TempTable!.State == TempTableState.Creating)
{
await CreateTempTableAsync(iteration.TempTable, doesPreExist, ct);
}
}
private async Task PrepareTempTableAsync(
IterationRowItem iterationItem,
CancellationToken ct)
{
var activity = RowItemGateway.InMemoryCache
.ActivityMap[iterationItem.ActivityName]
.RowItem;
var tempTableName =
$"kc-{activity.DestinationTable.TableName}-{Guid.NewGuid().ToString("N")}";
var tempTableItem = new TempTableRowItem
{
State = TempTableState.Creating,
ActivityName = iterationItem.ActivityName,
IterationId = iterationItem.IterationId,
TempTableName = tempTableName
};
// We want to ensure the item is appended before creating a temp table so
// we don't lose track of the table
await RowItemGateway.AppendAndPersistAsync(tempTableItem, ct);
}
private async Task CreateTempTableAsync(
TempTableRowItem tempTableItem,
bool doesPreExist,
CancellationToken ct)
{
var activity = RowItemGateway.InMemoryCache
.ActivityMap[tempTableItem.ActivityName]
.RowItem;
var dbCommandClient = DbClientFactory.GetDbCommandClient(
activity.DestinationTable.ClusterUri,
activity.DestinationTable.DatabaseName);
var priority = new KustoPriority(tempTableItem.GetIterationKey());
if (doesPreExist)
{
await dbCommandClient.DropTableIfExistsAsync(
priority,
tempTableItem.TempTableName,
ct);
}
await dbCommandClient.CreateTempTableAsync(
priority,
activity.DestinationTable.TableName,
tempTableItem.TempTableName,
ct);
tempTableItem = tempTableItem.ChangeState(TempTableState.Created);
RowItemGateway.Append(tempTableItem);
}
}
}