in wayang-benchmark/src/main/scala/org/apache/wayang/apps/simwords/Word2NVec.scala [41:95]
/**
  * Builds and sinks the Word2NVec plan for one corpus.
  *
  * The corpus is read twice:
  *  1. to build a dictionary of words that occur at least `minWordOccurrences` times, each paired
  *     with an integer ID obtained via `zipWithId`;
  *  2. to build per-word neighborhood vectors (via `CreateWordNeighborhoodFunction` with the
  *     dictionary broadcast as `"wordIds"`), which are summed per word and normalized.
  *
  * The vectors are then passed through `ExtendWordVector` (again with the dictionary broadcast)
  * and written to `outputFile` as `field1;field2;field3` lines, where the third field is
  * rendered with `toDictionaryString`.
  *
  * @param inputFile          URL of the text corpus; it is read twice
  * @param minWordOccurrences minimum count a word needs to enter the dictionary
  * @param neighborhoodReach  reach handed to `CreateWordNeighborhoodFunction`
  * @param wordsPerLine       selectivity hint used for both per-line `flatMapJava` operators
  * @param outputFile         URL of the text file sink
  * @param experiment         experiment the job is attached to
  * @param configuration      Wayang configuration for the context and the UDF load profile
  * @return whatever the final `writeTextFile` call yields
  */
def apply(inputFile: String,
          minWordOccurrences: Int,
          neighborhoodReach: Int,
          wordsPerLine: ProbabilisticDoubleInterval,
          outputFile: String)
         (implicit experiment: Experiment,
          configuration: Configuration) = {
  // Context with all registered plugins, plus a builder that is tagged with the job name and
  // the experiment and ships the UDF jars of this class.
  val context = new WayangContext(configuration)
  plugins.foreach(context.register)
  val builder = new PlanBuilder(context)
    .withJobName(jobName = s"Word2NVec ($inputFile, reach=$neighborhoodReach, output=$outputFile)")
    .withExperiment(experiment)
    .withUdfJarsOf(this.getClass)

  // ---- Phase 1: word dictionary (word -> Int ID) -------------------------------------------
  // The filter UDF below captures this local copy of the threshold.
  val threshold = minWordOccurrences
  val scrubbedWords = builder
    .readTextFile(inputFile).withName("Read corpus (1)")
    .flatMapJava(new ScrubFunction, selectivity = wordsPerLine).withName("Split & scrub")
  val wordCounts = scrubbedWords
    .map(token => (token, 1)).withName("Add word counter")
    .reduceByKey(_._1, (left, right) => (left._1, left._2 + right._2)).withName("Sum word counters")
    // Cardinality estimate: output is ~1% of the input count.
    .withCardinalityEstimator((in: Long) => math.round(in * 0.01))
  val frequentWords = wordCounts
    // Selectivity hint shrinks as the threshold grows: 10 / (9 + minWordOccurrences).
    .filter(_._2 >= threshold, selectivity = 10d / (9d + minWordOccurrences))
    .withName("Filter frequent words")
    .map(_._1).withName("Strip word counter")
  val dictionary = frequentWords
    .zipWithId.withName("Zip with ID")
    .map(zipped => (zipped.field1, zipped.field0.toInt)).withName("Convert ID attachment")

  // ---- Phase 2: normalized word neighborhood vectors ----------------------------------------
  val neighborhoodLoad = LoadProfileEstimators.createFromSpecification(
    "wayang.apps.simwords.udfs.create-neighborhood.load", configuration
  )
  val normalizedVectors = builder
    .readTextFile(inputFile).withName("Read corpus (2)")
    .flatMapJava(
      new CreateWordNeighborhoodFunction(neighborhoodReach, "wordIds"),
      selectivity = wordsPerLine,
      udfLoad = neighborhoodLoad
    )
    .withBroadcast(dictionary, "wordIds")
    .withName("Create word vectors")
    .reduceByKey(_._1, (left, right) => (left._1, left._2 + right._2)).withName("Add word vectors")
    .map { wordVector =>
      // normalize() is invoked on the vector in place; the same tuple is passed on.
      wordVector._2.normalize()
      wordVector
    }.withName("Normalize word vectors")

  // ---- Phase 3: attach the dictionary information and write the result ----------------------
  normalizedVectors
    .mapJava(new ExtendWordVector)
    .withBroadcast(dictionary, "wordIds")
    .withName("Extend word vectors")
    .writeTextFile(outputFile, extended => s"${extended._1};${extended._2};${extended._3.toDictionaryString}")
}