in src/Ingestor.cs [1079:1188]
/// <summary>
/// Polls the status of the given ingest operations until none of them is reported as
/// <c>Status.Pending</c> or until <paramref name="ingestOperationTimeout"/> has elapsed,
/// and then logs a summary broken down by final status.
/// </summary>
/// <param name="ingestionResults">
/// Ingestion results whose status collections are polled. The method returns immediately
/// when <c>SafeFastNone()</c> reports that there is nothing to monitor.
/// </param>
/// <param name="ingestOperationTimeout">
/// Upper bound on the time spent waiting. The wait between polls is capped by the time
/// remaining, so the method does not overshoot this value by a full polling interval.
/// </param>
private void WaitForIngestionResult(IEnumerable<IKustoIngestionResult> ingestionResults, TimeSpan ingestOperationTimeout)
{
    if (ingestionResults.SafeFastNone())
    {
        return;
    }

    // Delay between two consecutive status polls.
    var pollingInterval = TimeSpan.FromSeconds(30);

    var stopwatch = ExtendedStopwatch.StartNew();
    m_logger.LogInfo($"==> Waiting for ingest operation(s) completion (will timeout after {ingestOperationTimeout.TotalMinutes} minutes)...");

    List<IngestionStatus> ingestionStatuses;
    long monitoredIngestionOperations = 0, completedIngestionOperations = 0;
    while (true)
    {
        // Re-query every result on each iteration and snapshot the statuses so that all the
        // counts below (and the final report) are computed over the same data.
        ingestionStatuses = ingestionResults.SelectMany((ir) => ir.GetIngestionStatusCollection()).ToList();
        monitoredIngestionOperations = ingestionStatuses.Count;

        // Only Pending keeps the loop waiting; Queued is counted as completed here even though
        // the summary below groups it with Pending.
        // NOTE(review): presumably intentional because Queued does not transition further when
        // table-based status tracking is not in use - confirm against the Kusto ingest docs.
        completedIngestionOperations = ingestionStatuses.Count((status) => status.Status != Status.Pending);
        m_logger.LogVerbose($"==> [{completedIngestionOperations,7}] out of [{monitoredIngestionOperations,7}] ingest operations completed. Time elapsed: {stopwatch.Elapsed:c}");
        if (completedIngestionOperations == monitoredIngestionOperations)
        {
            break;
        }

        // The timeout is checked right after a poll (rather than right after a sleep), so the
        // statuses reported below are never older than the last wait.
        var elapsed = stopwatch.Elapsed;
        if (elapsed >= ingestOperationTimeout)
        {
            break;
        }

        // Never sleep past the deadline: wait for the polling interval or the remaining time,
        // whichever is shorter.
        var remaining = ingestOperationTimeout - elapsed;
        Thread.Sleep(remaining < pollingInterval ? remaining : pollingInterval);
    }
    stopwatch.Stop();

    // Status breakdown used by the report:
    //   Succeeded          [Terminal, operation completed successfully]
    //   Failed             [Terminal, operation failed]
    //   PartiallySucceeded [Terminal, operation succeeded for part of the data]
    //   Skipped            [Terminal, operation ignored (no data or was already ingested)]
    //   Queued / Pending   [Reported together as "pending": operation has been posted off for execution on the service]
    var successfulOperations = ingestionStatuses.Count((status) => status.Status == Status.Succeeded);
    var partiallySucceededOperations = ingestionStatuses.Count((status) => status.Status == Status.PartiallySucceeded);
    var failedOperations = ingestionStatuses.Count((status) => status.Status == Status.Failed);
    var skippedOperations = ingestionStatuses.Count((status) => status.Status == Status.Skipped);
    var pendingOperations = ingestionStatuses.Count((status) => status.Status == Status.Pending || status.Status == Status.Queued);

    if (successfulOperations == monitoredIngestionOperations)
    {
        m_logger.LogSuccess($" Successfully completed : [{successfulOperations,7}] out of [{monitoredIngestionOperations,7}] ingest operations.");
        return;
    }

    // Break down non-successful operations: one detailed section per status, then the totals.
    m_logger.LogWarning("Not all the operations completed successfully:");
    var sb = new StringBuilder();
    sb.AppendLine();

    if (failedOperations > 0)
    {
        sb.AppendLine($"Failed operations: [{failedOperations,7}] out of [{monitoredIngestionOperations,7}] ingest operations have failed.");
        sb.AppendLine("These operations failed permanently and no data was ingested. See the complete list for details.");
        foreach (var record in ingestionStatuses.Where(record => record.Status == Status.Failed))
        {
            sb.AppendLine($"- Failed to ingest '{record.IngestionSourcePath}', Id '{record.IngestionSourceId}'. Operation status is '{record.Status}'.");
            sb.AppendLine($" Failure details: {record.Details}");
        }
        sb.AppendLine();
    }

    if (partiallySucceededOperations > 0)
    {
        sb.AppendLine($"Partially succeeded operations: [{partiallySucceededOperations,7}] out of [{monitoredIngestionOperations,7}] ingest operations have completed partially.");
        sb.AppendLine("These operations succeeded partially, i.e., some data could have been ingested. See the complete list for details.");
        foreach (var record in ingestionStatuses.Where(record => record.Status == Status.PartiallySucceeded))
        {
            sb.AppendLine($"- Partially succeeded to ingest '{record.IngestionSourcePath}', Id '{record.IngestionSourceId}'. Operation status is '{record.Status}'.");
            sb.AppendLine($" Operation details: {record.Details}");
        }
        sb.AppendLine();
    }

    if (skippedOperations > 0)
    {
        sb.AppendLine($"Skipped operations: [{skippedOperations,7}] out of [{monitoredIngestionOperations,7}] ingest operations have been skipped.");
        sb.AppendLine("These operations were skipped and resulted in no data being ingested. These could indicate empty streams. See the complete list for details.");
        foreach (var record in ingestionStatuses.Where(record => record.Status == Status.Skipped))
        {
            sb.AppendLine($"- Did not ingest '{record.IngestionSourcePath}', Id '{record.IngestionSourceId}'. Operation status is '{record.Status}'.");
            sb.AppendLine($" Operation details: {record.Details}");
        }
        sb.AppendLine();
    }

    if (pendingOperations > 0)
    {
        sb.AppendLine($"Pending operations: [{pendingOperations,7}] out of [{monitoredIngestionOperations,7}] ingest operations are still pending.");
        sb.AppendLine("These operations have not completed yet, and are waiting to be executed by the service. See the complete list for details.");
        foreach (var record in ingestionStatuses.Where(record => record.Status == Status.Pending || record.Status == Status.Queued))
        {
            sb.AppendLine($"- Operation pending for '{record.IngestionSourcePath}', Id '{record.IngestionSourceId}'. Operation status is '{record.Status}'.");
        }
        sb.AppendLine();
    }

    // Totals: the success line is always present, every other line only when its count is non-zero.
    sb.AppendLine($"Successfully completed : [{successfulOperations,7}] out of [{monitoredIngestionOperations,7}] ingest operations.");
    if (partiallySucceededOperations > 0)
    {
        sb.AppendLine($"Partially succeeded (see above) : [{partiallySucceededOperations,7}] out of [{monitoredIngestionOperations,7}] ingest operations.");
    }
    if (failedOperations > 0)
    {
        sb.AppendLine($"Failed (see above) : [{failedOperations,7}] out of [{monitoredIngestionOperations,7}] ingest operations.");
    }
    if (skippedOperations > 0)
    {
        sb.AppendLine($"Skipped (see above) : [{skippedOperations,7}] out of [{monitoredIngestionOperations,7}] ingest operations.");
    }
    if (pendingOperations > 0)
    {
        sb.AppendLine($"Pending (still in progress) : [{pendingOperations,7}] out of [{monitoredIngestionOperations,7}] ingest operations.");
    }
    sb.AppendLine();
    m_logger.LogInfo(sb.ToString());
}