internal void RunDirectIngest()

in src/Ingestor.cs [374:448]


        /// <summary>
        /// Runs the direct-ingest flow against the endpoint described by <paramref name="kcsb"/>:
        /// prepares the sources, splits them into batches, ingests the batches (either via
        /// <c>RunSyncDirectIngestInBatches</c> or via <c>RunDirectIngestInBatches</c>), and logs
        /// per-phase timing, statistics and the details of any failed ingest operations.
        /// </summary>
        /// <param name="kcsb">
        /// Connection string builder used to create the admin provider and to detect
        /// whether the target Kusto connection is local.
        /// </param>
        internal void RunDirectIngest(KustoConnectionStringBuilder kcsb)
        {
            Reset();

            // 'stopwatch' measures the whole run; 'currentPhaseStopwatch' is stopped/restarted per phase.
            var stopwatch = ExtendedStopwatch.StartNew();
            var currentPhaseStopwatch = ExtendedStopwatch.StartNew();

            // small patch: mirror the strongly-typed Format into AdditionalProperties unless the caller already set it there.
            if (m_ingestionProperties.Format.HasValue && !m_ingestionProperties.AdditionalProperties.ContainsKey(KustoIngestionProperties.FormatPropertyName))
            {
                m_ingestionProperties.AdditionalProperties.Add(
                    KustoIngestionProperties.FormatPropertyName,
                    Enum.GetName(typeof(DataSourceFormat), m_ingestionProperties.Format.Value));
            }

            using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kcsb))
            {
                bool bKustoRunningLocally = Utilities.IsLocalKustoConnection(kcsb);

                // Prepare batches for ingest:
                RunPrepareBatchesForDirectIngest(kustoClient, kustoRunningLocally: bKustoRunningLocally);

                currentPhaseStopwatch.Stop();
                m_logger.LogSuccess($"    RunPrepareBatchesForDirectIngest done. Time elapsed: {currentPhaseStopwatch.Elapsed:c}");
                m_logger.LogSuccess($"    {DirectIngestStats()}");

                // Split into batches and ingest...
                m_logger.LogVerbose($"==> Splitting [{m_listIntermediateSources.SafeFastCount()}] sources into batches for ingestion...");
                var batches = SplitIntoBatches(m_listIntermediateSources, m_bFileSystem, m_directIngestBatchSizeLimitInBytes, m_directIngestFilesLimitPerBatch);
                Interlocked.Add(ref m_batchesProduced, batches.SafeFastCount());
                m_logger.LogVerbose($"==> Done. Prepared [{batches.SafeFastCount()}] batches.");
                m_logger.LogSuccess($"    {DirectIngestStats()}");

                currentPhaseStopwatch.Restart();

                // Ingest
                // Twist: if we have small enough number of batches, or command line argument set, we perform a synchronous parallel ingest
                // (never when ingesting file-system sources into a locally running Kusto).
                bool bIngestLocally = (bKustoRunningLocally && m_bFileSystem);

                if (!bIngestLocally && (batches.SafeFastCount() <= c_BatchesLimitForSyncIngest || m_bDirectIngestUseSyncMode))
                {
                    m_logger.LogVerbose($"==> Ingesting [{batches.SafeFastCount()}] batches synchronously...");
                    RunSyncDirectIngestInBatches(kustoClient, batches, m_ingestionProperties, m_ingestWithManagedIdentity);
                    m_logger.LogInfo($"==> Ingestion complete.");
                }
                else
                {
                    RunDirectIngestInBatches(kustoClient, batches, bIngestLocally);
                }

                currentPhaseStopwatch.Stop();
                stopwatch.Stop();

                // Materialize once: the filtered set is consulted several times below (emptiness check,
                // count, operation IDs), and a deferred query would re-scan m_operationResults each time.
                var failedOperations = m_operationResults
                    .Where(r => string.Equals(r.State, "Failed", StringComparison.OrdinalIgnoreCase))
                    .ToList();

                if (failedOperations.Count == 0)
                {
                    m_logger.LogSuccess($"    RunDirectIngestInBatches done. Time elapsed: {currentPhaseStopwatch.Elapsed:c}");
                    m_logger.LogSuccess($"    RunDirectIngest completed without errors. Total time elapsed: {stopwatch.Elapsed:c}");
                    m_logger.LogSuccess($"    {DirectIngestStats()}");
                }
                else
                {
                    m_logger.LogWarning($"    RunDirectIngestInBatches done. Time elapsed: {currentPhaseStopwatch.Elapsed:c}");
                    m_logger.LogWarning($"    RunDirectIngest completed with errors. Total time elapsed: {stopwatch.Elapsed:c}");
                    m_logger.LogWarning($"    {DirectIngestStats()}");

                    m_logger.LogError($"==> [{failedOperations.Count}] out of [{m_batchesIngested}] ingest operations failed:");

                    // Query the service for the failure records of the failed operation IDs and log each one.
                    var cmd = CslCommandGenerator.GenerateIngestionFailuresShowCommand(failedOperations.Select(op => op.OperationId));
                    var result = kustoClient.ExecuteControlCommand<IngestionFailuresSummarizedShowCommandResult>(cmd);
                    result.ForEach(fo =>
                    {
                        m_logger.LogError($"    Failed to ingest data source '{fo.IngestionSourcePath}'.");
                        m_logger.LogError($"    Failure details: {fo.Details}");
                    });
                }
            }
        }