in stresso/src/main/java/stresso/trie/Init.java [178:235]
/**
 * Runs the initial-load MapReduce job and bulk imports its output into the Accumulo table.
 *
 * <p>The job reads sequence files from {@code tmp/nums}, runs {@link InitMapper},
 * {@link InitCombiner} and {@link InitReducer}, and writes Accumulo files to {@code tmp/out}.
 * Reducer output is partitioned with {@link RangePartitioner} using the splits that
 * {@code writeSplits} stores in {@code tmp/splits.txt}. When the job succeeds, the output
 * directory is bulk imported (using {@code tmp/failures} as the failure directory) and the table
 * is compacted.
 *
 * @param props Fluo configuration used to retrieve the {@link StressoConfig}, to write the
 *        splits, and to run the Accumulo table operations
 * @param tmp working directory holding the {@code nums} input; {@code splits.txt}, {@code out}
 *        and {@code failures} are created beneath it
 * @return 0 if the job succeeded and the bulk import left nothing in the failure directory, 1
 *         otherwise
 * @throws Exception if job setup, job execution, file system access, or a table operation fails
 */
private int buildTree(FluoConfiguration props, Path tmp) throws Exception {
  final StressoConfig sconf = StressoConfig.retrieve(props);

  Job job = Job.getInstance(getConf());
  job.setJarByClass(Init.class);
  job.setJobName(Init.class.getName() + "_load");

  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(LongWritable.class);

  // Pass the trie shape to the tasks through the job configuration.
  job.getConfiguration().setInt(TRIE_NODE_SIZE_PROP, sconf.nodeSize);
  job.getConfiguration().setInt(TRIE_STOP_LEVEL_PROP, sconf.stopLevel);

  job.setInputFormatClass(SequenceFileInputFormat.class);
  SequenceFileInputFormat.addInputPath(job, new Path(tmp, "nums"));

  job.setMapperClass(InitMapper.class);
  job.setCombinerClass(InitCombiner.class);
  job.setReducerClass(InitReducer.class);

  job.setOutputFormatClass(AccumuloFileOutputFormat.class);
  job.setPartitionerClass(RangePartitioner.class);

  FileSystem fs = FileSystem.get(job.getConfiguration());
  Path splitsPath = new Path(tmp, "splits.txt");
  Collection<Text> splits = writeSplits(props, fs, splitsPath);
  RangePartitioner.setSplitFile(job, splitsPath.toString());
  // N split points delimit N + 1 ranges, so use one reducer per range.
  job.setNumReduceTasks(splits.size() + 1);

  Path outPath = new Path(tmp, "out");
  AccumuloFileOutputFormat.setOutputPath(job, outPath);

  job.getConfiguration().set("mapreduce.job.classloader", "true");

  if (!job.waitForCompletion(true)) {
    return 1;
  }

  Path failPath = new Path(tmp, "failures");
  fs.mkdirs(failPath);
  AccumuloUtil.doTableOp(props, (tableOps, table) -> {
    tableOps.importDirectory(table, outPath.toString(), failPath.toString(), false);
    // Compacting files makes them local to each tablet and generates files using the tables
    // settings.
    tableOps.compact(table, new CompactionConfig().setWait(true));
  });

  // Accumulo moves files it could not import into the failure directory. Anything left there
  // means the table is missing data, so do not report success.
  if (fs.listStatus(failPath).length > 0) {
    System.err.println("ERROR: bulk import failed for one or more files, see " + failPath);
    return 1;
  }

  return 0;
}