private void RunSyncDirectIngestInBatches()

in src/Ingestor.cs [588:632]


        /// <summary>
        /// Runs a direct (pull) ingest command for every batch and records the outcome of each one.
        /// For each batch, the ingest command is executed against the target database with a 30-minute
        /// server timeout, the resulting operation is looked up via the operations-show command, and the
        /// first row of that lookup is appended to <c>m_operationResults</c>.
        /// Batches are dispatched through <c>ExtendedParallel.ForEachEx</c> with
        /// <c>m_directIngestParallelRequests</c> (presumably the parallelism limit - confirm against that helper).
        /// A failure in one batch is logged and does not propagate to the caller.
        /// </summary>
        /// <param name="kustoClient">Admin provider used to execute both the ingest command and the operations-show command.</param>
        /// <param name="batches">Batches to ingest; each batch yields one ingest command covering all of its sources.</param>
        /// <param name="ingestionProperties">Supplies the database and table names, plus the additional properties and tags handed to the command generator.</param>
        /// <param name="ingestWithManagedIdentity">
        /// When not null, empty or whitespace, every source is addressed by its <c>SafeCloudFileUri</c>
        /// followed by ";managed_identity=" and this value; otherwise <c>CloudFileUri</c> is used unchanged.
        /// </param>
        private void RunSyncDirectIngestInBatches(ICslAdminProvider kustoClient,
                                                            IEnumerable<DataSourcesBatch> batches,
                                                            KustoQueuedIngestionProperties ingestionProperties,
                                                            string ingestWithManagedIdentity)
        {
            // The decision is the same for every batch, so evaluate it once up front.
            bool appendManagedIdentity = !string.IsNullOrWhiteSpace(ingestWithManagedIdentity);

            ExtendedParallel.ForEachEx(batches, m_directIngestParallelRequests, (batch) =>
            {
                try
                {
                    List<string> sourceUris = appendManagedIdentity
                        ? batch.Sources.Select((source) => $"{source.SafeCloudFileUri};managed_identity={ingestWithManagedIdentity}").ToList()
                        : batch.Sources.Select((source) => source.CloudFileUri).ToList();

                    // Issue the ingest command with an extended server-side timeout.
                    var requestProperties = new ClientRequestProperties();
                    requestProperties.SetOption(ClientRequestProperties.OptionServerTimeout, TimeSpan.FromMinutes(30));

                    var ingestCommand = CslCommandGenerator.GenerateTableIngestPullCommand(
                        ingestionProperties.TableName,
                        sourceUris,
                        false,
                        extensions: ingestionProperties.AdditionalProperties,
                        tags: ingestionProperties.AdditionalTags);
                    var ingestResult = kustoClient.ExecuteControlCommand<DataIngestPullCommandResult>(
                        ingestionProperties.DatabaseName,
                        ingestCommand,
                        requestProperties);

                    // Look up the operation the ingest command reported.
                    var showOperationCommand = CslCommandGenerator.GenerateOperationsShowCommand(ingestResult.First().OperationId);
                    var operationRows = kustoClient.ExecuteControlCommand<OperationsShowCommandResult>(showOperationCommand);

                    // m_operationResults is shared by all concurrently running batches.
                    lock (m_operationResultsLock)
                    {
                        m_operationResults.Add(operationRows.First());
                    }

                    Interlocked.Increment(ref m_batchesIngested);
                }
                catch (Exception ex)
                {
                    // Best effort per batch: report the failure and let the other batches proceed.
                    m_logger.LogError($"Error in RunSyncDirectIngestInBatches: {ex.Message}");
                }
            });
        }