public int run()

in src/main/java/org/apache/accumulo/testing/continuous/ContinuousVerify.java [143:207]


  /**
   * Configures and runs the verification MapReduce job against the table named by
   * {@code env.getAccumuloTableName()}, blocking until the job finishes.
   *
   * <p>
   * When the {@code TestProps.CI_VERIFY_SCAN_OFFLINE} test property parses as {@code true}, the
   * table is cloned, the clone is taken offline and read instead of the live table, and the clone
   * is deleted before this method returns or throws. Otherwise the live table is read directly.
   *
   * <p>
   * Job output is written beneath the directory given by {@code TestProps.CI_VERIFY_OUTPUT_DIR},
   * in a sub-directory named after the job.
   *
   * @param args
   *          arguments handed to the {@code ContinuousEnv} constructor
   * @return 0 if the job reports success, 1 otherwise
   * @throws Exception
   *           if creating the environment, any table operation, or running the job fails; a
   *           failure to delete the temporary clone is attached as a suppressed exception when
   *           another failure is already propagating
   */
  public int run(String[] args) throws Exception {

    try (ContinuousEnv env = new ContinuousEnv(args)) {

      String tableName = env.getAccumuloTableName();

      Job job = Job.getInstance(getConf(),
          this.getClass().getSimpleName() + "_" + tableName + "_" + System.currentTimeMillis());
      job.setJarByClass(this.getClass());

      job.setInputFormatClass(AccumuloInputFormat.class);

      boolean scanOffline = Boolean
          .parseBoolean(env.getTestProperty(TestProps.CI_VERIFY_SCAN_OFFLINE));
      int maxMaps = Integer.parseInt(env.getTestProperty(TestProps.CI_VERIFY_MAX_MAPS));
      int reducers = Integer.parseInt(env.getTestProperty(TestProps.CI_VERIFY_REDUCERS));
      String outputDir = env.getTestProperty(TestProps.CI_VERIFY_OUTPUT_DIR);
      ConsistencyLevel cl = TestProps
          .getScanConsistencyLevel(env.getTestProperty(TestProps.CI_VERIFY_CONSISTENCY_LEVEL));

      AccumuloClient client = env.getAccumuloClient();

      // Name of the temporary clone. Stays null until the clone really exists so that the cleanup
      // below never tries to delete a table that was never created.
      String clone = null;
      // Remembers the exception (if any) that is propagating out of the try block, so a cleanup
      // failure can be attached to it instead of replacing it.
      Throwable failure = null;

      try {
        Set<Range> ranges;
        String table;

        if (scanOffline) {
          String cloneName = tableName + "_"
              + String.format("%016x", (env.getRandom().nextLong() & 0x7fffffffffffffffL));
          client.tableOperations().clone(tableName, cloneName, true, new HashMap<>(),
              new HashSet<>());
          clone = cloneName;
          ranges = client.tableOperations().splitRangeByTablets(tableName, new Range(), maxMaps);
          client.tableOperations().offline(clone);
          table = clone;
        } else {
          ranges = client.tableOperations().splitRangeByTablets(tableName, new Range(), maxMaps);
          table = tableName;
        }

        AccumuloInputFormat.configure().clientProperties(env.getClientProps()).table(table)
            .ranges(ranges).autoAdjustRanges(false).offlineScan(scanOffline).consistencyLevel(cl)
            .store(job);

        job.setMapperClass(CMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(VLongWritable.class);

        job.setReducerClass(CReducer.class);
        job.setNumReduceTasks(reducers);

        job.setOutputFormatClass(TextOutputFormat.class);

        // Speculative map execution is switched on only when scanning the offline clone.
        job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", scanOffline);
        job.getConfiguration().set("mapreduce.job.classloader", "true");

        Path outputPath = new Path(outputDir + "/" + job.getJobName());
        TextOutputFormat.setOutputPath(job, outputPath);
        log.info("Results from this run will be stored in {}", outputPath);

        job.waitForCompletion(true);
      } catch (Throwable t) {
        failure = t;
        throw t;
      } finally {
        // Always remove the temporary clone, even when setup or the job itself failed.
        if (clone != null) {
          try {
            client.tableOperations().delete(clone);
          } catch (Exception e) {
            if (failure == null) {
              throw e;
            }
            // Keep the original failure as the primary exception.
            failure.addSuppressed(e);
          }
        }
      }

      return job.isSuccessful() ? 0 : 1;
    }
  }