in ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java [85:126]
/**
 * Builds the {@link DocumentExporter} for an archiving step.
 * <p>
 * Resolves the export destination from the step's {@code ArchivingProperties}
 * (pulled out of the job execution context), prepares the local directory the
 * exported files are written into, and wires the per-file post-write actions
 * (BZip2 compression via {@link BZip2Compressor}; additionally an HDFS upload
 * when the destination is HDFS).
 *
 * @param documentItemReader   reader supplying the documents to export
 * @param jobId                current job id, used in the destination directory name
 * @param parameters           archiving parameters stored in the execution context
 * @param infraManagerDataConfig provides the base data folder for local exports
 * @param intervalEnd          job parameter "end"; formatted into the directory name when present
 * @param documentWiper        deletes exported documents once a file is handled
 * @param jobContextRepository repository used by the exporter to persist step progress
 * @return a configured {@link DocumentExporter}
 * @throws IllegalStateException if the destination is HDFS but no HDFS properties are configured
 */
public DocumentExporter documentExporter(DocumentItemReader documentItemReader,
                                         @Value("#{stepExecution.jobExecution.jobId}") String jobId,
                                         @Value("#{stepExecution.jobExecution.executionContext.get('" + PARAMETERS_CONTEXT_KEY + "')}") ArchivingProperties parameters,
                                         InfraManagerDataConfig infraManagerDataConfig,
                                         @Value("#{jobParameters[end]}") String intervalEnd,
                                         DocumentWiper documentWiper,
                                         JobContextRepository jobContextRepository) {
  // Actions applied to each finished export file; compression always runs first.
  CompositeFileAction fileAction = new CompositeFileAction(new BZip2Compressor());

  // Default working directory; overridden below when exporting to a local destination.
  File baseDir = new File(infraManagerDataConfig.getDataFolder(), "exporting");
  switch (parameters.getDestination()) {
    case HDFS:
      // NOTE(review): fresh default Hadoop Configuration — picks up settings from the classpath.
      org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
      fileAction.add(new HdfsUploader(hadoopConf,
              parameters.hdfsProperties().orElseThrow(() -> new IllegalStateException("HDFS properties are not provided!"))));
      break;
    case LOCAL:
      baseDir = new File(parameters.getLocalDestinationDirectory());
      break;
  }

  FileNameSuffixFormatter suffixFormatter = FileNameSuffixFormatter.from(parameters);
  LocalItemWriterListener writerListener = new LocalItemWriterListener(fileAction, documentWiper);

  // Directory name: <collection>_<jobId>_<formatted interval end>; suffix empty when "end" is blank.
  String directoryName = String.format("%s_%s_%s",
          parameters.getSolr().getCollection(),
          jobId,
          isBlank(intervalEnd) ? "" : suffixFormatter.format(intervalEnd));
  File destinationDirectory = new File(baseDir, directoryName);
  logger.info("Destination directory path={}", destinationDirectory);

  // Best effort: a failed mkdirs is only logged, matching the exporter's existing contract.
  if (!destinationDirectory.exists() && !destinationDirectory.mkdirs()) {
    logger.warn("Unable to create directory {}", destinationDirectory);
  }

  return new DocumentExporter(
          documentItemReader,
          firstDocument -> new LocalDocumentItemWriter(
                  outFile(parameters.getSolr().getCollection(), destinationDirectory, suffixFormatter.format(firstDocument)), writerListener),
          parameters.getWriteBlockSize(), jobContextRepository);
}