public int run(String[] args)

in src/main/java/org/apache/accumulo/testing/continuous/BulkIngest.java [53:109]
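
Builds and submits a MapReduce job that generates continuous-ingest key/value data with ContinuousInputFormat and writes it as RFiles under <bulk dir>/files via AccumuloFileOutputFormat, partitioned across reducers by the target table's existing split points.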


  public int run(String[] args) throws Exception {
    String ingestInstanceId = UUID.randomUUID().toString();
    String bulkDir = args[0];

    Job job = Job.getInstance(getConf());
    job.setJobName("BulkIngest_" + ingestInstanceId);
    job.setJarByClass(BulkIngest.class);
    // very important to prevent guava conflicts
    job.getConfiguration().set("mapreduce.job.classloader", "true");
    FileSystem fs = FileSystem.get(URI.create(bulkDir), job.getConfiguration());

    // log the start time and ingest instance id for this run
    log.info(String.format("UUID %d %s", System.currentTimeMillis(), ingestInstanceId));

    job.setInputFormatClass(ContinuousInputFormat.class);

    // map the generated random longs to key values
    job.setMapOutputKeyClass(Key.class);
    job.setMapOutputValueClass(Value.class);

    // remove the bulk dir from args; the remaining two arguments are passed to ContinuousEnv below
    args = Arrays.asList(args).subList(1, 3).toArray(new String[2]);

    try (ContinuousEnv env = new ContinuousEnv(args)) {
      fs.mkdirs(fs.makeQualified(new Path(bulkDir)));

      // output RFiles for the import
      job.setOutputFormatClass(AccumuloFileOutputFormat.class);
      AccumuloFileOutputFormat.configure()
          .outputPath(fs.makeQualified(new Path(bulkDir + "/files"))).store(job);

      // configure the input format with this run's instance id and test environment settings
      ContinuousInputFormat.configure(job.getConfiguration(), ingestInstanceId, env);

      String tableName = env.getAccumuloTableName();

      // create splits file for KeyRangePartitioner
      String splitsFile = bulkDir + "/splits.txt";
      AccumuloClient client = env.getAccumuloClient();

      // make sure splits file is closed before continuing
      try (PrintStream out = new PrintStream(
          new BufferedOutputStream(fs.create(fs.makeQualified(new Path(splitsFile)))))) {
        Collection<Text> splits = client.tableOperations().listSplits(tableName,
            env.getBulkReducers() - 1);
        splits.stream().map(Text::copyBytes).map(split -> Base64.getEncoder().encodeToString(split))
            .forEach(out::println);
        // one reducer per key range: number of split points plus one
        job.setNumReduceTasks(splits.size() + 1);
      }

      // partition the map output by the split points written above so each reducer covers a distinct key range
      job.setPartitionerClass(KeyRangePartitioner.class);
      KeyRangePartitioner.setSplitFile(job, fs.makeQualified(new Path(splitsFile)).toString());

      job.waitForCompletion(true);
      boolean success = job.isSuccessful();

      return success ? 0 : 1;
    }
  }
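
A minimal driver sketch for invoking this Tool, assuming BulkIngest extends Configured and implements Tool (as the getConf()/run(String[]) pattern suggests); the argument order shown in the comment is an assumption inferred from how args is sliced above:

  // assumes: import org.apache.hadoop.util.ToolRunner;
  public static void main(String[] args) throws Exception {
    // assumed argument order: <bulk dir> <test properties file> <client properties file>
    System.exit(ToolRunner.run(new BulkIngest(), args));
  }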