in src/Ingestor.cs [374:448]
/// <summary>
/// Runs a direct ingestion end to end, in three phases:
/// 1. Prepare the intermediate sources.
/// 2. Split them into batches.
/// 3. Ingest the batches, either synchronously or via <c>RunDirectIngestInBatches</c>.
/// Finally it logs a summary. If any operation ended in the "Failed" state, it queries
/// and logs the per-source failure details.
/// </summary>
/// <param name="kcsb">Connection string used to create the admin client that runs all commands.</param>
internal void RunDirectIngest(KustoConnectionStringBuilder kcsb)
{
    Reset();
    var stopwatch = ExtendedStopwatch.StartNew();
    var currentPhaseStopwatch = ExtendedStopwatch.StartNew();

    // Copy the strongly-typed Format into AdditionalProperties, unless the caller already put a format there.
    // NOTE(review): presumably the direct-ingest command path reads the format from AdditionalProperties — confirm.
    if (m_ingestionProperties.Format.HasValue && !m_ingestionProperties.AdditionalProperties.ContainsKey(KustoIngestionProperties.FormatPropertyName))
    {
        m_ingestionProperties.AdditionalProperties.Add(
            KustoIngestionProperties.FormatPropertyName,
            Enum.GetName(typeof(DataSourceFormat), m_ingestionProperties.Format.Value));
    }

    using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kcsb))
    {
        bool bKustoRunningLocally = Utilities.IsLocalKustoConnection(kcsb);

        // Phase 1: prepare the intermediate sources for ingest.
        RunPrepareBatchesForDirectIngest(kustoClient, kustoRunningLocally: bKustoRunningLocally);
        currentPhaseStopwatch.Stop();
        m_logger.LogSuccess($" RunPrepareBatchesForDirectIngest done. Time elapsed: {currentPhaseStopwatch.Elapsed:c}");
        m_logger.LogSuccess($" {DirectIngestStats()}");

        // Phase 2: split the prepared sources into batches.
        m_logger.LogVerbose($"==> Splitting [{m_listIntermediateSources.SafeFastCount()}] sources into batches for ingestion...");
        var batches = SplitIntoBatches(m_listIntermediateSources, m_bFileSystem, m_directIngestBatchSizeLimitInBytes, m_directIngestFilesLimitPerBatch);

        // Count once and reuse the value.
        // If SplitIntoBatches returns a lazy sequence, each SafeFastCount() call could re-enumerate (re-split) it.
        var batchCount = batches.SafeFastCount();
        Interlocked.Add(ref m_batchesProduced, batchCount);
        m_logger.LogVerbose($"==> Done. Prepared [{batchCount}] batches.");
        m_logger.LogSuccess($" {DirectIngestStats()}");
        currentPhaseStopwatch.Restart();

        // Phase 3: ingest.
        // "Local" ingest means the Kusto endpoint is local AND the sources are on the file system.
        // Otherwise, we use the synchronous path when:
        //   - the batch count is at most c_BatchesLimitForSyncIngest, or
        //   - sync mode was explicitly requested via m_bDirectIngestUseSyncMode.
        bool bIngestLocally = bKustoRunningLocally && m_bFileSystem;
        if (!bIngestLocally && (batchCount <= c_BatchesLimitForSyncIngest || m_bDirectIngestUseSyncMode))
        {
            m_logger.LogVerbose($"==> Ingesting [{batchCount}] batches synchronously...");
            RunSyncDirectIngestInBatches(kustoClient, batches, m_ingestionProperties, m_ingestWithManagedIdentity);
            m_logger.LogInfo($"==> Ingestion complete.");
        }
        else
        {
            RunDirectIngestInBatches(kustoClient, batches, bIngestLocally);
        }

        currentPhaseStopwatch.Stop();
        stopwatch.Stop();

        // Snapshot the failed operations once.
        // The emptiness check, the logged count and the operation IDs sent to the failures command
        // must all describe the same set.
        var failedOperations = m_operationResults
            .Where(r => string.Equals(r.State, "Failed", StringComparison.OrdinalIgnoreCase))
            .ToList();

        if (failedOperations.Count == 0)
        {
            m_logger.LogSuccess($" RunDirectIngestInBatches done. Time elapsed: {currentPhaseStopwatch.Elapsed:c}");
            m_logger.LogSuccess($" RunDirectIngest completed without errors. Total time elapsed: {stopwatch.Elapsed:c}");
            m_logger.LogSuccess($" {DirectIngestStats()}");
        }
        else
        {
            m_logger.LogWarning($" RunDirectIngestInBatches done. Time elapsed: {currentPhaseStopwatch.Elapsed:c}");
            m_logger.LogWarning($" RunDirectIngest completed with errors. Total time elapsed: {stopwatch.Elapsed:c}");
            m_logger.LogWarning($" {DirectIngestStats()}");
            m_logger.LogError($"==> [{failedOperations.Count}] out of [{m_batchesIngested}] ingest operations failed:");

            // Ask the service for per-source failure details of the failed operation IDs.
            var cmd = CslCommandGenerator.GenerateIngestionFailuresShowCommand(failedOperations.Select(op => op.OperationId));
            var result = kustoClient.ExecuteControlCommand<IngestionFailuresSummarizedShowCommandResult>(cmd);
            foreach (var fo in result)
            {
                m_logger.LogError($" Failed to ingest data source '{fo.IngestionSourcePath}'.");
                m_logger.LogError($" Failure details: {fo.Details}");
            }
        }
    }
}