in ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentExporter.java [68:116]
public RepeatStatus execute(@NonNull StepContribution contribution, @NonNull ChunkContext chunkContext) throws Exception {
StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();
ExecutionContext executionContext = stepExecution.getExecutionContext();
documentReader.open(executionContext);
DocumentItemWriter writer = null;
int writtenCount = 0;
try {
Document document;
while ((document = documentReader.read()) != null) {
if (writer != null && writtenCount >= writeBlockSize) {
stepExecution = jobContextRepository.getStepExecution(stepExecution.getJobExecutionId(), stepExecution.getId());
if (stepExecution.isTerminateOnly()) {
logger.info("Received stop signal.");
writer.revert();
writer = null;
return RepeatStatus.CONTINUABLE;
}
writer.close();
writer = null;
writtenCount = 0;
documentReader.update(executionContext);
jobContextRepository.updateExecutionContext(stepExecution);
}
if (writer == null)
writer = documentDestination.open(document);
writer.write(document);
++writtenCount;
}
}
catch (Exception e) {
if (writer != null) {
writer.revert();
writer = null;
}
throw e;
}
finally {
if (writer != null)
writer.close();
documentReader.close();
}
complete = true;
return RepeatStatus.FINISHED;
}