in src/Ingestor.cs [820:894]
/// <summary>
/// Submits a single source object to <paramref name="ingestClient"/> through
/// <c>IngestFromStorageAsync</c>, unless the <paramref name="objectsToTake"/> limit
/// has already been reached. Any exception is logged and swallowed; nothing is rethrown.
/// </summary>
/// <param name="storageObject">Source to ingest; its path/URIs, size and optional creation time are read here.</param>
/// <param name="objectsToTake">Upper bound on posted objects; a negative value disables the limit check.</param>
/// <param name="ingestClient">Client used to submit the ingestion.</param>
/// <param name="fromFileSystem">When true the file system path is used as the source; otherwise a cloud URI is used.</param>
/// <param name="deleteSourcesOnSuccess">Forwarded as <c>StorageSourceOptions.DeleteSourceOnSuccess</c>.</param>
/// <param name="baseIngestionProperties">Used as-is, or copied and extended with a "creationTime" entry when the source carries a creation time.</param>
/// <param name="ingestWithManagedIdentity">When non-blank (and the source is not a local file), appended to the safe cloud URI as <c>;managed_identity=...</c>.</param>
private async Task IngestSingle(DataSource storageObject,
    int objectsToTake,
    IKustoIngestClient ingestClient,
    bool fromFileSystem,
    bool deleteSourcesOnSuccess,
    KustoQueuedIngestionProperties baseIngestionProperties,
    string ingestWithManagedIdentity)
{
    var timer = ExtendedStopwatch.StartNew();

    // Name used in log messages only; for cloud sources the "safe" URI is logged.
    var traceName = fromFileSystem
        ? storageObject.FileSystemPath
        : storageObject.SafeCloudFileUri;

    try
    {
        // NOTE(review): this check and the increment further down are not one atomic step,
        // so if this method runs concurrently more than objectsToTake objects may be posted
        // -- confirm whether that is acceptable.
        bool limitReached = objectsToTake >= 0 && Interlocked.Read(ref m_objectsPosted) >= objectsToTake;
        if (limitReached)
        {
            // Enough objects were already posted; nothing more to do.
            return;
        }

        // Start from the shared properties; take a private copy only when a per-object
        // creation time has to be attached, so the shared instance is not mutated.
        KustoQueuedIngestionProperties effectiveProperties = baseIngestionProperties;
        if (storageObject.CreationTimeUtc.HasValue)
        {
            effectiveProperties = new KustoQueuedIngestionProperties(baseIngestionProperties);
            effectiveProperties.AdditionalProperties.Add("creationTime", storageObject.CreationTimeUtc.Value.ToString("s"));
        }

        // Poll the throttler until it allows the next invocation.
        while (true)
        {
            if (m_ingestionFixedWindowThrottlerPolicy.ShouldInvoke())
            {
                break;
            }

            await Task.Delay(TimeSpan.FromMilliseconds(c_delayOnThrottlingMs)).ConfigureAwait(false);
        }

        // Local path, safe URI + managed identity suffix, or the plain cloud URI.
        string sourceUri = fromFileSystem
            ? storageObject.FileSystemPath
            : !string.IsNullOrWhiteSpace(ingestWithManagedIdentity)
                ? $"{storageObject.SafeCloudFileUri};managed_identity={ingestWithManagedIdentity}"
                : storageObject.CloudFileUri;

        var sourceOptions = new StorageSourceOptions()
        {
            DeleteSourceOnSuccess = deleteSourcesOnSuccess,
            Size = storageObject.SizeInBytes
        };

        var ingestionResult = await ingestClient
            .IngestFromStorageAsync(sourceUri, effectiveProperties, sourceOptions)
            .ConfigureAwait(false);

        // Written through the tracer so it shows up in log files only, not on the console.
        m_logger.Tracer.TraceInformation($"IngestSingle: File {traceName} ended successfully after {timer.ElapsedMilliseconds} millis.");

        Interlocked.Increment(ref m_objectsPosted);

        // Results are only collected when the caller intends to wait for ingestion completion.
        if (m_bWaitForIngestCompletion)
        {
            lock (m_ingestionResultsLock)
            {
                m_ingestionResults.Add(ingestionResult);
            }
        }
    }
    catch (Exception ex)
    {
        // Best effort: a failure on one object is reported and does not propagate to the caller.
        m_logger.LogError($"IngestSingle failed on blob '{traceName}', after '{timer.ElapsedMilliseconds}' millis, error: {ex.MessageEx(true)}");
    }
}