private async Task ImportInternalAsync()

in src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/SqlResourceBulkImporter.cs [169:313]


        /// <summary>
        /// Consumes resources from <paramref name="inputChannel"/>, merges them into the SQL data store in batches,
        /// imports the buffered parameter tables, and reports progress on <paramref name="outputChannel"/>.
        /// </summary>
        /// <param name="inputChannel">Channel the resources to import are read from.</param>
        /// <param name="outputChannel">Channel progress is written to; its writer is completed in all cases before this method exits.</param>
        /// <param name="importErrorStore">Store handed to <c>UploadImportErrorsAsync</c> together with the collected import errors.</param>
        /// <param name="cancellationToken">Token observed while reading the channel and passed to every data-store call.</param>
        /// <returns>A task that completes when all buffered data has been imported and the final progress has been written.</returns>
        /// <exception cref="OperationCanceledException">Thrown when <paramref name="cancellationToken"/> is cancelled.</exception>
        private async Task ImportInternalAsync(Channel<ImportResource> inputChannel, Channel<ImportProcessingProgress> outputChannel, IImportErrorStore importErrorStore, CancellationToken cancellationToken)
        {
            try
            {
                _logger.LogInformation("Start to import data to SQL data store.");

                Task<ImportProcessingProgress> checkpointTask = Task.FromResult<ImportProcessingProgress>(null);

                long succeedCount = 0;
                long failedCount = 0;
                long? lastCheckpointIndex = null;
                long currentIndex = -1;
                Dictionary<string, DataTable> resourceParamsBuffer = new Dictionary<string, DataTable>();
                List<string> importErrorBuffer = new List<string>();
                Queue<Task<ImportProcessingProgress>> importTasks = new Queue<Task<ImportProcessingProgress>>();
                List<ImportResource> resourceBuffer = new List<ImportResource>();

                // Merges everything currently held in resourceBuffer, records errors and counters,
                // then releases the buffered streams and empties the buffer (even on failure).
                async Task ProcessResourceBufferAsync()
                {
                    try
                    {
                        // Materialize each sequence exactly once: the LINQ queries are deferred, so leaving them
                        // lazy would re-create the bulk-copy wrappers and re-run the Except on every enumeration.
                        List<ImportResource> resourcesWithError = resourceBuffer.Where(r => r.ContainsError()).ToList();
                        List<SqlBulkCopyDataWrapper> inputResources = resourceBuffer
                            .Where(r => !r.ContainsError())
                            .Select(r => _sqlBulkCopyDataWrapperFactory.CreateSqlBulkCopyDataWrapper(r))
                            .ToList();
                        List<SqlBulkCopyDataWrapper> mergedResources = (await _sqlImportOperation.BulkMergeResourceAsync(inputResources, cancellationToken)).ToList();

                        // Inputs that did not come back from the merge are reported as duplicates.
                        List<SqlBulkCopyDataWrapper> duplicateResourcesNotMerged = inputResources.Except(mergedResources).ToList();

                        importErrorBuffer.AddRange(resourcesWithError.Select(r => r.ImportError));
                        FillResourceParamsBuffer(mergedResources, resourceParamsBuffer);
                        AppendDuplicatedResouceErrorToBuffer(duplicateResourcesNotMerged, importErrorBuffer);

                        succeedCount += mergedResources.Count;
                        failedCount += resourcesWithError.Count + duplicateResourcesNotMerged.Count;
                    }
                    finally
                    {
                        foreach (ImportResource importResource in resourceBuffer)
                        {
                            importResource?.CompressedStream?.Dispose();
                        }

                        resourceBuffer.Clear();
                    }
                }

                // Removes every buffered parameter table accepted by shouldImport and enqueues its import.
                async Task EnqueueTableImportsAsync(Func<DataTable, bool> shouldImport)
                {
                    // Snapshot the matches first so the dictionary is not modified while it is being enumerated.
                    KeyValuePair<string, DataTable>[] pendingTables = resourceParamsBuffer.Where(entry => shouldImport(entry.Value)).ToArray();

                    foreach (KeyValuePair<string, DataTable> pendingTable in pendingTables)
                    {
                        DataTable dataTable = pendingTable.Value;
                        resourceParamsBuffer.Remove(pendingTable.Key);
                        await EnqueueTaskAsync(importTasks, () => ImportDataTableAsync(dataTable, cancellationToken), outputChannel);
                    }
                }

                await _sqlBulkCopyDataWrapperFactory.EnsureInitializedAsync();
                await foreach (ImportResource resource in inputChannel.Reader.ReadAllAsync(cancellationToken))
                {
                    cancellationToken.ThrowIfCancellationRequested();

                    // The first resource seen defines where checkpoint counting starts.
                    lastCheckpointIndex = lastCheckpointIndex ?? resource.Index - 1;
                    currentIndex = resource.Index;

                    resourceBuffer.Add(resource);
                    if (resourceBuffer.Count < _importTaskConfiguration.SqlBatchSizeForImportResourceOperation)
                    {
                        continue;
                    }

                    await ProcessResourceBufferAsync();

                    bool shouldCreateCheckpoint = resource.Index - lastCheckpointIndex >= _importTaskConfiguration.SqlImportBatchSizeForCheckpoint;
                    if (shouldCreateCheckpoint)
                    {
                        // Create checkpoint for all tables not empty
                        await EnqueueTableImportsAsync(table => table.Rows.Count > 0);

                        // wait previous checkpoint task complete
                        await checkpointTask;

                        // upload error logs for import errors
                        string[] importErrors = importErrorBuffer.ToArray();
                        importErrorBuffer.Clear();
                        lastCheckpointIndex = resource.Index;

                        // Snapshot the running totals so the reported progress reflects this checkpoint
                        // regardless of when the factory delegate below is invoked.
                        long checkpointSucceedCount = succeedCount;
                        long checkpointFailedCount = failedCount;
                        long checkpointIndex = currentIndex;
                        checkpointTask = await EnqueueTaskAsync(
                            importTasks,
                            () => UploadImportErrorsAsync(importErrorStore, checkpointSucceedCount, checkpointFailedCount, importErrors, checkpointIndex, cancellationToken),
                            outputChannel);
                    }
                    else
                    {
                        // Between checkpoints only flush tables that reached the configured batch size.
                        await EnqueueTableImportsAsync(table => table.Rows.Count >= _importTaskConfiguration.SqlBatchSizeForImportParamsOperation);
                    }
                }

                // Handle the resources left in the buffer after the channel has been drained.
                await ProcessResourceBufferAsync();

                // Import all remain tables
                await EnqueueTableImportsAsync(table => table.Rows.Count > 0);

                // Wait all table import task complete
                while (importTasks.Count > 0)
                {
                    await importTasks.Dequeue();
                }

                // Upload remain error logs
                ImportProcessingProgress progress = await UploadImportErrorsAsync(importErrorStore, succeedCount, failedCount, importErrorBuffer.ToArray(), currentIndex, cancellationToken);
                await outputChannel.Writer.WriteAsync(progress, cancellationToken);
            }
            finally
            {
                // Always complete the writer so readers of the output channel are not left waiting.
                outputChannel.Writer.Complete();
                _logger.LogInformation("Import data to SQL data store complete.");
            }
        }