public DocumentExporter documentExporter()

in ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java [85:126]


  public DocumentExporter documentExporter(DocumentItemReader documentItemReader,
                                           @Value("#{stepExecution.jobExecution.jobId}") String jobId,
                                           @Value("#{stepExecution.jobExecution.executionContext.get('" + PARAMETERS_CONTEXT_KEY + "')}") ArchivingProperties parameters,
                                           InfraManagerDataConfig infraManagerDataConfig,
                                           @Value("#{jobParameters[end]}") String intervalEnd,
                                           DocumentWiper documentWiper,
                                           JobContextRepository jobContextRepository) {

    File baseDir = new File(infraManagerDataConfig.getDataFolder(), "exporting");
    CompositeFileAction fileAction = new CompositeFileAction(new BZip2Compressor());
    switch (parameters.getDestination()) {
      case HDFS:
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
        fileAction.add(new HdfsUploader(conf,
                parameters.hdfsProperties().orElseThrow(() -> new IllegalStateException("HDFS properties are not provided!"))));
        break;
      case LOCAL:
        baseDir = new File(parameters.getLocalDestinationDirectory());
        break;
    }

    FileNameSuffixFormatter fileNameSuffixFormatter = FileNameSuffixFormatter.from(parameters);
    LocalItemWriterListener itemWriterListener = new LocalItemWriterListener(fileAction, documentWiper);
    File destinationDirectory = new File(
            baseDir,
            String.format("%s_%s_%s",
                    parameters.getSolr().getCollection(),
                    jobId,
                    isBlank(intervalEnd) ? "" : fileNameSuffixFormatter.format(intervalEnd)));
    logger.info("Destination directory path={}", destinationDirectory);
    if (!destinationDirectory.exists()) {
      if (!destinationDirectory.mkdirs()) {
        logger.warn("Unable to create directory {}", destinationDirectory);
      }
    }

    return new DocumentExporter(
            documentItemReader,
            firstDocument -> new LocalDocumentItemWriter(
                    outFile(parameters.getSolr().getCollection(), destinationDirectory, fileNameSuffixFormatter.format(firstDocument)), itemWriterListener),
            parameters.getWriteBlockSize(), jobContextRepository);
  }