in csharp/src/Drivers/Databricks/CloudFetch/CloudFetchDownloadManager.cs [60:183]
/// <summary>
/// Wires up the CloudFetch pipeline for <paramref name="statement"/>: a memory buffer manager,
/// bounded download/result queues, an HTTP client, a result fetcher and a parallel downloader.
/// Tuning options are read from the statement's connection properties.
/// </summary>
/// <param name="statement">The statement whose results are downloaded. Must not be null.</param>
/// <param name="schema">The Arrow schema of the result set. Must not be null.</param>
/// <param name="isLz4Compressed">Whether result data is LZ4-compressed; stored and forwarded to the downloader.</param>
/// <exception cref="ArgumentNullException"><paramref name="statement"/> or <paramref name="schema"/> is null.</exception>
/// <exception cref="ArgumentException">
/// A CloudFetch connection property is present but is not a positive integer.
/// </exception>
public CloudFetchDownloadManager(DatabricksStatement statement, Schema schema, bool isLz4Compressed)
{
    _statement = statement ?? throw new ArgumentNullException(nameof(statement));
    _schema = schema ?? throw new ArgumentNullException(nameof(schema));
    _isLz4Compressed = isLz4Compressed;

    // Absent properties fall back to defaults; present-but-invalid values are rejected
    // instead of being silently ignored.
    var connectionProps = statement.Connection.Properties;

    int parallelDownloads = ParsePositiveIntProperty(DatabricksParameters.CloudFetchParallelDownloads, DefaultParallelDownloads);
    int prefetchCount = ParsePositiveIntProperty(DatabricksParameters.CloudFetchPrefetchCount, DefaultPrefetchCount);
    int memoryBufferSizeMB = ParsePositiveIntProperty(DatabricksParameters.CloudFetchMemoryBufferSize, DefaultMemoryBufferSizeMB);
    int maxRetries = ParsePositiveIntProperty(DatabricksParameters.CloudFetchMaxRetries, defaultValue: 3);
    int retryDelayMs = ParsePositiveIntProperty(DatabricksParameters.CloudFetchRetryDelayMs, defaultValue: 500);
    int timeoutMinutes = ParsePositiveIntProperty(DatabricksParameters.CloudFetchTimeoutMinutes, defaultValue: 5);

    _memoryManager = new CloudFetchMemoryBufferManager(memoryBufferSizeMB);

    // Both queues are bounded at twice the prefetch count. For very large configured values
    // the multiplication would overflow to a negative capacity (which BlockingCollection rejects
    // with a confusing error), so saturate at int.MaxValue instead.
    int queueCapacity = prefetchCount > int.MaxValue / 2 ? int.MaxValue : prefetchCount * 2;
    _downloadQueue = new BlockingCollection<IDownloadResult>(new ConcurrentQueue<IDownloadResult>(), queueCapacity);
    _resultQueue = new BlockingCollection<IDownloadResult>(new ConcurrentQueue<IDownloadResult>(), queueCapacity);

    _httpClient = new HttpClient
    {
        Timeout = TimeSpan.FromMinutes(timeoutMinutes)
    };

    // The fetcher feeds the download queue; the downloader drains it into the result queue.
    _resultFetcher = new CloudFetchResultFetcher(
        _statement,
        _memoryManager,
        _downloadQueue,
        DefaultFetchBatchSize);

    _downloader = new CloudFetchDownloader(
        _downloadQueue,
        _resultQueue,
        _memoryManager,
        _httpClient,
        parallelDownloads,
        _isLz4Compressed,
        maxRetries,
        retryDelayMs);

    // Reads the optional connection property <key>. Returns defaultValue when it is absent,
    // the parsed value when it is a positive integer (parsed with the invariant culture,
    // since this is machine-readable configuration), and throws ArgumentException otherwise.
    int ParsePositiveIntProperty(string key, int defaultValue)
    {
        if (!connectionProps.TryGetValue(key, out string? rawValue))
        {
            return defaultValue;
        }

        if (int.TryParse(
                rawValue,
                System.Globalization.NumberStyles.Integer,
                System.Globalization.CultureInfo.InvariantCulture,
                out int parsedValue)
            && parsedValue > 0)
        {
            return parsedValue;
        }

        throw new ArgumentException($"Invalid value for {key}: {rawValue}. Expected a positive integer.");
    }
}