in src/main/java/org/apache/accumulo/testing/continuous/ContinuousVerify.java [143:207]
public int run(String[] args) throws Exception {
try (ContinuousEnv env = new ContinuousEnv(args)) {
String tableName = env.getAccumuloTableName();
Job job = Job.getInstance(getConf(),
this.getClass().getSimpleName() + "_" + tableName + "_" + System.currentTimeMillis());
job.setJarByClass(this.getClass());
job.setInputFormatClass(AccumuloInputFormat.class);
boolean scanOffline = Boolean
.parseBoolean(env.getTestProperty(TestProps.CI_VERIFY_SCAN_OFFLINE));
int maxMaps = Integer.parseInt(env.getTestProperty(TestProps.CI_VERIFY_MAX_MAPS));
int reducers = Integer.parseInt(env.getTestProperty(TestProps.CI_VERIFY_REDUCERS));
String outputDir = env.getTestProperty(TestProps.CI_VERIFY_OUTPUT_DIR);
ConsistencyLevel cl = TestProps
.getScanConsistencyLevel(env.getTestProperty(TestProps.CI_VERIFY_CONSISTENCY_LEVEL));
Set<Range> ranges;
String clone = "";
AccumuloClient client = env.getAccumuloClient();
String table;
if (scanOffline) {
clone = tableName + "_"
+ String.format("%016x", (env.getRandom().nextLong() & 0x7fffffffffffffffL));
client.tableOperations().clone(tableName, clone, true, new HashMap<>(), new HashSet<>());
ranges = client.tableOperations().splitRangeByTablets(tableName, new Range(), maxMaps);
client.tableOperations().offline(clone);
table = clone;
} else {
ranges = client.tableOperations().splitRangeByTablets(tableName, new Range(), maxMaps);
table = tableName;
}
AccumuloInputFormat.configure().clientProperties(env.getClientProps()).table(table)
.ranges(ranges).autoAdjustRanges(false).offlineScan(scanOffline).consistencyLevel(cl)
.store(job);
job.setMapperClass(CMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(VLongWritable.class);
job.setReducerClass(CReducer.class);
job.setNumReduceTasks(reducers);
job.setOutputFormatClass(TextOutputFormat.class);
job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", scanOffline);
job.getConfiguration().set("mapreduce.job.classloader", "true");
Path outputPath = new Path(outputDir + "/" + job.getJobName());
TextOutputFormat.setOutputPath(job, outputPath);
log.info("Results from this run will be stored in {}", outputPath);
job.waitForCompletion(true);
if (scanOffline) {
client.tableOperations().delete(clone);
}
return job.isSuccessful() ? 0 : 1;
}
}