dataflow/flex-templates/wordcount/wordcount.go

// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// wordcount is an example that counts words in Shakespeare and includes Beam
// best practices.
//
// The input file defaults to a public data set containing the text of King
// Lear, by William Shakespeare. You can override it and choose your own input
// with --input.
package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"regexp"
	"strings"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

var (
	// By default, this example reads from a public dataset containing the text of
	// King Lear. Set this option to choose a different input file or glob.
	input = flag.String("input", "gs://apache-beam-samples/shakespeare/kinglear.txt", "File(s) to read.")

	// Set this required option to specify where to write the output.
	output = flag.String("output", "", "Output file (required).")
)

func init() {
	register.DoFn3x0[context.Context, string, func(string)](&extractFn{})
	register.Emitter1[string]()
	register.Function2x1(formatFn)
}

var (
	wordRE          = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
	empty           = beam.NewCounter("extract", "emptyLines")
	smallWordLength = flag.Int("small_word_length", 6, "length of small words (default: 6)")
	smallWords      = beam.NewCounter("extract", "smallWords")
	lineLen         = beam.NewDistribution("extract", "lineLenDistro")
)

// extractFn is a DoFn that emits the words in a given line and keeps a count for small words.
type extractFn struct {
	SmallWordLength int `json:"smallWordLength"`
}

// ProcessElement for extractFn processes a line at a time, emitting each word in that line
// while keeping count of how many words are below the SmallWordLength threshold.
func (f *extractFn) ProcessElement(ctx context.Context, line string, emit func(string)) {
	lineLen.Update(ctx, int64(len(line)))
	if len(strings.TrimSpace(line)) == 0 {
		empty.Inc(ctx, 1)
	}
	for _, word := range wordRE.FindAllString(line, -1) {
		// Increment the small-word counter when a word is shorter than SmallWordLength.
		if len(word) < f.SmallWordLength {
			smallWords.Inc(ctx, 1)
		}
		emit(word)
	}
}

// Format formats a KV of a word and its count as a string.
func Format(s beam.Scope, counted beam.PCollection) beam.PCollection {
	return beam.ParDo(s, formatFn, counted)
}

// formatFn is a DoFn that formats a word and its count as a string.
func formatFn(w string, c int) string {
	return fmt.Sprintf("%s: %v", w, c)
}
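// For illustration only (a sketch of expected behavior, not output captured from
// a run): given the line "To be, or not to be", wordRE extracts the words
// To, be, or, not, to, be, and extractFn counts each of them as a small word
// under the default SmallWordLength of 6. After counting, formatFn would render
// a pair such as ("be", 2) as the text line "be: 2".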
// CountWords is a composite transform that counts the words of a PCollection
// of lines. It expects a PCollection of type string and returns a PCollection
// of type KV<string,int>. The Beam type checker enforces these constraints
// during pipeline construction.
func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
	s = s.Scope("CountWords")

	// Convert lines of text into individual words.
	col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)

	// Count the number of times each word occurs.
	return stats.Count(s, col)
}

// WordCountFromPCol counts the words from a PCollection and validates it.
func WordCountFromPCol(s beam.Scope, in beam.PCollection, hash string, size int) {
	out := Format(s, CountWords(s, in))
	passert.Hash(s, out, "out", hash, size)
}

func main() {
	flag.Parse()
	beam.Init()

	if *output == "" {
		log.Fatal("No output provided")
	}

	p := beam.NewPipeline()
	s := p.Root()

	lines := textio.Read(s, *input)
	counted := CountWords(s, lines)
	formatted := beam.ParDo(s, formatFn, counted)
	textio.Write(s, *output, formatted)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}
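// Example invocations (a sketch only; the exact Dataflow runner flags depend on
// your Beam SDK version, and the bucket, project, and region values below are
// placeholders, not taken from this sample):
//
//	# Run locally with the SDK's default in-process runner.
//	go run wordcount.go --output /tmp/counts.txt
//
//	# Run on Dataflow (illustrative flags).
//	go run wordcount.go \
//	  --output gs://YOUR_BUCKET/counts \
//	  --runner dataflow \
//	  --project YOUR_PROJECT \
//	  --region YOUR_REGION \
//	  --staging_location gs://YOUR_BUCKET/staging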