public int run(String[] args)

in community/mahout-mr/mr/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJob.java [91:220]


  public int run(String[] args) throws Exception {

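    // Register command-line options: the standard input/output options plus the similarity-specific parameters parsed below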
    addInputOption();
    addOutputOption();
    addOption("numberOfColumns", "r", "Number of columns in the input matrix", false);
    addOption("similarityClassname", "s", "Name of distributed similarity class to instantiate, alternatively use "
        + "one of the predefined similarities (" + VectorSimilarityMeasures.list() + ')');
    addOption("maxSimilaritiesPerRow", "m", "Number of maximum similarities per row (default: "
        + DEFAULT_MAX_SIMILARITIES_PER_ROW + ')', String.valueOf(DEFAULT_MAX_SIMILARITIES_PER_ROW));
    addOption("excludeSelfSimilarity", "ess", "compute similarity of rows to themselves?", String.valueOf(false));
    addOption("threshold", "tr", "discard row pairs with a similarity value below this", false);
    addOption("maxObservationsPerRow", null, "sample rows down to this number of entries",
        String.valueOf(DEFAULT_MAX_OBSERVATIONS_PER_ROW));
    addOption("maxObservationsPerColumn", null, "sample columns down to this number of entries",
        String.valueOf(DEFAULT_MAX_OBSERVATIONS_PER_COLUMN));
    addOption("randomSeed", null, "use this seed for sampling", false);
    addOption(DefaultOptionCreator.overwriteOption().create());

    Map<String,List<String>> parsedArgs = parseArguments(args);
    if (parsedArgs == null) {
      return -1;
    }

    int numberOfColumns;

    if (hasOption("numberOfColumns")) {
      // Number of columns explicitly specified via CLI
      numberOfColumns = Integer.parseInt(getOption("numberOfColumns"));
    } else {
      // Otherwise, determine the number of columns from the cardinality of a vector in the input matrix
      numberOfColumns = getDimensions(getInputPath());
    }

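    // Resolve the similarity measure: accept either a predefined VectorSimilarityMeasures name or a
    // fully qualified class name of a distributed similarity implementation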
    String similarityClassnameArg = getOption("similarityClassname");
    String similarityClassname;
    try {
      similarityClassname = VectorSimilarityMeasures.valueOf(similarityClassnameArg).getClassname();
    } catch (IllegalArgumentException iae) {
      similarityClassname = similarityClassnameArg;
    }

    // Clear the output and temp paths if the overwrite option has been set
    if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
      // Clear the temp path
      HadoopUtil.delete(getConf(), getTempPath());
      // Clear the output path
      HadoopUtil.delete(getConf(), getOutputPath());
    }

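    // Read the remaining job parameters, falling back to their defaults where optional values are absent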
    int maxSimilaritiesPerRow = Integer.parseInt(getOption("maxSimilaritiesPerRow"));
    boolean excludeSelfSimilarity = Boolean.parseBoolean(getOption("excludeSelfSimilarity"));
    double threshold = hasOption("threshold")
        ? Double.parseDouble(getOption("threshold")) : NO_THRESHOLD;
    long randomSeed = hasOption("randomSeed")
        ? Long.parseLong(getOption("randomSeed")) : NO_FIXED_RANDOM_SEED;

    int maxObservationsPerRow = Integer.parseInt(getOption("maxObservationsPerRow"));
    int maxObservationsPerColumn = Integer.parseInt(getOption("maxObservationsPerColumn"));

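    // Intermediate outputs in the temp directory, shared between the phases below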
    Path weightsPath = getTempPath("weights");
    Path normsPath = getTempPath("norms.bin");
    Path numNonZeroEntriesPath = getTempPath("numNonZeroEntries.bin");
    Path maxValuesPath = getTempPath("maxValues.bin");
    Path pairwiseSimilarityPath = getTempPath("pairwiseSimilarity");

    Path observationsPerColumnPath = getTempPath("observationsPerColumn.bin");

    AtomicInteger currentPhase = new AtomicInteger();

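    // Preliminary job: count the observations (non-zero entries) per column and write the resulting counts
    // vector to observationsPerColumnPath; the job's regular output directory ("notUsed") is discarded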
    Job countObservations = prepareJob(getInputPath(), getTempPath("notUsed"), CountObservationsMapper.class,
        NullWritable.class, VectorWritable.class, SumObservationsReducer.class, NullWritable.class,
        VectorWritable.class);
    countObservations.setCombinerClass(VectorSumCombiner.class);
    countObservations.getConfiguration().set(OBSERVATIONS_PER_COLUMN_PATH, observationsPerColumnPath.toString());
    countObservations.setNumReduceTasks(1);
    // Abort if the preliminary counting job fails
    if (!countObservations.waitForCompletion(true)) {
      return -1;
    }

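    // Phase 1: compute the per-row statistics needed by the similarity measure (norms, non-zero entry
    // counts, maximum values) and transpose the input so that each record holds one column of the matrix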
    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
      Job normsAndTranspose = prepareJob(getInputPath(), weightsPath, VectorNormMapper.class, IntWritable.class,
          VectorWritable.class, MergeVectorsReducer.class, IntWritable.class, VectorWritable.class);
      normsAndTranspose.setCombinerClass(MergeVectorsCombiner.class);
      Configuration normsAndTransposeConf = normsAndTranspose.getConfiguration();
      normsAndTransposeConf.set(THRESHOLD, String.valueOf(threshold));
      normsAndTransposeConf.set(NORMS_PATH, normsPath.toString());
      normsAndTransposeConf.set(NUM_NON_ZERO_ENTRIES_PATH, numNonZeroEntriesPath.toString());
      normsAndTransposeConf.set(MAXVALUES_PATH, maxValuesPath.toString());
      normsAndTransposeConf.set(SIMILARITY_CLASSNAME, similarityClassname);
      normsAndTransposeConf.set(OBSERVATIONS_PER_COLUMN_PATH, observationsPerColumnPath.toString());
      normsAndTransposeConf.set(MAX_OBSERVATIONS_PER_ROW, String.valueOf(maxObservationsPerRow));
      normsAndTransposeConf.set(MAX_OBSERVATIONS_PER_COLUMN, String.valueOf(maxObservationsPerColumn));
      normsAndTransposeConf.set(RANDOM_SEED, String.valueOf(randomSeed));

      boolean succeeded = normsAndTranspose.waitForCompletion(true);
      if (!succeeded) {
        return -1;
      }
    }

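    // Phase 2: for every column, emit the co-occurring row pairs and reduce them into a sparse matrix of
    // pairwise row similarities, discarding pairs whose similarity falls below the threshold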
    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
      Job pairwiseSimilarity = prepareJob(weightsPath, pairwiseSimilarityPath, CooccurrencesMapper.class,
          IntWritable.class, VectorWritable.class, SimilarityReducer.class, IntWritable.class, VectorWritable.class);
      pairwiseSimilarity.setCombinerClass(VectorSumReducer.class);
      Configuration pairwiseConf = pairwiseSimilarity.getConfiguration();
      pairwiseConf.set(THRESHOLD, String.valueOf(threshold));
      pairwiseConf.set(NORMS_PATH, normsPath.toString());
      pairwiseConf.set(NUM_NON_ZERO_ENTRIES_PATH, numNonZeroEntriesPath.toString());
      pairwiseConf.set(MAXVALUES_PATH, maxValuesPath.toString());
      pairwiseConf.set(SIMILARITY_CLASSNAME, similarityClassname);
      pairwiseConf.setInt(NUMBER_OF_COLUMNS, numberOfColumns);
      pairwiseConf.setBoolean(EXCLUDE_SELF_SIMILARITY, excludeSelfSimilarity);
      boolean succeeded = pairwiseSimilarity.waitForCompletion(true);
      if (!succeeded) {
        return -1;
      }
    }

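    // Phase 3: expand the half-stored similarity matrix to a symmetric one and keep only the top
    // maxSimilaritiesPerRow entries per row in the final output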
    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
      Job asMatrix = prepareJob(pairwiseSimilarityPath, getOutputPath(), UnsymmetrifyMapper.class,
          IntWritable.class, VectorWritable.class, MergeToTopKSimilaritiesReducer.class, IntWritable.class,
          VectorWritable.class);
      asMatrix.setCombinerClass(MergeToTopKSimilaritiesReducer.class);
      asMatrix.getConfiguration().setInt(MAX_SIMILARITIES_PER_ROW, maxSimilaritiesPerRow);
      boolean succeeded = asMatrix.waitForCompletion(true);
      if (!succeeded) {
        return -1;
      }
    }

    return 0;
  }
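
For context, the job registers the standard Mahout AbstractJob options (--input, --output, --overwrite) alongside the similarity-specific options above, so it can be launched programmatically through Hadoop's ToolRunner. The sketch below is illustrative only: the class name RowSimilarityDriverExample, the paths, and the option values are assumptions, not part of this file.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.math.hadoop.similarity.cooccurrence.RowSimilarityJob;

// Minimal driver sketch (hypothetical paths and values) showing how the options registered in run()
// map onto an invocation of the job.
public class RowSimilarityDriverExample {
  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new Configuration(), new RowSimilarityJob(), new String[] {
        "--input", "/tmp/matrix",                      // rows of the input matrix as IntWritable/VectorWritable pairs
        "--output", "/tmp/row-similarities",
        "--similarityClassname", "SIMILARITY_COSINE",  // a VectorSimilarityMeasures entry or a class name
        "--maxSimilaritiesPerRow", "50",
        "--overwrite"
    });
    System.exit(exitCode);
  }
}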