in Job.ExecutionMonitor/ExecutionMonitor.cs [197:316]
private async Task PostProcessMessage(string executionStatus, DataMessage dataMessage)
{
if (_settings.LogVerbose || Log.IsDebugEnabled)
{
Log.DebugFormat(CultureInfo.InvariantCulture, string.Format(Resources.Job_0_ExecutionId_1_status_check_returned_2, _context.JobDetail.Key, dataMessage.MessageId, executionStatus));
}
switch (executionStatus)
{
case "Succeeded":
{
// Move message file and delete processing status file
var processingSuccessDestination = dataMessage.FullPath.Replace(_settings.UploadSuccessDir, _settings.ProcessingSuccessDir);
_retryPolicyForIo.Execute(() => FileOperationsHelper.MoveDataToTarget(dataMessage.FullPath, processingSuccessDestination, true, _settings.StatusFileExtension));
await CreateLinkToExecutionSummaryPage(dataMessage.MessageId, processingSuccessDestination);
}
break;
case "Unknown":
case "PartiallySucceeded":
case "Failed":
case "Canceled":
{
var processingErrorDestination = dataMessage.FullPath.Replace(_settings.UploadSuccessDir, _settings.ProcessingErrorsDir);
_retryPolicyForIo.Execute(() => FileOperationsHelper.MoveDataToTarget(dataMessage.FullPath, processingErrorDestination, true, _settings.StatusFileExtension));
await CreateLinkToExecutionSummaryPage(dataMessage.MessageId, processingErrorDestination);
if (_settings.GetImportTargetErrorKeysFile)
{
if (_settings.LogVerbose || Log.IsDebugEnabled)
{
Log.DebugFormat(CultureInfo.InvariantCulture, string.Format(Resources.Job_0_Checking_if_error_keys_file_was_generated, _context.JobDetail.Key));
}
var fileWithManifest = processingErrorDestination;
if(!string.IsNullOrEmpty(_settings.PackageTemplate))
{
fileWithManifest = _settings.PackageTemplate;
}
var entitiesInPackage = GetEntitiesNamesInPackage(fileWithManifest);
foreach(var entity in entitiesInPackage)
{
if (_settings.LogVerbose || Log.IsDebugEnabled)
{
Log.DebugFormat(CultureInfo.InvariantCulture, string.Format(Resources.Job_0_Checking_for_error_keys_for_data_entity_1, _context.JobDetail.Key, entity));
}
var errorsExistResponse = await _httpClientHelper.GenerateImportTargetErrorKeysFile(dataMessage.MessageId, entity);
if (errorsExistResponse.IsSuccessStatusCode && Convert.ToBoolean(HttpClientHelper.ReadResponseString(errorsExistResponse)))
{
var errorFileUrl = string.Empty;
HttpResponseMessage errorFileUrlResponse;
do
{
errorFileUrlResponse = await _httpClientHelper.GetImportTargetErrorKeysFileUrl(dataMessage.MessageId, entity);
if(errorFileUrlResponse.IsSuccessStatusCode)
{
errorFileUrl = HttpClientHelper.ReadResponseString(errorFileUrlResponse);
if (errorFileUrl.Length == 0)
{
System.Threading.Thread.Sleep(TimeSpan.FromSeconds(_settings.DelayBetweenStatusCheck));
}
}
}
while (errorFileUrlResponse.IsSuccessStatusCode && string.IsNullOrEmpty(errorFileUrl));
var response = await _httpClientHelper.GetRequestAsync(new UriBuilder(errorFileUrl).Uri, false);
if (response.IsSuccessStatusCode)
{
using Stream downloadedStream = await response.Content.ReadAsStreamAsync();
var errorsFileName = $"{Path.GetFileNameWithoutExtension(dataMessage.Name)}-{entity}-ErrorKeys-{DateTime.Now:yyyy-MM-dd_HH-mm-ss-ffff}.txt";
var errorsFilePath = Path.Combine(Path.GetDirectoryName(dataMessage.FullPath.Replace(_settings.UploadSuccessDir, _settings.ProcessingErrorsDir)), errorsFileName);
var dataMessageForErrorsFile = new DataMessage()
{
FullPath = errorsFilePath,
Name = errorsFileName,
MessageStatus = MessageStatus.Failed
};
_retryPolicyForIo.Execute(() => FileOperationsHelper.Create(downloadedStream, dataMessageForErrorsFile.FullPath));
}
else
{
Log.Warn($@"Job: {_settings.JobKey}. Download of error keys file failed. Job will continue processing other packages.
Uploaded file: {dataMessage.FullPath}
Execution status: {executionStatus}
Error file URL: {errorFileUrl}");
}
}
}
}
if (_settings.GetExecutionErrors)
{
if (_settings.LogVerbose || Log.IsDebugEnabled)
{
Log.DebugFormat(CultureInfo.InvariantCulture, string.Format(Resources.Job_0_Trying_to_download_execution_errors, _context.JobDetail.Key));
}
var response = await _httpClientHelper.GetExecutionErrors(dataMessage.MessageId);
if (response.IsSuccessStatusCode)
{
using Stream downloadedStream = await response.Content.ReadAsStreamAsync();
var errorsFileName = $"{Path.GetFileNameWithoutExtension(dataMessage.Name)}-ExecutionErrors-{DateTime.Now:yyyy-MM-dd_HH-mm-ss-ffff}.txt";
var errorsFilePath = Path.Combine(Path.GetDirectoryName(dataMessage.FullPath.Replace(_settings.UploadSuccessDir, _settings.ProcessingErrorsDir)), errorsFileName);
var dataMessageForErrorsFile = new DataMessage()
{
FullPath = errorsFilePath,
Name = errorsFileName,
MessageStatus = MessageStatus.Failed
};
_retryPolicyForIo.Execute(() => FileOperationsHelper.Create(downloadedStream, dataMessageForErrorsFile.FullPath));
}
else
{
Log.Warn($@"Job: {_settings.JobKey}. Download of execution error details failed. Job will continue processing other packages.
Uploaded file: {dataMessage.FullPath}
Execution status: {executionStatus}
Message Id: {dataMessage.MessageId}");
}
}
}
break;
default: //"NotRun", "Executing"
break;
}
}