in src/Ingestor.cs [588:632]
private void RunSyncDirectIngestInBatches(ICslAdminProvider kustoClient,
IEnumerable<DataSourcesBatch> batches,
KustoQueuedIngestionProperties ingestionProperties,
string ingestWithManagedIdentity)
{
ExtendedParallel.ForEachEx(batches, m_directIngestParallelRequests, (b) =>
{
try
{
List<string> batchUris;
if (!string.IsNullOrWhiteSpace(ingestWithManagedIdentity))
{
batchUris = b.Sources.Select((s) => $"{s.SafeCloudFileUri};managed_identity={ingestWithManagedIdentity}").ToList();
}
else
{
batchUris = b.Sources.Select((s) => s.CloudFileUri).ToList();
}
var cmd = CslCommandGenerator.GenerateTableIngestPullCommand(ingestionProperties.TableName, batchUris, false,
extensions: ingestionProperties.AdditionalProperties,
tags: ingestionProperties.AdditionalTags);
var clientRequestProperties = new ClientRequestProperties();
clientRequestProperties.SetOption(ClientRequestProperties.OptionServerTimeout, TimeSpan.FromMinutes(30));
var cmdResult = kustoClient.ExecuteControlCommand<DataIngestPullCommandResult>(ingestionProperties.DatabaseName, cmd, clientRequestProperties);
// Get the operation result
cmd = CslCommandGenerator.GenerateOperationsShowCommand(cmdResult.First().OperationId);
var showOperationResult = kustoClient.ExecuteControlCommand<OperationsShowCommandResult>(cmd);
lock (m_operationResultsLock)
{
m_operationResults.Add(showOperationResult.First());
}
Interlocked.Increment(ref m_batchesIngested);
}
catch (Exception ex)
{
m_logger.LogError($"Error in RunSyncDirectIngestInBatches: {ex.Message}");
}
});
}