def apply()

in wayang-benchmark/src/main/scala/org/apache/wayang/apps/simwords/Word2NVec.scala [41:95]


  def apply(inputFile: String,
            minWordOccurrences: Int,
            neighborhoodReach: Int,
            wordsPerLine: ProbabilisticDoubleInterval,
            outputFile: String)
           (implicit experiment: Experiment,
            configuration: Configuration) = {

    // Initialize.
    val wayangCtx = new WayangContext(configuration)
    plugins.foreach(wayangCtx.register)
    val planBuilder = new PlanBuilder(wayangCtx)
      .withJobName(
        jobName = s"Word2NVec ($inputFile, reach=$neighborhoodReach, output=$outputFile)"
      ).withExperiment(experiment)
      .withUdfJarsOf(this.getClass)

    // Create the word dictionary.
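    // The dictionary maps each sufficiently frequent word to an Int id; it is broadcast to the two pipelines below.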
    val _minWordOccurrences = minWordOccurrences
    val wordIds = planBuilder
      .readTextFile(inputFile).withName("Read corpus (1)")
      .flatMapJava(new ScrubFunction, selectivity = wordsPerLine).withName("Split & scrub")
      .map(word => (word, 1)).withName("Add word counter")
      .reduceByKey(_._1, (wc1, wc2) => (wc1._1, wc1._2 + wc2._2)).withName("Sum word counters")
      .withCardinalityEstimator((in: Long) => math.round(in * 0.01))
      .filter(_._2 >= _minWordOccurrences, selectivity = 10d / (9d + minWordOccurrences))
      .withName("Filter frequent words")
      .map(_._1).withName("Strip word counter")
      .zipWithId.withName("Zip with ID")
      .map(t => (t.field1, t.field0.toInt)).withName("Convert ID attachment")


    // Create the word neighborhood vectors.
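    // The broadcast name "wordIds" must match the name handed to CreateWordNeighborhoodFunction so the UDF can resolve the dictionary at runtime.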
    val wordVectors = planBuilder
      .readTextFile(inputFile).withName("Read corpus (2)")
      .flatMapJava(
        new CreateWordNeighborhoodFunction(neighborhoodReach, "wordIds"),
        selectivity = wordsPerLine,
        udfLoad = LoadProfileEstimators.createFromSpecification(
          "wayang.apps.simwords.udfs.create-neighborhood.load", configuration)
      )
      .withBroadcast(wordIds, "wordIds")
      .withName("Create word vectors")
      .reduceByKey(_._1, (wv1, wv2) => (wv1._1, wv1._2 + wv2._2)).withName("Add word vectors")
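      // normalize() scales each summed neighborhood vector in place; the tuple is passed through unchanged.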
      .map { wv =>
        wv._2.normalize(); wv
      }.withName("Normalize word vectors")

    // Enhance the word vectors by joining the actual word and write to an output file.
    wordVectors
      .mapJava(new ExtendWordVector)
      .withBroadcast(wordIds, "wordIds")
      .withName("Extend word vectors")
      .writeTextFile(outputFile, wv => s"${wv._1};${wv._2};${wv._3.toDictionaryString}")
  }
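
For reference, here is a minimal driver sketch. It assumes the enclosing Word2NVec object exposes this apply method together with the plugins used above; the file paths, the Experiment/Subject construction, the interval bounds, and the import paths are illustrative assumptions (they may need adjusting to the Wayang version in use), not values taken from the repository.

// Hypothetical driver for the apply method above; constructors and package paths
// below are assumptions and may differ across Wayang versions.
import org.apache.wayang.commons.util.profiledb.model.{Experiment, Subject}
import org.apache.wayang.core.api.Configuration
import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval

object Word2NVecDemo {
  def main(args: Array[String]): Unit = {
    implicit val configuration: Configuration = new Configuration
    implicit val experiment: Experiment =
      new Experiment("word2nvec-demo", new Subject("Word2NVec", "1.0"))

    Word2NVec(
      inputFile = "file:///tmp/corpus.txt",   // one document per line (assumed path)
      minWordOccurrences = 5,                 // drop words seen fewer than 5 times
      neighborhoodReach = 2,                  // consider +/-2 words around each occurrence
      wordsPerLine = new ProbabilisticDoubleInterval(10, 200, 0.9), // rough words-per-line estimate
      outputFile = "file:///tmp/word-vectors.txt"
    )
  }
}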