in src/Ingestor.cs [977:1030]
private void IngestBatch(DataSourcesBatch batch,
ICslAdminProvider kustoClient,
bool bIngestLocally,
KustoIngestionProperties baseIngestionProperties,
TimeSpan ingestOperationTimeout,
string ingestWithManagedIdentity)
{
try
{
List<string> batchUris = null;
if (bIngestLocally)
{
batchUris = batch.Sources.Select((s) => s.FileSystemPath).ToList();
}
else if (!string.IsNullOrWhiteSpace(ingestWithManagedIdentity))
{
batchUris = batch.Sources.Select((s) => $"{s.SafeCloudFileUri};managed_identity={ingestWithManagedIdentity}").ToList();
}
else
{
batchUris = batch.Sources.Select((s) => s.CloudFileUri).ToList();
}
KustoIngestionProperties ingestionProperties = null;
// Take care of the CreationTime
if (batch.CreationTimeUtc.HasValue)
{
ingestionProperties = new KustoIngestionProperties(baseIngestionProperties);
ingestionProperties.AdditionalProperties.Add("creationTime", batch.CreationTimeUtc.Value.ToString("s"));
}
else
{
ingestionProperties = baseIngestionProperties;
}
var cmd = CslCommandGenerator.GenerateTableIngestPullCommand(ingestionProperties.TableName, batchUris, true,
extensions: ingestionProperties.AdditionalProperties,
tags: ingestionProperties.AdditionalTags);
var operationResults = kustoClient.ExecuteAsyncControlCommand(ingestionProperties.DatabaseName, cmd, ingestOperationTimeout, TimeSpan.FromSeconds(2));
m_logger.LogInfo("==> Complete ingest");
lock (m_operationResultsLock)
{
m_operationResults.Add(operationResults);
}
Interlocked.Increment(ref m_batchesIngested);
}
catch (Exception ex)
{
m_logger.LogError($"IngestBatch failed: {ex.Message}");
}
}