private async Task ProcessInputQueue()

in Job.Import/Import.cs [165:354]


        private async Task ProcessInputQueue()
        {
            using (_httpClientHelper = new HttpClientHelper(_settings))
            {
                var fileCount = 0;
                string fileNameInPackageTemplate = "";
                FileStream zipToOpen = null;
                ZipArchive archive = null;

                if (_settings.InputFilesArePackages == false)
                {
                    fileNameInPackageTemplate = GetFileNameInPackageTemplate();
                    if (string.IsNullOrEmpty(fileNameInPackageTemplate))
                    {
                        throw new JobExecutionException(string.Format(Resources.Job_0_Please_check_your_package_template_Input_file_name_in_Manifest_cannot_be_identified, _context.JobDetail.Key));
                    }
                }

                while (InputQueue.TryDequeue(out DataMessage dataMessage))
                {
                    try
                    {
                        if (fileCount > 0 && _settings.DelayBetweenFiles > 0) //Only delay after first file and never after last.
                        {
                            System.Threading.Thread.Sleep(TimeSpan.FromSeconds(_settings.DelayBetweenFiles));
                        }
                        fileCount++;

                        var sourceStream = _retryPolicyForIo.Execute(() => FileOperationsHelper.Read(dataMessage.FullPath));
                        if (sourceStream == null) continue;//Nothing to do here

                        string tempFileName = "";

                        //If we need to "wrap" file in package envelope
                        if (_settings.InputFilesArePackages == false)
                        {
                            using (zipToOpen = new FileStream(_settings.PackageTemplate, FileMode.Open))
                            {
                                tempFileName = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString());
                                _retryPolicyForIo.Execute(() => FileOperationsHelper.Create(zipToOpen, tempFileName));
                                var tempZipStream = _retryPolicyForIo.Execute(() => FileOperationsHelper.Read(tempFileName));
                                using (archive = new ZipArchive(tempZipStream, ZipArchiveMode.Update))
                                {
                                    //Check if package template contains input file and remove it first. It should not be there in the first place.
                                    ZipArchiveEntry entry = archive.GetEntry(fileNameInPackageTemplate);
                                    if (entry != null)
                                    {
                                        entry.Delete();
                                        Log.WarnFormat(CultureInfo.InvariantCulture, string.Format(Resources.Job_0_Package_template_contains_input_file_1_Please_remove_it_from_the_template, _context.JobDetail.Key, fileNameInPackageTemplate));
                                    }

                                    // Update Manifest file with the original file name for end-to-end traceability. Use the new file name in the rest of the method.
                                    fileNameInPackageTemplate = UpdateManifestFile(archive, dataMessage, fileNameInPackageTemplate);

                                    var importedFile = archive.CreateEntry(fileNameInPackageTemplate, CompressionLevel.Fastest);
                                    using var entryStream = importedFile.Open();
                                    sourceStream.CopyTo(entryStream);
                                    sourceStream.Close();
                                    sourceStream.Dispose();
                                }
                                sourceStream = _retryPolicyForIo.Execute(() => FileOperationsHelper.Read(tempFileName));
                            }
                        }
                        if (Log.IsDebugEnabled)
                            Log.DebugFormat(CultureInfo.InvariantCulture, string.Format(Resources.Job_0_Uploading_file_1_File_size_2_bytes, _context.JobDetail.Key, dataMessage.FullPath.Replace(@"{", @"{{").Replace(@"}", @"}}"), sourceStream.Length));

                        // Get blob url and id. Returns in json format
                        var response = await _httpClientHelper.GetAzureWriteUrl();
                        if(!response.IsSuccessStatusCode)
                        {
                            throw new JobExecutionException($"Job: {_settings.JobKey}. Request GetAzureWriteUrl failed.");
                        }
                        var blobInfo = (JObject)JsonConvert.DeserializeObject(HttpClientHelper.ReadResponseString(response));
                        var blobUrl = blobInfo["BlobUrl"].ToString();

                        var blobUri = new Uri(blobUrl);

                        //Upload package to blob storage
                        var uploadResponse = await _httpClientHelper.UploadContentsToBlob(blobUri, sourceStream);
                        
                        if (sourceStream != null)
                        {
                            sourceStream.Close();
                            sourceStream.Dispose();
                            if (!_settings.InputFilesArePackages)//if we wraped file in package envelop we need to delete temp file
                            {
                                _retryPolicyForIo.Execute(() => FileOperationsHelper.Delete(tempFileName));
                            }
                        }
                        if (uploadResponse.IsSuccessStatusCode)
                        {
                            //Now send import request
                            var targetLegalEntity = _settings.Company;
                            if (_settings.MultiCompanyImport && _settings.GetLegalEntityFromSubfolder)
                            {
                                targetLegalEntity = new FileInfo(dataMessage.FullPath).Directory.Name;
                            }
                            if (_settings.MultiCompanyImport && _settings.GetLegalEntityFromFilename)
                            {
                                String[] separator = { _settings.FilenameSeparator };
                                var tokenList = dataMessage.Name.Split(separator, 10, StringSplitOptions.RemoveEmptyEntries);

                                targetLegalEntity = tokenList[_settings.LegalEntityTokenPosition - 1];
                            }
                            if(targetLegalEntity.Length > 4)
                            {
                                Log.ErrorFormat(CultureInfo.InvariantCulture, string.Format(Resources.Job_0_Target_legal_entity_is_not_valid_1, _context.JobDetail.Key, targetLegalEntity));
                            }

                            if(string.IsNullOrEmpty(targetLegalEntity))
                            {
                                throw new Exception(string.Format(Resources.Job_0_Unable_to_get_target_legal_entity_name, _context.JobDetail.Key));
                            }
                            var executionIdGenerated = CreateExecutionId(_settings.DataProject);
                            var importResponse = await _httpClientHelper.ImportFromPackage(blobUri.AbsoluteUri, _settings.DataProject, executionIdGenerated, _settings.ExecuteImport, _settings.OverwriteDataProject, targetLegalEntity);

                            if (importResponse.IsSuccessStatusCode)
                            {
                                var result = importResponse.Content.ReadAsStringAsync().Result;
                                var jsonResponse = (JObject)JsonConvert.DeserializeObject(result);
                                string executionId = jsonResponse["value"].ToString();

                                var targetDataMessage = new DataMessage(dataMessage)
                                {
                                    MessageId = executionId,
                                    FullPath = dataMessage.FullPath.Replace(_settings.InputDir, _settings.UploadSuccessDir),
                                    MessageStatus = MessageStatus.Enqueued
                                };

                                // Move to inprocess/success location
                                _retryPolicyForIo.Execute(() => FileOperationsHelper.Move(dataMessage.FullPath, targetDataMessage.FullPath));

                                if (_settings.ExecutionJobPresent)
                                    _retryPolicyForIo.Execute(() => FileOperationsHelper.WriteStatusFile(targetDataMessage, _settings.StatusFileExtension));
                                if (Log.IsDebugEnabled)
                                    Log.DebugFormat(CultureInfo.InvariantCulture, string.Format(Resources.Job_0_File_1_uploaded_successfully, _context.JobDetail.Key, dataMessage.FullPath.Replace(@"{", @"{{").Replace(@"}", @"}}")));
                            }
                            else
                            {
                                // import request failed. Move message to error location.
                                Log.ErrorFormat(CultureInfo.InvariantCulture, string.Format(Resources.Job_0_Upload_failed_for_file_1_Failure_response_Status_2_Reason_3, _context.JobDetail.Key, dataMessage.FullPath.Replace(@"{", @"{{").Replace(@"}", @"}}"), importResponse.StatusCode, importResponse.ReasonPhrase, $"{Environment.NewLine}packageUrl: {blobUri.AbsoluteUri}{Environment.NewLine}definitionGroupId: {_settings.DataProject}{Environment.NewLine}executionId: {executionIdGenerated}{Environment.NewLine}execute: {_settings.ExecuteImport}{Environment.NewLine}overwrite: {_settings.OverwriteDataProject}{Environment.NewLine}legalEntityId: {targetLegalEntity}"));

                                var targetDataMessage = new DataMessage(dataMessage)
                                {
                                    FullPath = dataMessage.FullPath.Replace(_settings.InputDir, _settings.UploadErrorsDir),
                                    MessageStatus = MessageStatus.Failed
                                };

                                // Move data to error location
                                _retryPolicyForIo.Execute(() => FileOperationsHelper.Move(dataMessage.FullPath, targetDataMessage.FullPath));

                                // Save the log with import failure details
                                _retryPolicyForIo.Execute(() => FileOperationsHelper.WriteStatusLogFile(targetDataMessage, importResponse, _settings.StatusFileExtension));
                            }
                        }
                        else
                        {
                            // upload failed. Move message to error location.
                            Log.ErrorFormat(CultureInfo.InvariantCulture, string.Format(Resources.Job_0_Upload_failed_for_file_1_Failure_response_Status_2_Reason_3, _context.JobDetail.Key, dataMessage.FullPath.Replace(@"{", @"{{").Replace(@"}", @"}}"), uploadResponse.StatusCode, uploadResponse.ReasonPhrase));

                            var targetDataMessage = new DataMessage(dataMessage)
                            {
                                FullPath = dataMessage.FullPath.Replace(_settings.InputDir, _settings.UploadErrorsDir),
                                MessageStatus = MessageStatus.Failed
                            };

                            // Move data to error location
                            _retryPolicyForIo.Execute(() => FileOperationsHelper.Move(dataMessage.FullPath, targetDataMessage.FullPath));

                            // Save the log with import failure details
                            _retryPolicyForIo.Execute(() => FileOperationsHelper.WriteStatusLogFile(targetDataMessage, uploadResponse, _settings.StatusFileExtension));
                        }
                    }
                    catch (Exception ex)
                    {
                        Log.ErrorFormat(CultureInfo.InvariantCulture, string.Format(Resources.Job_0_Failure_processing_file_1_Exception_2, _context.JobDetail.Key, dataMessage.FullPath.Replace(@"{", @"{{").Replace(@"}", @"}}"), ex.Message), ex);
                        throw;
                    }
                    finally
                    {
                        if (zipToOpen != null)
                        {
                            zipToOpen.Close();
                            zipToOpen.Dispose();
                        }
                        archive?.Dispose();
                    }
                }
            }
        }