in community/mahout-mr/mr/src/main/java/org/apache/mahout/math/hadoop/stochasticsvd/BtJob.java [496:627]
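  /**
   * Runs the Bt job, computing Bt = A' * Q from the splits of A and the
   * output of the preceding QJob (and, optionally, partial BBt products).
   * Parameter descriptions below are inferred from this method body.
   *
   * @param inputPathA        splits of the input matrix A
   * @param inputPathQJob     output path of the preceding QJob
   * @param xiPath            column-mean (xi) path for PCA runs (MAHOUT-817); may be null
   * @param outputPath        job output path
   * @param minSplitSize      minimum input split size; applied only if &gt; 0
   * @param k                 decomposition rank
   * @param p                 oversampling parameter
   * @param btBlockHeight     block height for outer-product accumulation
   * @param numReduceTasks    number of reduce tasks
   * @param broadcast         if true, push R-hat files to tasks via DistributedCache
   * @param labelClass        key class for the named Q output
   * @param outputBBtProducts if true, also emit partial BBt products
   */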
public static void run(Configuration conf,
Path[] inputPathA,
Path inputPathQJob,
Path xiPath,
Path outputPath,
int minSplitSize,
int k,
int p,
int btBlockHeight,
int numReduceTasks,
boolean broadcast,
Class<? extends Writable> labelClass,
boolean outputBBtProducts)
throws ClassNotFoundException, InterruptedException, IOException {
JobConf oldApiJob = new JobConf(conf);
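    /*
     * Register named outputs on the old-API JobConf: Q is always written;
     * BBt, SQ and SB are added only when requested below.
     */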
MultipleOutputs.addNamedOutput(oldApiJob,
OUTPUT_Q,
org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
labelClass,
VectorWritable.class);
if (outputBBtProducts) {
MultipleOutputs.addNamedOutput(oldApiJob,
OUTPUT_BBT,
org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
IntWritable.class,
VectorWritable.class);
      /*
       * MAHOUT-1067: if we are asked to output BBt products, then named-vector
       * names should be propagated to Q as well so that UJob can pick them up
       * from there.
       */
oldApiJob.setBoolean(PROP_NV, true);
}
if (xiPath != null) {
      // compute PCA-related outputs (SQ, SB) as well
MultipleOutputs.addNamedOutput(oldApiJob,
OUTPUT_SQ,
org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
IntWritable.class,
VectorWritable.class);
MultipleOutputs.addNamedOutput(oldApiJob,
OUTPUT_SB,
org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
IntWritable.class,
VectorWritable.class);
}
    /*
     * HACK: we use the old-API MultipleOutputs since it is not available in
     * the new API of either 0.20.2 or 0.20.203, but we wrap it into a new-API
     * Job so we can use the new-API interfaces.
     */
Job job = new Job(oldApiJob);
job.setJobName("Bt-job");
job.setJarByClass(BtJob.class);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
FileInputFormat.setInputPaths(job, inputPathA);
if (minSplitSize > 0) {
FileInputFormat.setMinInputSplitSize(job, minSplitSize);
}
FileOutputFormat.setOutputPath(job, outputPath);
    // WARN: tight Hadoop integration here: forcing output part file names to
    // start with OUTPUT_BT via a raw Hadoop property.
job.getConfiguration().set("mapreduce.output.basename", OUTPUT_BT);
FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
SequenceFileOutputFormat.setOutputCompressionType(job,
CompressionType.BLOCK);
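    /*
     * Shuffle layout: mappers emit Bt row blocks (SparseRowBlockWritable)
     * keyed by long block index; the combiner and reducer sum the outer
     * products and emit final Bt rows as IntWritable/VectorWritable pairs.
     */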
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(SparseRowBlockWritable.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(VectorWritable.class);
job.setMapperClass(BtMapper.class);
job.setCombinerClass(OuterProductCombiner.class);
job.setReducerClass(OuterProductReducer.class);
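    /*
     * Hand the decomposition geometry (k, p, block height) and the QJob
     * location to the tasks through the job configuration.
     */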
job.getConfiguration().setInt(QJob.PROP_K, k);
job.getConfiguration().setInt(QJob.PROP_P, p);
job.getConfiguration().set(PROP_QJOB_PATH, inputPathQJob.toString());
job.getConfiguration().setBoolean(PROP_OUPTUT_BBT_PRODUCTS,
outputBBtProducts);
job.getConfiguration().setInt(PROP_OUTER_PROD_BLOCK_HEIGHT, btBlockHeight);
job.setNumReduceTasks(numReduceTasks);
/*
* PCA-related options, MAHOUT-817
*/
if (xiPath != null) {
job.getConfiguration().set(PROP_XI_PATH, xiPath.toString());
}
    /*
     * We can broadcast the R-hat files since all of them are required by every
     * task, but not the Q files, which correspond to splits of A (each split
     * of A requires only its particular Q file, a different one each time).
     */
if (broadcast) {
job.getConfiguration().set(PROP_RHAT_BROADCAST, "y");
FileSystem fs = FileSystem.get(inputPathQJob.toUri(), conf);
FileStatus[] fstats =
fs.globStatus(new Path(inputPathQJob, QJob.OUTPUT_RHAT + "-*"));
if (fstats != null) {
for (FileStatus fstat : fstats) {
        /*
         * The new API is not yet enabled in our dependencies at this time, so
         * we are still using the deprecated one.
         */
DistributedCache.addCacheFile(fstat.getPath().toUri(),
job.getConfiguration());
}
}
}
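    // Run synchronously; 'false' suppresses verbose progress reporting.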
job.submit();
job.waitForCompletion(false);
if (!job.isSuccessful()) {
throw new IOException("Bt job unsuccessful.");
}
}
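
  /*
   * Hedged usage sketch (illustrative, not from this file): a driver could
   * invoke the job along these lines. All paths and numeric parameters below
   * are assumptions for demonstration only.
   *
   *   Configuration conf = new Configuration();
   *   BtJob.run(conf,
   *             new Path[] { new Path("/ssvd/A") }, // splits of A
   *             new Path("/ssvd/Q-job"),            // QJob output
   *             null,                               // no xi => no PCA outputs
   *             new Path("/ssvd/Bt-job"),           // output directory
   *             -1,                                 // keep default split size
   *             100,                                // k: decomposition rank
   *             15,                                 // p: oversampling
   *             200000,                             // btBlockHeight
   *             10,                                 // numReduceTasks
   *             true,                               // broadcast R-hat files
   *             LongWritable.class,                 // labelClass for Q keys
   *             false);                             // skip BBt products
   */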