public static void run()

in community/mahout-mr/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java [496:627]


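  /**
   * Configures and runs the Bt job: reads the splits of A together with the QJob
   * output, writes the B' rows as the main job output (file basename
   * {@code OUTPUT_BT}) plus the {@code OUTPUT_Q} named output, and optionally
   * emits the BBt products and the PCA-related SQ/SB outputs when
   * {@code outputBBtProducts} or {@code xiPath} are set.
   */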
  public static void run(Configuration conf,
                         Path[] inputPathA,
                         Path inputPathQJob,
                         Path xiPath,
                         Path outputPath,
                         int minSplitSize,
                         int k,
                         int p,
                         int btBlockHeight,
                         int numReduceTasks,
                         boolean broadcast,
                         Class<? extends Writable> labelClass,
                         boolean outputBBtProducts)
    throws ClassNotFoundException, InterruptedException, IOException {

    JobConf oldApiJob = new JobConf(conf);

    MultipleOutputs.addNamedOutput(oldApiJob,
                                   OUTPUT_Q,
                                   org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
                                   labelClass,
                                   VectorWritable.class);

    if (outputBBtProducts) {
      MultipleOutputs.addNamedOutput(oldApiJob,
                                     OUTPUT_BBT,
                                     org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
                                     IntWritable.class,
                                     VectorWritable.class);
      /*
       * MAHOUT-1067: if we are asked to output BBt products, then the vector names
       * must also be propagated to the Q output so that UJob can pick them up
       * from there.
       */
      oldApiJob.setBoolean(PROP_NV, true);
    }
    if (xiPath != null) {
      // compute the PCA-related outputs as well
      MultipleOutputs.addNamedOutput(oldApiJob,
                                     OUTPUT_SQ,
                                     org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
                                     IntWritable.class,
                                     VectorWritable.class);
      MultipleOutputs.addNamedOutput(oldApiJob,
                                     OUTPUT_SB,
                                     org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
                                     IntWritable.class,
                                     VectorWritable.class);
    }

    /*
     * HACK: we use the old-API MultipleOutputs since it is not available in the
     * new API of either 0.20.2 or 0.20.203, but we wrap it into a new-API Job so
     * that we can use the new-API interfaces.
     */

    Job job = new Job(oldApiJob);
    job.setJobName("Bt-job");
    job.setJarByClass(BtJob.class);

    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    FileInputFormat.setInputPaths(job, inputPathA);
    if (minSplitSize > 0) {
      FileInputFormat.setMinInputSplitSize(job, minSplitSize);
    }
    FileOutputFormat.setOutputPath(job, outputPath);

    // WARN: tight hadoop integration here:
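    /*
     * Overriding "mapreduce.output.basename" renames the default part-r-NNNNN
     * reducer files to OUTPUT_BT-r-NNNNN, so the B' part files can be located by
     * name alongside the named multiple outputs written to the same directory.
     */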
    job.getConfiguration().set("mapreduce.output.basename", OUTPUT_BT);

    FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
    SequenceFileOutputFormat.setOutputCompressionType(job,
                                                      CompressionType.BLOCK);

    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(SparseRowBlockWritable.class);

    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(VectorWritable.class);

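    /*
     * BtMapper emits blocked outer products of the A rows with their corresponding
     * Q rows (hence the LongWritable/SparseRowBlockWritable map output types above);
     * OuterProductCombiner and OuterProductReducer sum those blocks into the final
     * B' rows emitted as IntWritable/VectorWritable pairs.
     */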
    job.setMapperClass(BtMapper.class);
    job.setCombinerClass(OuterProductCombiner.class);
    job.setReducerClass(OuterProductReducer.class);

    job.getConfiguration().setInt(QJob.PROP_K, k);
    job.getConfiguration().setInt(QJob.PROP_P, p);
    job.getConfiguration().set(PROP_QJOB_PATH, inputPathQJob.toString());
    job.getConfiguration().setBoolean(PROP_OUPTUT_BBT_PRODUCTS,
                                      outputBBtProducts);
    job.getConfiguration().setInt(PROP_OUTER_PROD_BLOCK_HEIGHT, btBlockHeight);

    job.setNumReduceTasks(numReduceTasks);

    /*
     * PCA-related options, MAHOUT-817
     */
    if (xiPath != null) {
      job.getConfiguration().set(PROP_XI_PATH, xiPath.toString());
    }

    /*
     * We can broadcast the Rhat files since all of them are required by every
     * task, but not the Q files, which correspond to the splits of A (each split
     * of A requires only one particular Q file, a different one each time).
     */

    if (broadcast) {
      job.getConfiguration().set(PROP_RHAT_BROADCAST, "y");

      FileSystem fs = FileSystem.get(inputPathQJob.toUri(), conf);
      FileStatus[] fstats =
        fs.globStatus(new Path(inputPathQJob, QJob.OUTPUT_RHAT + "-*"));
      if (fstats != null) {
        for (FileStatus fstat : fstats) {
          /*
           * The new-API distributed cache is not available in our dependencies at
           * this time, so we are still using the deprecated one.
           */
          DistributedCache.addCacheFile(fstat.getPath().toUri(),
                                        job.getConfiguration());
        }
      }
    }

    job.submit();
    job.waitForCompletion(false);

    if (!job.isSuccessful()) {
      throw new IOException("Bt job unsuccessful.");
    }
  }
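
Below is a minimal sketch of driving this method directly; the HDFS paths, the rank
and oversampling values, and the IntWritable label class are hypothetical
placeholders (in the actual pipeline SSVDSolver computes and passes these arguments).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.mahout.math.hadoop.stochasticsvd.BtJob;

public class BtJobDriverSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Hypothetical locations of the A splits and the preceding QJob output.
    Path[] inputA = { new Path("/ssvd/input/A") };
    Path qJobOutput = new Path("/ssvd/tmp/Q-job");
    Path btOutput = new Path("/ssvd/tmp/Bt-job");

    BtJob.run(conf,
              inputA,
              qJobOutput,
              null,               // xiPath: null => no PCA-related outputs
              btOutput,
              -1,                 // minSplitSize: <= 0 keeps the default split size
              100,                // k: decomposition rank
              15,                 // p: oversampling
              30000,              // btBlockHeight: outer-product block height
              10,                 // numReduceTasks
              true,               // broadcast Rhat files via the distributed cache
              IntWritable.class,  // labelClass: must match the key type of A's SequenceFiles
              false);             // outputBBtProducts
  }
}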