private void IngestBatch()

in src/Ingestor.cs [977:1030]


        /// <summary>
        /// Builds a table "ingest pull" control command for every source in <paramref name="batch"/>,
        /// executes it through <paramref name="kustoClient"/>, stores the returned operation results
        /// and increments the ingested-batches counter.
        /// Any exception raised while doing so is caught and logged; it is not propagated to the caller.
        /// </summary>
        /// <param name="batch">The batch whose sources are ingested; its optional creation time is forwarded as the "creationTime" property.</param>
        /// <param name="kustoClient">Admin provider used to execute the generated control command.</param>
        /// <param name="bIngestLocally">When true, the sources' file system paths are used instead of their cloud URIs.</param>
        /// <param name="baseIngestionProperties">Supplies the database name, table name, additional properties and tags for the command.</param>
        /// <param name="ingestOperationTimeout">Timeout passed to the control command execution.</param>
        /// <param name="ingestWithManagedIdentity">
        /// When not blank (and not ingesting locally), appended to each source's safe cloud URI as ";managed_identity=&lt;value&gt;".
        /// </param>
        private void IngestBatch(DataSourcesBatch batch,
                                 ICslAdminProvider kustoClient,
                                 bool bIngestLocally,
                                 KustoIngestionProperties baseIngestionProperties,
                                 TimeSpan ingestOperationTimeout,
                                 string ingestWithManagedIdentity)
        {
            try
            {
                // Pick the URI flavor for this batch: local path, managed-identity cloud URI, or plain cloud URI.
                List<string> batchUris;
                if (bIngestLocally)
                {
                    batchUris = batch.Sources.Select((s) => s.FileSystemPath).ToList();
                }
                else if (!string.IsNullOrWhiteSpace(ingestWithManagedIdentity))
                {
                    batchUris = batch.Sources.Select((s) => $"{s.SafeCloudFileUri};managed_identity={ingestWithManagedIdentity}").ToList();
                }
                else
                {
                    batchUris = batch.Sources.Select((s) => s.CloudFileUri).ToList();
                }

                // Take care of the CreationTime
                KustoIngestionProperties ingestionProperties = baseIngestionProperties;
                if (batch.CreationTimeUtc.HasValue)
                {
                    // Work on a copy rather than on the caller's instance
                    // (assumes the copy constructor clones AdditionalProperties - TODO confirm).
                    ingestionProperties = new KustoIngestionProperties(baseIngestionProperties);

                    // Indexer assignment instead of Add(): a "creationTime" entry already present in the
                    // base properties must not fail the whole batch with a duplicate-key ArgumentException;
                    // the batch's own creation time wins. "s" is the sortable ISO 8601 date/time format.
                    ingestionProperties.AdditionalProperties["creationTime"] = batch.CreationTimeUtc.Value.ToString("s");
                }

                // NOTE(review): the meaning of the literal 'true' argument is not visible from here - confirm against CslCommandGenerator.
                var cmd = CslCommandGenerator.GenerateTableIngestPullCommand(ingestionProperties.TableName, batchUris, true,
                                                                             extensions: ingestionProperties.AdditionalProperties,
                                                                             tags: ingestionProperties.AdditionalTags);

                // NOTE(review): the 2-second TimeSpan is presumably the operation polling interval - confirm.
                var operationResults = kustoClient.ExecuteAsyncControlCommand(ingestionProperties.DatabaseName, cmd, ingestOperationTimeout, TimeSpan.FromSeconds(2));

                m_logger.LogInfo("==> Complete ingest");

                // The results collection is guarded by its dedicated lock object.
                lock (m_operationResultsLock)
                {
                    m_operationResults.Add(operationResults);
                }
                Interlocked.Increment(ref m_batchesIngested);
            }
            catch (Exception ex)
            {
                // Deliberately best-effort: a failing batch is reported but does not throw to the caller.
                // Log the whole exception (type, message, inner exceptions, stack trace) rather than only
                // its Message, so the failure can actually be diagnosed from the log.
                m_logger.LogError($"IngestBatch failed: {ex}");
            }
        }