public CloudFetchDownloadManager()

in csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloadManager.cs [60:183]


        /// <summary>
        /// Initializes a new instance of the <see cref="CloudFetchDownloadManager"/> class and wires up
        /// the memory manager, the bounded queues, the HTTP client, the result fetcher and the downloader.
        /// Tuning values are read from the connection properties of <paramref name="statement"/>; a
        /// property that is absent falls back to its default.
        /// </summary>
        /// <param name="statement">The statement whose connection properties supply the CloudFetch settings.</param>
        /// <param name="schema">The schema retained by this manager.</param>
        /// <param name="isLz4Compressed">
        /// Stored and forwarded to the downloader (by its name, whether downloaded results are LZ4-compressed).
        /// </param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="statement"/> or <paramref name="schema"/> is <c>null</c>.
        /// </exception>
        /// <exception cref="ArgumentException">
        /// A CloudFetch connection property is present but is not a positive integer.
        /// </exception>
        public CloudFetchDownloadManager(DatabricksStatement statement, Schema schema, bool isLz4Compressed)
        {
            // Fallbacks for the settings that are not backed by a class-level Default* constant here.
            const int defaultMaxRetries = 3;
            const int defaultRetryDelayMs = 500;
            const int defaultTimeoutMinutes = 5;

            _statement = statement ?? throw new ArgumentNullException(nameof(statement));
            _schema = schema ?? throw new ArgumentNullException(nameof(schema));
            _isLz4Compressed = isLz4Compressed;

            // Get configuration values from connection properties
            var connectionProps = statement.Connection.Properties;

            // The settings are read in a fixed order so that, when several are invalid,
            // the first one reported is always the same.
            int parallelDownloads = ReadPositiveInt(DatabricksParameters.CloudFetchParallelDownloads, DefaultParallelDownloads);
            int prefetchCount = ReadPositiveInt(DatabricksParameters.CloudFetchPrefetchCount, DefaultPrefetchCount);
            int memoryBufferSizeMB = ReadPositiveInt(DatabricksParameters.CloudFetchMemoryBufferSize, DefaultMemoryBufferSizeMB);
            int maxRetries = ReadPositiveInt(DatabricksParameters.CloudFetchMaxRetries, defaultMaxRetries);
            int retryDelayMs = ReadPositiveInt(DatabricksParameters.CloudFetchRetryDelayMs, defaultRetryDelayMs);
            int timeoutMinutes = ReadPositiveInt(DatabricksParameters.CloudFetchTimeoutMinutes, defaultTimeoutMinutes);

            // Initialize the memory manager
            _memoryManager = new CloudFetchMemoryBufferManager(memoryBufferSizeMB);

            // Each queue holds up to twice the prefetch count. The product is computed in 64 bits and
            // clamped so that a very large prefetch count cannot wrap around to a negative capacity.
            int queueCapacity = (int)Math.Min((long)prefetchCount * 2, int.MaxValue);

            // Initialize the queues with bounded capacity
            _downloadQueue = new BlockingCollection<IDownloadResult>(new ConcurrentQueue<IDownloadResult>(), queueCapacity);
            _resultQueue = new BlockingCollection<IDownloadResult>(new ConcurrentQueue<IDownloadResult>(), queueCapacity);

            // Initialize the HTTP client
            _httpClient = new HttpClient
            {
                Timeout = TimeSpan.FromMinutes(timeoutMinutes)
            };

            // Initialize the result fetcher
            _resultFetcher = new CloudFetchResultFetcher(
                _statement,
                _memoryManager,
                _downloadQueue,
                DefaultFetchBatchSize);

            // Initialize the downloader
            _downloader = new CloudFetchDownloader(
                _downloadQueue,
                _resultQueue,
                _memoryManager,
                _httpClient,
                parallelDownloads,
                _isLz4Compressed,
                maxRetries,
                retryDelayMs);

            // Returns the positive integer stored under `key`, or `fallback` when the property is absent.
            // Throws ArgumentException when the property is present but is not a positive integer.
            int ReadPositiveInt(string key, int fallback)
            {
                if (!connectionProps.TryGetValue(key, out string? raw))
                {
                    return fallback;
                }

                // Configuration values are machine-readable, so parse them independently of the
                // current thread culture.
                if (int.TryParse(
                        raw,
                        System.Globalization.NumberStyles.Integer,
                        System.Globalization.CultureInfo.InvariantCulture,
                        out int parsed)
                    && parsed > 0)
                {
                    return parsed;
                }

                throw new ArgumentException($"Invalid value for {key}: {raw}. Expected a positive integer.");
            }
        }