private void WaitForIngestionResult()

in src/Ingestor.cs [1079:1188]


        /// <summary>
        /// Polls the status of the supplied ingest operations until none of them is <see cref="Status.Pending"/>
        /// or until <paramref name="ingestOperationTimeout"/> elapses, then logs a per-status breakdown.
        /// </summary>
        /// <param name="ingestionResults">
        /// Ingestion results whose status collections are polled. The method returns immediately,
        /// without logging, when <c>SafeFastNone()</c> reports that there is nothing to monitor.
        /// </param>
        /// <param name="ingestOperationTimeout">
        /// Maximum time to keep polling. Once it elapses, the most recently polled statuses are reported as-is.
        /// </param>
        private void WaitForIngestionResult(IEnumerable<IKustoIngestionResult> ingestionResults, TimeSpan ingestOperationTimeout)
        {
            if (ingestionResults.SafeFastNone())
            {
                return;
            }

            // Upper bound for the pause between two consecutive status polls.
            var pollingInterval = TimeSpan.FromSeconds(30);

            var stopwatch = ExtendedStopwatch.StartNew();
            m_logger.LogInfo($"==> Waiting for ingest operation(s) completion (will timeout after {ingestOperationTimeout.TotalMinutes} minutes)...");

            List<IngestionStatus> ingestionStatuses = null;
            long monitoredIngestionOperations = 0, completedIngestionOperations = 0;

            while (true)
            {
                ingestionStatuses = ingestionResults.SelectMany((ir) => ir.GetIngestionStatusCollection()).ToList();
                monitoredIngestionOperations = ingestionStatuses.Count;
                completedIngestionOperations = ingestionStatuses.Count((status) => status.Status != Status.Pending);

                m_logger.LogVerbose($"==> [{completedIngestionOperations,7}] out of [{monitoredIngestionOperations,7}] ingest operations completed. Time elapsed: {stopwatch.Elapsed:c}");
                if (completedIngestionOperations == monitoredIngestionOperations)
                {
                    break;
                }

                // The deadline is checked right after a poll, so the report below is always built
                // from statuses fetched after the last sleep rather than before it.
                var remaining = ingestOperationTimeout - stopwatch.Elapsed;
                if (remaining <= TimeSpan.Zero)
                {
                    break;
                }

                // Never sleep past the deadline: a short (or nearly exhausted) timeout must not
                // cost a full polling interval.
                Thread.Sleep(remaining < pollingInterval ? remaining : pollingInterval);
            }
            stopwatch.Stop();

            // Status breakdown:
            // Succeeded            [Terminal, operation completed successfully]
            // Failed               [Terminal, operation failed]
            // PartiallySucceeded   [Terminal, operation succeeded for part of the data]
            // Skipped              [Terminal, operation ignored (no data or was already ingested)]
            // Queued / Pending     [Intermediate, operation has been posted off for execution on the service]
            //
            // NOTE(review): the wait loop above treats only Pending as "not completed", so Queued
            // operations end the wait but are still reported in the pending group below - confirm
            // this is the intended behavior.

            // Filter each group once; the lists are reused for both the counts and the detailed listing.
            var succeeded = ingestionStatuses.Where(record => record.Status == Status.Succeeded).ToList();
            var partiallySucceeded = ingestionStatuses.Where(record => record.Status == Status.PartiallySucceeded).ToList();
            var failed = ingestionStatuses.Where(record => record.Status == Status.Failed).ToList();
            var skipped = ingestionStatuses.Where(record => record.Status == Status.Skipped).ToList();
            var pending = ingestionStatuses.Where(record => record.Status == Status.Pending || record.Status == Status.Queued).ToList();

            var successfulOperations = succeeded.Count;
            var partiallySucceededOperations = partiallySucceeded.Count;
            var failedOperations = failed.Count;
            var skippedOperations = skipped.Count;
            var pendingOperations = pending.Count;

            if (successfulOperations == monitoredIngestionOperations)
            {
                m_logger.LogSuccess($"    Successfully completed          : [{successfulOperations,7}] out of [{monitoredIngestionOperations,7}] ingest operations.");
                return;
            }

            // Break down non-successful operations
            m_logger.LogWarning("Not all the operations completed successfully:");

            var sb = new StringBuilder();
            sb.AppendLine();

            if (failedOperations > 0)
            {
                sb.AppendLine($"Failed operations: [{failedOperations,7}] out of [{monitoredIngestionOperations,7}] ingest operations have failed.");
                sb.AppendLine("These operations failed permanently and no data was ingested. See the complete list for details.");
                foreach (var record in failed)
                {
                    sb.AppendLine($"-   Failed to ingest '{record.IngestionSourcePath}', Id '{record.IngestionSourceId}'. Operation status is '{record.Status}'.");
                    sb.AppendLine($"    Failure details: {record.Details}");
                }
                sb.AppendLine();
            }

            if (partiallySucceededOperations > 0)
            {
                sb.AppendLine($"Partially succeeded operations: [{partiallySucceededOperations,7}] out of [{monitoredIngestionOperations,7}] ingest operations have completed partially.");
                sb.AppendLine("These operations succeeded partially, i.e., some data could have been ingested. See the complete list for details.");
                foreach (var record in partiallySucceeded)
                {
                    sb.AppendLine($"-   Partially succeeded to ingest '{record.IngestionSourcePath}', Id '{record.IngestionSourceId}'. Operation status is '{record.Status}'.");
                    sb.AppendLine($"    Operation details: {record.Details}");
                }
                sb.AppendLine();
            }

            if (skippedOperations > 0)
            {
                sb.AppendLine($"Skipped operations: [{skippedOperations,7}] out of [{monitoredIngestionOperations,7}] ingest operations have been skipped.");
                sb.AppendLine("These operations were skipped and resulted in no data being ingested. These could indicate empty streams. See the complete list for details.");
                foreach (var record in skipped)
                {
                    sb.AppendLine($"-   Did not ingest '{record.IngestionSourcePath}', Id '{record.IngestionSourceId}'. Operation status is '{record.Status}'.");
                    sb.AppendLine($"    Operation details: {record.Details}");
                }
                sb.AppendLine();
            }

            if (pendingOperations > 0)
            {
                sb.AppendLine($"Pending operations: [{pendingOperations,7}] out of [{monitoredIngestionOperations,7}] ingest operations are still pending.");
                sb.AppendLine("These operations have not completed yet, and are waiting to be executed by the service. See the complete list for details.");
                foreach (var record in pending)
                {
                    sb.AppendLine($"-   Operation pending for '{record.IngestionSourcePath}', Id '{record.IngestionSourceId}'. Operation status is '{record.Status}'.");
                }
                sb.AppendLine();
            }

            // Totals: the success line is always printed, the others only when non-zero.
            sb.AppendLine($"Successfully completed          : [{successfulOperations,7}] out of [{monitoredIngestionOperations,7}] ingest operations.");
            if (partiallySucceededOperations > 0)
            {
                sb.AppendLine($"Partially succeeded (see above) : [{partiallySucceededOperations,7}] out of [{monitoredIngestionOperations,7}] ingest operations.");
            }
            if (failedOperations > 0)
            {
                sb.AppendLine($"Failed (see above)              : [{failedOperations,7}] out of [{monitoredIngestionOperations,7}] ingest operations.");
            }
            if (skippedOperations > 0)
            {
                sb.AppendLine($"Skipped (see above)             : [{skippedOperations,7}] out of [{monitoredIngestionOperations,7}] ingest operations.");
            }
            if (pendingOperations > 0)
            {
                sb.AppendLine($"Pending (still in progress)     : [{pendingOperations,7}] out of [{monitoredIngestionOperations,7}] ingest operations.");
            }
            sb.AppendLine();

            m_logger.LogInfo(sb.ToString());
        }