private int buildTree()

in stresso/src/main/java/stresso/trie/Init.java [178:235]


  /**
   * Runs the "{@code _load}" MapReduce job and, if it succeeds, bulk imports its output.
   *
   * <p>The job reads sequence files from {@code tmp/nums}, runs {@link InitMapper},
   * {@link InitCombiner} and {@link InitReducer}, and writes through
   * {@link AccumuloFileOutputFormat} into {@code tmp/out}. Reducer input is partitioned by
   * {@link RangePartitioner} using the split file {@code tmp/splits.txt} produced by
   * {@code writeSplits}; one reducer is scheduled per split point plus one.
   *
   * <p>When the job completes successfully, {@code tmp/failures} is created and the job output is
   * imported into the table handed to the callback by {@code AccumuloUtil.doTableOp}, after which
   * that table is compacted and the call waits for the compaction to finish.
   *
   * @param props Fluo configuration used to look up the stresso settings, write the splits and
   *        run the table operations
   * @param tmp working directory that holds the {@code nums} input and receives
   *        {@code splits.txt}, {@code out} and {@code failures}
   * @return {@code 0} if the MapReduce job succeeded, {@code 1} otherwise
   * @throws Exception if job setup, job execution or any of the table operations fails
   */
  private int buildTree(FluoConfiguration props, Path tmp) throws Exception {
    final StressoConfig stressoConf = StressoConfig.retrieve(props);

    Job job = Job.getInstance(getConf());
    job.setJobName(Init.class.getName() + "_load");
    job.setJarByClass(Init.class);

    // Trie parameters made available to the tasks through the job configuration.
    job.getConfiguration().setInt(TRIE_NODE_SIZE_PROP, stressoConf.nodeSize);
    job.getConfiguration().setInt(TRIE_STOP_LEVEL_PROP, stressoConf.stopLevel);

    // Input side: sequence files under tmp/nums.
    SequenceFileInputFormat.addInputPath(job, new Path(tmp, "nums"));
    job.setInputFormatClass(SequenceFileInputFormat.class);

    // Processing pipeline and the intermediate key/value types it emits.
    job.setMapperClass(InitMapper.class);
    job.setCombinerClass(InitCombiner.class);
    job.setReducerClass(InitReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);

    // Output side: files written under tmp/out.
    final Path outputDir = new Path(tmp, "out");
    job.setOutputFormatClass(AccumuloFileOutputFormat.class);
    AccumuloFileOutputFormat.setOutputPath(job, outputDir);

    // Partitioning: one reducer for every range delimited by the split points.
    job.setPartitionerClass(RangePartitioner.class);
    FileSystem fileSystem = FileSystem.get(job.getConfiguration());
    Path splitFile = new Path(tmp, "splits.txt");
    Collection<Text> splitPoints = writeSplits(props, fileSystem, splitFile);
    RangePartitioner.setSplitFile(job, splitFile.toString());
    job.setNumReduceTasks(splitPoints.size() + 1);

    // Turn on the mapreduce.job.classloader setting for this job.
    job.getConfiguration().set("mapreduce.job.classloader", "true");

    if (!job.waitForCompletion(true)) {
      return 1;
    }

    final Path failureDir = new Path(tmp, "failures");
    fileSystem.mkdirs(failureDir);

    AccumuloUtil.doTableOp(props, (ops, tableName) -> {
      ops.importDirectory(tableName, outputDir.toString(), failureDir.toString(), false);

      // Compacting files makes them local to each tablet and generates files using the table's
      // settings.
      ops.compact(tableName, new CompactionConfig().setWait(true));
    });

    return 0;
  }