private async Task IngestSingle()

in src/Ingestor.cs [820:894]


        /// <summary>
        /// Posts a single source object for ingestion through <paramref name="ingestClient"/>.
        /// On success the posted-objects counter is incremented and, when waiting for
        /// ingestion completion is enabled, the returned result is stored for later use.
        /// Any exception raised inside the guarded section is logged and not rethrown.
        /// </summary>
        /// <param name="storageObject">Source to ingest; supplies the path/URI, size and optional creation time.</param>
        /// <param name="objectsToTake">Upper bound on posted objects; a negative value disables the bound.</param>
        /// <param name="ingestClient">Client used to submit the ingestion request.</param>
        /// <param name="fromFileSystem">When true, the file system path is used as the source; otherwise a cloud URI.</param>
        /// <param name="deleteSourcesOnSuccess">Forwarded to the storage source options of the request.</param>
        /// <param name="baseIngestionProperties">Properties used as-is, or cloned when a creation time must be attached.</param>
        /// <param name="ingestWithManagedIdentity">
        /// When non-blank (and the source is not the file system), appended to the safe cloud URI
        /// as a <c>;managed_identity=</c> suffix.
        /// </param>
        private async Task IngestSingle(DataSource storageObject,
                                  int objectsToTake,
                                  IKustoIngestClient ingestClient,
                                  bool fromFileSystem,
                                  bool deleteSourcesOnSuccess,
                                  KustoQueuedIngestionProperties baseIngestionProperties,
                                  string ingestWithManagedIdentity)
        {
            var timer = ExtendedStopwatch.StartNew();

            // Name used in log output only; for cloud sources the "safe" URI is used
            // (presumably stripped of secrets -- confirm against DataSource).
            var traceName = fromFileSystem ? storageObject.FileSystemPath : storageObject.SafeCloudFileUri;
            try
            {
                // A non-negative objectsToTake acts as a quota: stop once that many objects were posted.
                bool quotaEnabled = objectsToTake >= 0;
                if (quotaEnabled && Interlocked.Read(ref m_objectsPosted) >= objectsToTake)
                {
                    return;
                }

                // Attach the creation time on a private copy so the shared base properties stay untouched.
                KustoQueuedIngestionProperties effectiveProperties = baseIngestionProperties;
                var creationTimeUtc = storageObject.CreationTimeUtc;
                if (creationTimeUtc.HasValue)
                {
                    effectiveProperties = new KustoQueuedIngestionProperties(baseIngestionProperties);
                    effectiveProperties.AdditionalProperties.Add("creationTime", creationTimeUtc.Value.ToString("s"));
                }

                // Poll the throttling policy until it allows another invocation.
                var throttleDelay = TimeSpan.FromMilliseconds(c_delayOnThrottlingMs);
                while (true)
                {
                    if (m_ingestionFixedWindowThrottlerPolicy.ShouldInvoke())
                    {
                        break;
                    }

                    await Task.Delay(throttleDelay).ConfigureAwait(false);
                }

                // Pick the source locator: local path, managed-identity URI, or the plain cloud URI.
                string sourceLocator = fromFileSystem
                    ? storageObject.FileSystemPath
                    : string.IsNullOrWhiteSpace(ingestWithManagedIdentity)
                        ? storageObject.CloudFileUri
                        : $"{storageObject.SafeCloudFileUri};managed_identity={ingestWithManagedIdentity}";

                var sourceOptions = new StorageSourceOptions()
                {
                    DeleteSourceOnSuccess = deleteSourcesOnSuccess,
                    Size = storageObject.SizeInBytes
                };

                var ingestionResult = await ingestClient
                    .IngestFromStorageAsync(sourceLocator, effectiveProperties, sourceOptions)
                    .ConfigureAwait(false);

                // Goes through the tracer so it lands in the log files rather than on the console.
                m_logger.Tracer.TraceInformation($"IngestSingle: File {traceName} ended successfully after {timer.ElapsedMilliseconds} millis.");

                Interlocked.Increment(ref m_objectsPosted);

                if (!m_bWaitForIngestCompletion)
                {
                    return;
                }

                // Results are only collected when the caller intends to wait for completion.
                lock (m_ingestionResultsLock)
                {
                    m_ingestionResults.Add(ingestionResult);
                }
            }
            catch (Exception ex)
            {
                // Best-effort: a failure on one object is reported but does not propagate.
                m_logger.LogError($"IngestSingle failed on blob '{traceName}', after '{timer.ElapsedMilliseconds}' millis, error: {ex.MessageEx(true)}");
            }
        }