in community/mahout-mr/mr/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJob.java [91:220]
public int run(String[] args) throws Exception {
  // Computes pairwise row similarities of the input matrix as a chain of MapReduce jobs. Each
  // phase is gated by shouldRunNextPhase(), so a caller can execute only part of the pipeline:
  //   1. count observations per column, then run the norms/transpose job, which is configured
  //      with the norms, non-zero-entry and max-value temp paths plus the row/column sampling
  //      limits, and writes the intermediate "weights" matrix;
  //   2. compute pairwise similarities from the weights matrix;
  //   3. merge the similarities down to at most maxSimilaritiesPerRow entries per row and write
  //      the result to the output path.
  // Returns 0 on success, -1 if argument parsing fails or any job does not complete successfully.
  addInputOption();
  addOutputOption();
  addOption("numberOfColumns", "r", "Number of columns in the input matrix", false);
  // Required: there is no default measure, and a missing value would otherwise reach
  // VectorSimilarityMeasures.valueOf(null), which throws NullPointerException.
  addOption("similarityClassname", "s", "Name of distributed similarity class to instantiate, alternatively use "
      + "one of the predefined similarities (" + VectorSimilarityMeasures.list() + ')', true);
  addOption("maxSimilaritiesPerRow", "m", "Number of maximum similarities per row (default: "
      + DEFAULT_MAX_SIMILARITIES_PER_ROW + ')', String.valueOf(DEFAULT_MAX_SIMILARITIES_PER_ROW));
  addOption("excludeSelfSimilarity", "ess", "exclude the similarity of rows to themselves?", String.valueOf(false));
  addOption("threshold", "tr", "discard row pairs with a similarity value below this", false);
  addOption("maxObservationsPerRow", null, "sample rows down to this number of entries",
      String.valueOf(DEFAULT_MAX_OBSERVATIONS_PER_ROW));
  addOption("maxObservationsPerColumn", null, "sample columns down to this number of entries",
      String.valueOf(DEFAULT_MAX_OBSERVATIONS_PER_COLUMN));
  addOption("randomSeed", null, "use this seed for sampling", false);
  addOption(DefaultOptionCreator.overwriteOption().create());

  Map<String,List<String>> parsedArgs = parseArguments(args);
  if (parsedArgs == null) {
    return -1;
  }

  int numberOfColumns;
  if (hasOption("numberOfColumns")) {
    // Number of columns explicitly specified via CLI
    numberOfColumns = Integer.parseInt(getOption("numberOfColumns"));
  } else {
    // else get the number of columns by determining the cardinality of a vector in the input matrix
    numberOfColumns = getDimensions(getInputPath());
  }

  String similarityClassname = resolveSimilarityClassname(getOption("similarityClassname"));

  // Clear the output and temp paths if the overwrite option has been set
  if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
    HadoopUtil.delete(getConf(), getTempPath());
    HadoopUtil.delete(getConf(), getOutputPath());
  }

  int maxSimilaritiesPerRow = Integer.parseInt(getOption("maxSimilaritiesPerRow"));
  boolean excludeSelfSimilarity = Boolean.parseBoolean(getOption("excludeSelfSimilarity"));
  double threshold = hasOption("threshold")
      ? Double.parseDouble(getOption("threshold")) : NO_THRESHOLD;
  long randomSeed = hasOption("randomSeed")
      ? Long.parseLong(getOption("randomSeed")) : NO_FIXED_RANDOM_SEED;
  int maxObservationsPerRow = Integer.parseInt(getOption("maxObservationsPerRow"));
  int maxObservationsPerColumn = Integer.parseInt(getOption("maxObservationsPerColumn"));

  Path weightsPath = getTempPath("weights");
  Path normsPath = getTempPath("norms.bin");
  Path numNonZeroEntriesPath = getTempPath("numNonZeroEntries.bin");
  Path maxValuesPath = getTempPath("maxValues.bin");
  Path pairwiseSimilarityPath = getTempPath("pairwiseSimilarity");
  Path observationsPerColumnPath = getTempPath("observationsPerColumn.bin");

  AtomicInteger currentPhase = new AtomicInteger();

  if (shouldRunNextPhase(parsedArgs, currentPhase)) {
    // The per-column observation counts are only handed to the norms/transpose job below, so they
    // are computed within the same phase; running this job unconditionally would redo that work
    // even when the pipeline is resumed from a later phase.
    Job countObservations = prepareJob(getInputPath(), getTempPath("notUsed"), CountObservationsMapper.class,
        NullWritable.class, VectorWritable.class, SumObservationsReducer.class, NullWritable.class,
        VectorWritable.class);
    countObservations.setCombinerClass(VectorSumCombiner.class);
    countObservations.getConfiguration().set(OBSERVATIONS_PER_COLUMN_PATH, observationsPerColumnPath.toString());
    // A single reducer, so the counts are summed by one task.
    countObservations.setNumReduceTasks(1);
    if (!countObservations.waitForCompletion(true)) {
      // The next job reads the counts; do not continue with missing or partial data.
      return -1;
    }

    Job normsAndTranspose = prepareJob(getInputPath(), weightsPath, VectorNormMapper.class, IntWritable.class,
        VectorWritable.class, MergeVectorsReducer.class, IntWritable.class, VectorWritable.class);
    normsAndTranspose.setCombinerClass(MergeVectorsCombiner.class);
    Configuration normsAndTransposeConf = normsAndTranspose.getConfiguration();
    normsAndTransposeConf.set(THRESHOLD, String.valueOf(threshold));
    normsAndTransposeConf.set(NORMS_PATH, normsPath.toString());
    normsAndTransposeConf.set(NUM_NON_ZERO_ENTRIES_PATH, numNonZeroEntriesPath.toString());
    normsAndTransposeConf.set(MAXVALUES_PATH, maxValuesPath.toString());
    normsAndTransposeConf.set(SIMILARITY_CLASSNAME, similarityClassname);
    normsAndTransposeConf.set(OBSERVATIONS_PER_COLUMN_PATH, observationsPerColumnPath.toString());
    normsAndTransposeConf.set(MAX_OBSERVATIONS_PER_ROW, String.valueOf(maxObservationsPerRow));
    normsAndTransposeConf.set(MAX_OBSERVATIONS_PER_COLUMN, String.valueOf(maxObservationsPerColumn));
    normsAndTransposeConf.set(RANDOM_SEED, String.valueOf(randomSeed));
    if (!normsAndTranspose.waitForCompletion(true)) {
      return -1;
    }
  }

  if (shouldRunNextPhase(parsedArgs, currentPhase)) {
    Job pairwiseSimilarity = prepareJob(weightsPath, pairwiseSimilarityPath, CooccurrencesMapper.class,
        IntWritable.class, VectorWritable.class, SimilarityReducer.class, IntWritable.class, VectorWritable.class);
    pairwiseSimilarity.setCombinerClass(VectorSumReducer.class);
    Configuration pairwiseConf = pairwiseSimilarity.getConfiguration();
    pairwiseConf.set(THRESHOLD, String.valueOf(threshold));
    pairwiseConf.set(NORMS_PATH, normsPath.toString());
    pairwiseConf.set(NUM_NON_ZERO_ENTRIES_PATH, numNonZeroEntriesPath.toString());
    pairwiseConf.set(MAXVALUES_PATH, maxValuesPath.toString());
    pairwiseConf.set(SIMILARITY_CLASSNAME, similarityClassname);
    pairwiseConf.setInt(NUMBER_OF_COLUMNS, numberOfColumns);
    pairwiseConf.setBoolean(EXCLUDE_SELF_SIMILARITY, excludeSelfSimilarity);
    if (!pairwiseSimilarity.waitForCompletion(true)) {
      return -1;
    }
  }

  if (shouldRunNextPhase(parsedArgs, currentPhase)) {
    Job asMatrix = prepareJob(pairwiseSimilarityPath, getOutputPath(), UnsymmetrifyMapper.class,
        IntWritable.class, VectorWritable.class, MergeToTopKSimilaritiesReducer.class, IntWritable.class,
        VectorWritable.class);
    asMatrix.setCombinerClass(MergeToTopKSimilaritiesReducer.class);
    asMatrix.getConfiguration().setInt(MAX_SIMILARITIES_PER_ROW, maxSimilaritiesPerRow);
    if (!asMatrix.waitForCompletion(true)) {
      return -1;
    }
  }

  return 0;
}

/**
 * Resolves the value of the {@code --similarityClassname} option to the class name to instantiate.
 * If the value names a {@link VectorSimilarityMeasures} constant, that constant's class name is
 * returned; any other value is returned unchanged and treated as a class name.
 *
 * @param similarityClassnameArg the raw option value; expected to be non-null because the option is
 *     registered as required
 * @return the class name of the similarity measure to use
 */
private static String resolveSimilarityClassname(String similarityClassnameArg) {
  try {
    return VectorSimilarityMeasures.valueOf(similarityClassnameArg).getClassname();
  } catch (IllegalArgumentException notAPredefinedMeasure) {
    return similarityClassnameArg;
  }
}