in src/Ingestor.cs [169:230]
private Ingestor(ExtendedCommandLineArgs args, AdditionalArguments additionalArgs, KustoQueuedIngestionProperties ingestionProperties, LoggerTracer logger)
{
m_args = args;
m_ingestionProperties = ingestionProperties;
m_logger = logger;
m_ingestionProperties.ReportLevel = IngestionReportLevel.None;
m_ingestionProperties.ReportMethod = IngestionReportMethod.Queue;
m_bFileSystem = Utilities.IsFileSystemPath(m_args.SourcePath);
#if OPEN_SOURCE_COMPILATION
if (!m_bFileSystem) {
throw new Exception("Support only ingestion from file system in open source compilation.");
}
#endif
m_estimatedCompressionRatio = args.EstimatedCompressionRatio;
m_directIngestParallelRequests =
(args.ParallelRequests.HasValue && args.ParallelRequests.Value >= 1) ? Math.Min(args.ParallelRequests.Value, c_MaxParallelismDegree) : 8;
m_directIngestBatchSizeLimitInBytes =
(args.BatchSizeInMBs.HasValue && args.BatchSizeInMBs.Value > 0) ? args.BatchSizeInMBs.Value * MemoryConstants._1MB : c_DefaultDirectIngestBatchSizeBytes;
m_directIngestFilesLimitPerBatch =
(args.FilesInBatch.HasValue && args.FilesInBatch.Value >= 0) ? args.FilesInBatch.Value : 0;
m_bDirectIngestUseSyncMode = args.ForceSync ?? false;
m_ingestWithManagedIdentity = args.IngestWithManagedIdentity;
m_connectToStorageWithUserAuth = args.ConnectToStorageWithUserAuth;
m_connectToStorageLoginUri = args.ConnectToStorageLoginUri;
m_connectToStorageWithManagedIdentity = args.ConnectToStorageWithManagedIdentity;
m_objectsCountQuota = m_args.Limit;
if (!string.IsNullOrEmpty(m_args.Pattern) && !string.Equals(m_args.Pattern, "*", StringComparison.Ordinal))
{
string regexExpression = Regex.Escape(m_args.Pattern).Replace(@"\*", ".*").Replace(@"\?", ".") + "$";
m_patternRegex = new Regex(regexExpression, RegexOptions.Compiled);
}
m_creationTimeInNamePattern = additionalArgs.DateTimePattern;
m_ingestCompletionTimeout = TimeSpan.FromMinutes(m_args.IngestTimeoutInMinutes);
m_bWaitForIngestCompletion = (!m_args.DontWait && m_ingestCompletionTimeout > TimeSpan.Zero);
if (m_bWaitForIngestCompletion)
{
m_ingestionProperties.ReportLevel = IngestionReportLevel.FailuresAndSuccesses;
m_ingestionProperties.ReportMethod = IngestionReportMethod.Table;
m_ingestionResults = new List<IKustoIngestionResult>();
}
else
{
m_ingestionProperties.ReportLevel = IngestionReportLevel.None;
m_ingestionProperties.ReportMethod = IngestionReportMethod.Queue;
}
m_ingestionProperties.IgnoreSizeLimit = m_args.NoSizeLimit;
m_ingestionFixedWindowThrottlerPolicy = new FixedWindowThrottlerPolicy(args.IngestionRateCount, c_ingestionRateTime);
m_listingFixedWindowThrottlerPolicy = new FixedWindowThrottlerPolicy(args.ListingRateCount, c_ingestionRateTime);
m_listingLockOrNull = m_objectsCountQuota > 0 ? new SemaphoreSlim(initialCount: 1, maxCount: 1) : null;
InitPSLFields();
}