in src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/SqlResourceBulkImporter.cs [169:313]
/// <summary>
/// Reads resources from <paramref name="inputChannel"/>, bulk-merges them into the SQL data store in
/// batches, periodically checkpoints progress, and writes <see cref="ImportProcessingProgress"/>
/// updates to <paramref name="outputChannel"/>. Import errors (invalid or duplicate resources) are
/// uploaded through <paramref name="importErrorStore"/>. The output channel is always completed on exit.
/// </summary>
private async Task ImportInternalAsync(Channel<ImportResource> inputChannel, Channel<ImportProcessingProgress> outputChannel, IImportErrorStore importErrorStore, CancellationToken cancellationToken)
{
    try
    {
        _logger.LogInformation("Start to import data to SQL data store.");

        Task<ImportProcessingProgress> checkpointTask = Task.FromResult<ImportProcessingProgress>(null);

        long succeedCount = 0;
        long failedCount = 0;
        long? lastCheckpointIndex = null;
        long currentIndex = -1;
        Dictionary<string, DataTable> resourceParamsBuffer = new Dictionary<string, DataTable>();
        List<string> importErrorBuffer = new List<string>();
        Queue<Task<ImportProcessingProgress>> importTasks = new Queue<Task<ImportProcessingProgress>>();

        List<ImportResource> resourceBuffer = new List<ImportResource>();
        await _sqlBulkCopyDataWrapperFactory.EnsureInitializedAsync();
        await foreach (ImportResource resource in inputChannel.Reader.ReadAllAsync(cancellationToken))
        {
            cancellationToken.ThrowIfCancellationRequested();

            lastCheckpointIndex = lastCheckpointIndex ?? resource.Index - 1;
            currentIndex = resource.Index;

            resourceBuffer.Add(resource);
            if (resourceBuffer.Count < _importTaskConfiguration.SqlBatchSizeForImportResourceOperation)
            {
                continue;
            }

            // The buffer has reached the batch size: merge it and accumulate success/failure counts.
            (long succeed, long failed) = await MergeResourceBufferAsync(resourceBuffer, resourceParamsBuffer, importErrorBuffer, cancellationToken);
            succeedCount += succeed;
            failedCount += failed;

            bool shouldCreateCheckpoint = resource.Index - lastCheckpointIndex >= _importTaskConfiguration.SqlImportBatchSizeForCheckpoint;
            if (shouldCreateCheckpoint)
            {
                // Create checkpoint for all tables not empty
                string[] tableNameNeedImport = resourceParamsBuffer.Where(r => r.Value.Rows.Count > 0).Select(r => r.Key).ToArray();

                foreach (string tableName in tableNameNeedImport)
                {
                    DataTable dataTable = resourceParamsBuffer[tableName];
                    resourceParamsBuffer.Remove(tableName);
                    await EnqueueTaskAsync(importTasks, () => ImportDataTableAsync(dataTable, cancellationToken), outputChannel);
                }

                // wait previous checkpoint task complete
                await checkpointTask;

                // upload error logs for import errors accumulated since the last checkpoint
                string[] importErrors = importErrorBuffer.ToArray();
                importErrorBuffer.Clear();
                lastCheckpointIndex = resource.Index;

                // NOTE(review): succeedCount/failedCount are captured by reference in this lambda, so the
                // values observed depend on when EnqueueTaskAsync invokes it — confirm this is intended.
                checkpointTask = await EnqueueTaskAsync(importTasks, () => UploadImportErrorsAsync(importErrorStore, succeedCount, failedCount, importErrors, currentIndex, cancellationToken), outputChannel);
            }
            else
            {
                // Import every parameter table whose row count reached the batch-size threshold.
                string[] tableNameNeedImport =
                    resourceParamsBuffer.Where(r => r.Value.Rows.Count >= _importTaskConfiguration.SqlBatchSizeForImportParamsOperation).Select(r => r.Key).ToArray();

                foreach (string tableName in tableNameNeedImport)
                {
                    DataTable dataTable = resourceParamsBuffer[tableName];
                    resourceParamsBuffer.Remove(tableName);
                    await EnqueueTaskAsync(importTasks, () => ImportDataTableAsync(dataTable, cancellationToken), outputChannel);
                }
            }
        }

        // The input channel is complete: flush whatever is still sitting in the buffer.
        (long remainingSucceed, long remainingFailed) = await MergeResourceBufferAsync(resourceBuffer, resourceParamsBuffer, importErrorBuffer, cancellationToken);
        succeedCount += remainingSucceed;
        failedCount += remainingFailed;

        // Import all remaining non-empty parameter tables.
        string[] allTablesNotNull = resourceParamsBuffer.Where(r => r.Value.Rows.Count > 0).Select(r => r.Key).ToArray();
        foreach (string tableName in allTablesNotNull)
        {
            DataTable dataTable = resourceParamsBuffer[tableName];
            await EnqueueTaskAsync(importTasks, () => ImportDataTableAsync(dataTable, cancellationToken), outputChannel);
        }

        // Wait for all pending table import tasks to complete.
        while (importTasks.Count > 0)
        {
            await importTasks.Dequeue();
        }

        // Upload the remaining error logs and report the final progress.
        ImportProcessingProgress progress = await UploadImportErrorsAsync(importErrorStore, succeedCount, failedCount, importErrorBuffer.ToArray(), currentIndex, cancellationToken);
        await outputChannel.Writer.WriteAsync(progress, cancellationToken);
    }
    finally
    {
        outputChannel.Writer.Complete();
        _logger.LogInformation("Import data to SQL data store complete.");
    }
}

/// <summary>
/// Merges the buffered resources into the SQL data store, records import errors for invalid and
/// duplicate (not-merged) resources, fills the search-parameter tables, and clears the buffer.
/// Compressed streams of all buffered resources are disposed even when the merge fails.
/// </summary>
/// <param name="resourceBuffer">Resources to merge; cleared before this method returns.</param>
/// <param name="resourceParamsBuffer">Per-table parameter rows, appended to for merged resources.</param>
/// <param name="importErrorBuffer">Error log lines, appended to for failed resources.</param>
/// <param name="cancellationToken">Token to cancel the merge operation.</param>
/// <returns>The number of successfully merged resources and the number of failed resources.</returns>
private async Task<(long SucceedCount, long FailedCount)> MergeResourceBufferAsync(List<ImportResource> resourceBuffer, Dictionary<string, DataTable> resourceParamsBuffer, List<string> importErrorBuffer, CancellationToken cancellationToken)
{
    try
    {
        List<ImportResource> resourcesWithError = resourceBuffer.Where(r => r.ContainsError()).ToList();

        // Materialize eagerly: the previous deferred query re-ran CreateSqlBulkCopyDataWrapper on every
        // enumeration, so Except() compared freshly created wrapper instances and Count() invoked the
        // factory yet again. Building the lists once makes Except/Count operate on stable objects.
        List<SqlBulkCopyDataWrapper> inputResources = resourceBuffer
            .Where(r => !r.ContainsError())
            .Select(r => _sqlBulkCopyDataWrapperFactory.CreateSqlBulkCopyDataWrapper(r))
            .ToList();
        List<SqlBulkCopyDataWrapper> mergedResources = (await _sqlImportOperation.BulkMergeResourceAsync(inputResources, cancellationToken)).ToList();
        List<SqlBulkCopyDataWrapper> duplicateResourcesNotMerged = inputResources.Except(mergedResources).ToList();

        importErrorBuffer.AddRange(resourcesWithError.Select(r => r.ImportError));
        FillResourceParamsBuffer(mergedResources, resourceParamsBuffer);
        AppendDuplicatedResouceErrorToBuffer(duplicateResourcesNotMerged, importErrorBuffer);

        return (mergedResources.Count, resourcesWithError.Count + duplicateResourcesNotMerged.Count);
    }
    finally
    {
        // Release the decompressed payload streams regardless of merge outcome.
        foreach (ImportResource importResource in resourceBuffer)
        {
            importResource?.CompressedStream?.Dispose();
        }

        resourceBuffer.Clear();
    }
}