courses/DSL/dataflow-examples/apache-beam-basics.ipynb

{ "cells": [ { "cell_type": "markdown", "id": "f080f768-6eec-4cc4-841f-fabc498d0856", "metadata": { "tags": [] }, "source": [ "# Apache Beam Basics" ] }, { "cell_type": "code", "execution_count": null, "id": "pnRH5srPn8LLdiGS2xfTN9Lz", "metadata": { "id": "pnRH5srPn8LLdiGS2xfTN9Lz", "tags": [] }, "outputs": [], "source": [ "! pip install 'apache-beam[gcp]' --quiet" ] }, { "cell_type": "markdown", "id": "32241162-e61d-4a76-9ed7-3fd217eb621c", "metadata": {}, "source": [ "### Restart the runtime\n", "\n", "Restart the kernel so the newly installed packages are picked up." ] }, { "cell_type": "code", "execution_count": null, "id": "cRs_CxwZAhxC", "metadata": { "id": "cRs_CxwZAhxC", "tags": [] }, "outputs": [], "source": [ "import IPython\n", "from IPython.display import display\n", "\n", "app = IPython.Application.instance()\n", "app.kernel.do_shutdown(True)" ] }, { "cell_type": "markdown", "id": "mJlIq_qYAzfF", "metadata": { "id": "mJlIq_qYAzfF", "tags": [] }, "source": [ "# Create a Simple Pipeline from a List" ] }, { "cell_type": "code", "execution_count": null, "id": "eWT4IGT78vfV", "metadata": { "id": "eWT4IGT78vfV", "tags": [] }, "outputs": [], "source": [ "import apache_beam as beam\n", "\n", "with beam.Pipeline() as p:\n", "    (\n", "        p | 'Create' >> beam.Create(['noir', 'bree', 'gigi', 'gretyl', 'duchess', 'rusty'])\n", "        | 'Transform' >> beam.Map(str.title)\n", "        | 'Print' >> beam.Map(print)\n", "    )" ] }, { "cell_type": "markdown", "id": "IbLs6gSHA2Kc", "metadata": { "id": "IbLs6gSHA2Kc" }, "source": [ "# Using .apply Instead of the Pipe Character" ] }, { "cell_type": "code", "execution_count": null, "id": "pzxl2BjLAnh7", "metadata": { "id": "pzxl2BjLAnh7", "tags": [] }, "outputs": [], "source": [ "import apache_beam as beam\n", "\n", "with beam.Pipeline() as p:\n", "    dogs = (\n", "        p | 'Create' >> beam.Create(['noir', 'bree', 'gigi', 'gretyl', 'duchess', 'rusty'])\n", "    )\n", "    titlecase_dogs = dogs.apply(beam.Map(str.title))\n", "    printed_dogs = titlecase_dogs.apply(beam.Map(print))" ] }, { "cell_type": "markdown", "id": "_NIlmBbfA89K", "metadata": { "id": "_NIlmBbfA89K" }, "source": [ "# Create a File for Testing\n", "\n", "Nothing Beam-specific here. This just creates a small text file for the next examples to read from."
] }, { "cell_type": "code", "execution_count": null, "id": "lBsk_N4hAuds", "metadata": { "id": "lBsk_N4hAuds", "tags": [] }, "outputs": [], "source": [ "%%bash\n", "\n", "filename=\"./temp_files/dogs.txt\"\n", "\n", "# Make sure the temp directory exists\n", "mkdir -p temp_files\n", "\n", "# Remove the file if it already exists\n", "rm -f $filename\n", "\n", "# Write the dog names to dogs.txt\n", "for dog in Noir Bree Gigi Gretyl Duchess Rusty\n", "do\n", "    echo $dog >> $filename\n", "done\n", "\n", "# Display the file contents\n", "cat $filename" ] }, { "cell_type": "markdown", "id": "SENNtHCMBG83", "metadata": { "id": "SENNtHCMBG83", "tags": [] }, "source": [ "# Use Beam.IO to Read From a File" ] }, { "cell_type": "code", "execution_count": null, "id": "nbcW7QjHBLEe", "metadata": { "id": "nbcW7QjHBLEe", "tags": [] }, "outputs": [], "source": [ "import apache_beam as beam\n", "from apache_beam.io import ReadFromText, WriteToText\n", "\n", "filename = \"./temp_files/dogs.txt\"\n", "with beam.Pipeline() as p:\n", "    (\n", "        p | 'Read' >> ReadFromText(filename)\n", "        | 'Transform' >> beam.Map(str.upper)\n", "        | 'Print' >> beam.Map(print)\n", "    )" ] }, { "cell_type": "markdown", "id": "E5iQgBSdBR5A", "metadata": { "id": "E5iQgBSdBR5A" }, "source": [ "# Beam.Map\n", "\n", "This example reads from a file and transforms each line using a Python function.\n", "\n", "Map() passes each item in the PCollection to the function and emits the value the function returns." ] }, { "cell_type": "code", "execution_count": null, "id": "KCFX75NqBTBm", "metadata": { "id": "KCFX75NqBTBm", "tags": [] }, "outputs": [], "source": [ "import apache_beam as beam\n", "from apache_beam.io import ReadFromText, WriteToText\n", "\n", "def makeUppercase(element):\n", "    return element.upper()\n", "\n", "filename = \"./temp_files/dogs.txt\"\n", "with beam.Pipeline() as p:\n", "    (\n", "        p | 'Read' >> ReadFromText(filename)\n", "        | 'Transform' >> beam.Map(makeUppercase)\n", "        | 'Print' >> beam.Map(print)\n", "    )" ] }, { "cell_type": "markdown", "id": "Azbc9TikBox5", "metadata": { "id": "Azbc9TikBox5" }, "source": [ "# Use a Lambda Expression to Transform the Data" ] }, { "cell_type": "code", "execution_count": null, "id": "cveSv-F5BtGS", "metadata": { "id": "cveSv-F5BtGS", "tags": [] }, "outputs": [], "source": [ "import apache_beam as beam\n", "from apache_beam.io import ReadFromText, WriteToText\n", "\n", "def makeUppercase(element):\n", "    return element.upper()\n", "\n", "filename = \"./temp_files/dogs.txt\"\n", "with beam.Pipeline() as p:\n", "    (\n", "        p | 'Read' >> ReadFromText(filename)\n", "        # | 'Transform' >> beam.Map(makeUppercase)\n", "        | 'Transform with Lambda' >> beam.Map(lambda item: item.upper())\n", "        | 'Print' >> beam.Map(print)\n", "    )" ] }, { "cell_type": "markdown", "id": "ocVuG5oLCNKi", "metadata": { "id": "ocVuG5oLCNKi", "jp-MarkdownHeadingCollapsed": true, "tags": [] }, "source": [ "# Use Beam.IO to Write a File" ] }, { "cell_type": "code", "execution_count": null, "id": "wdenoi-5CIbn", "metadata": { "id": "wdenoi-5CIbn", "tags": [] }, "outputs": [], "source": [ "import apache_beam as beam\n", "from apache_beam.io import ReadFromText, WriteToText\n", "\n", "def makeUppercase(element):\n", "    return element.upper()\n", "\n", "filename = './temp_files/dogs.txt'\n", "with beam.Pipeline() as p:\n", "    (\n", "        p | 'Read' >> ReadFromText(filename)\n", "        | 'Transform' >> beam.Map(makeUppercase)\n", "        | 'Write' >> WriteToText('uppercase-dogs.out')\n", "    )\n", "\n", "\n", "# Use ls to see if the file was created and\n", "# cat to view the contents of the file.\n", "!ls uppercase-dogs.*\n", "! cat uppercase-dogs.out-00000-of-00001" ] },
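{ "cell_type": "markdown", "id": "added-write-sharding-md", "metadata": { "tags": [] }, "source": [ "### A Note on Sharded Output\n", "\n", "WriteToText writes sharded files by default, which is why the output above has a -00000-of-00001 suffix. The cell below is a minimal sketch showing how the num_shards and shard_name_template arguments of WriteToText can produce a single, predictably named file; the output path single-file-dogs.out is just an illustrative choice." ] }, { "cell_type": "code", "execution_count": null, "id": "added-write-sharding-code", "metadata": { "tags": [] }, "outputs": [], "source": [ "import apache_beam as beam\n", "from apache_beam.io import ReadFromText, WriteToText\n", "\n", "filename = './temp_files/dogs.txt'\n", "\n", "with beam.Pipeline() as p:\n", "    (\n", "        p | 'Read' >> ReadFromText(filename)\n", "        | 'Transform' >> beam.Map(str.upper)\n", "        # num_shards=1 forces a single output file, and an empty\n", "        # shard_name_template drops the -00000-of-00001 suffix.\n", "        # The output path is illustrative, not part of the original lab.\n", "        | 'Write' >> WriteToText('./temp_files/single-file-dogs.out',\n", "                                 num_shards=1, shard_name_template='')\n", "    )\n", "\n", "# The output is one file with a predictable name.\n", "! cat ./temp_files/single-file-dogs.out" ] },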
{ "cell_type": "markdown", "id": "B_BUvM54CTSm", "metadata": { "id": "B_BUvM54CTSm" }, "source": [ "# Using ParDo() Instead of Map()\n", "\n", "Note the use of the yield statement when using ParDo(), as opposed to the return statement used by a function called with Map(). A DoFn's process() method is expected to return an iterable of output elements when used with ParDo(), and yield produces a generator, which satisfies that requirement.\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "id": "LG4z8a_zCWto", "metadata": { "id": "LG4z8a_zCWto", "tags": [] }, "outputs": [], "source": [ "class MakeUpperCase(beam.DoFn):\n", "    def process(self, element):\n", "        transformed = element.upper()\n", "        yield transformed\n", "\n", "\n", "filename = \"./temp_files/dogs.txt\"\n", "output_filename = \"./temp_files/pardo-uppercase-dogs.out\"\n", "\n", "with beam.Pipeline() as p:\n", "    (\n", "        p | 'Read' >> ReadFromText(filename)\n", "        | 'Transform' >> beam.ParDo(MakeUpperCase())\n", "        | 'Write' >> WriteToText(output_filename)\n", "    )\n", "\n", "!ls \"./temp_files\"\n", "!cat ./temp_files/pardo-uppercase-dogs.out-00000-of-00001" ] }, { "cell_type": "markdown", "id": "BdsRh1rNCy1-", "metadata": { "id": "BdsRh1rNCy1-" }, "source": [ "# Create Another Test File\n", "\n", "Just creating another test file. This one has a species and a name on each line, so we can experiment with grouping and aggregations." ] }, { "cell_type": "code", "execution_count": null, "id": "TaQ8CRp9C6Df", "metadata": { "id": "TaQ8CRp9C6Df", "tags": [] }, "outputs": [], "source": [ "%%bash\n", "\n", "filename=\"./temp_files/pets.txt\"\n", "\n", "# First make sure the file doesn't exist\n", "rm -f $filename\n", "\n", "# Write the species,name pairs to pets.txt\n", "for pet in dog,Noir dog,Bree dog,Gigi dog,Gretyl dog,Duchess dog,Rusty cat,Cleo cat,Sparkles cat,Phelix turtle,Cuff turtle,Link\n", "do\n", "    echo $pet >> $filename\n", "done\n", "\n", "\n", "cat $filename\n", "\n", "ls ./temp_files" ] }, { "cell_type": "markdown", "id": "noZcBlkmDBsR", "metadata": { "id": "noZcBlkmDBsR" }, "source": [ "# Parse and Filter\n", "\n", "After reading the file, each row is converted into a (species, name) tuple.\n", "\n", "Then, we can get rid of the cats." ] }, { "cell_type": "code", "execution_count": null, "id": "3LRwzIY_DCjm", "metadata": { "id": "3LRwzIY_DCjm", "tags": [] }, "outputs": [], "source": [ "class ParsePets(beam.DoFn):\n", "    def process(self, element):\n", "        species, name = element.split(',')\n", "        yield (species, name)\n", "\n", "class NoCats(beam.DoFn):\n", "    def process(self, element):\n", "        if element[0] != 'cat':\n", "            yield element\n", "\n", "# The Python print() function does not return the element.\n", "# So, if you want to print inside a pipeline and then move on,\n", "# create your own print function that also returns the element.\n", "def print_results(element):\n", "    print(element)\n", "    return element\n", "\n", "filename = \"./temp_files/pets.txt\"\n", "with beam.Pipeline() as p:\n", "    (\n", "        p | 'Read' >> ReadFromText(filename)\n", "        | 'Parse' >> beam.ParDo(ParsePets())\n", "        | 'Filter' >> beam.ParDo(NoCats())\n", "        | 'Make the Name Uppercase' >> beam.Map(lambda pet: (pet[0], pet[1].upper()))\n", "        | 'Print' >> beam.Map(print_results)\n", "        | 'Write' >> WriteToText('./temp_files/results.out')\n", "    )\n", "\n", "! ls ./temp_files/results.*\n", "\n", "! cat ./temp_files/results.out-00000-of-00001" ] },
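{ "cell_type": "markdown", "id": "added-beam-filter-md", "metadata": { "tags": [] }, "source": [ "### Filtering with the Built-In Filter Transform\n", "\n", "Beam also has a built-in Filter transform, so a custom DoFn like NoCats is not strictly required for simple filtering. The cell below is a minimal sketch; parse_pets is an illustrative helper that does the same parsing as the ParsePets DoFn above." ] }, { "cell_type": "code", "execution_count": null, "id": "added-beam-filter-code", "metadata": { "tags": [] }, "outputs": [], "source": [ "import apache_beam as beam\n", "from apache_beam.io import ReadFromText\n", "\n", "def parse_pets(element):\n", "    # Illustrative helper: same parsing as the ParsePets DoFn above.\n", "    species, name = element.split(',')\n", "    return (species, name)\n", "\n", "filename = './temp_files/pets.txt'\n", "with beam.Pipeline() as p:\n", "    (\n", "        p | 'Read' >> ReadFromText(filename)\n", "        | 'Parse' >> beam.Map(parse_pets)\n", "        # beam.Filter keeps only the elements for which the predicate returns True.\n", "        | 'No Cats' >> beam.Filter(lambda pet: pet[0] != 'cat')\n", "        | 'Print' >> beam.Map(print)\n", "    )" ] },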
{ "cell_type": "markdown", "id": "OqM1E73ADU9J", "metadata": { "id": "OqM1E73ADU9J" }, "source": [ "# Map and FlatMap versus ParDo\n", "\n", "The example below is functionally equivalent to the previous example. However, it uses Map() and FlatMap() rather than ParDo().\n", "\n", "Use FlatMap() when the number of items in the output PCollection can differ from the number of items in the input, as it does here when filtering. Also, in the filtering function, note the use of yield, not return." ] }, { "cell_type": "code", "execution_count": null, "id": "QsMuJzuIDXfV", "metadata": { "id": "QsMuJzuIDXfV", "tags": [] }, "outputs": [], "source": [ "def parseThePets(element):\n", "    species, name = element.split(',')\n", "    return (species, name)\n", "\n", "def filterOutTheCats(element):\n", "    if element[0] != 'cat':\n", "        species, name = (element[0], element[1])\n", "        yield (species, name)\n", "\n", "\n", "def print_results(element):\n", "    print(element)\n", "    return element\n", "\n", "filename = \"./temp_files/pets.txt\"\n", "with beam.Pipeline() as p:\n", "    (\n", "        p | 'Read' >> ReadFromText(filename)\n", "        | 'Parse' >> beam.Map(parseThePets)\n", "        | 'Filter' >> beam.FlatMap(filterOutTheCats)\n", "        | 'Transform' >> beam.Map(lambda pet: (pet[0], pet[1].upper()))\n", "        | 'Write' >> WriteToText('./temp_files/filter_results.out')\n", "    )\n", "\n", "! cat ./temp_files/filter_results.out-00000-of-00001" ] }, { "cell_type": "markdown", "id": "lsBoBy3JDipT", "metadata": { "id": "lsBoBy3JDipT" }, "source": [ "# Group By Key" ] }, { "cell_type": "code", "execution_count": null, "id": "wwzwLe3yDjcc", "metadata": { "id": "wwzwLe3yDjcc", "tags": [] }, "outputs": [], "source": [ "class ParsePets(beam.DoFn):\n", "    def process(self, element):\n", "        species, name = element.split(',')\n", "        yield (species, name)\n", "\n", "class NoCats(beam.DoFn):\n", "    def process(self, element):\n", "        if element[0] != 'cat':\n", "            yield element\n", "\n", "def print_results(element):\n", "    print(element)\n", "    return element\n", "\n", "filename = \"./temp_files/pets.txt\"\n", "with beam.Pipeline() as p:\n", "    (\n", "        p | 'Read' >> ReadFromText(filename)\n", "        | 'Parse' >> beam.ParDo(ParsePets())\n", "        | 'Filter' >> beam.ParDo(NoCats())\n", "        | 'Make the Name Uppercase' >> beam.Map(lambda pet: (pet[0], pet[1].upper()))\n", "        # The important line is here:\n", "        | 'Group by Species' >> beam.GroupByKey()\n", "        | 'Print' >> beam.Map(print_results)\n", "        | 'Write' >> WriteToText('./temp_files/groupby-results.out')\n", "    )\n", "\n", "\n", "!cat ./temp_files/groupby-results.out-00000-of-00001" ] }, { "cell_type": "markdown", "id": "4LY3-io0DsT1", "metadata": { "id": "4LY3-io0DsT1" }, "source": [ "# Count the Number of Pets by Species" ] }, { "cell_type": "code", "execution_count": null, "id": "tedvZZtxDuAR", "metadata": { "id": "tedvZZtxDuAR", "tags": [] }, "outputs": [], "source": [ "class ParsePets(beam.DoFn):\n", "    def process(self, element):\n", "        species, name = element.split(',')\n", "        yield (species, name)\n", "\n", "\n", "def print_results(element):\n", "    print(element)\n", "    return element\n", "\n", "filename = \"./temp_files/pets.txt\"\n", "with beam.Pipeline() as p:\n", "    (\n", "        p | 'Read' >> ReadFromText(filename)\n", "        | 'Parse' >> beam.ParDo(ParsePets())\n", "        | 'PairWithOne' >> beam.Map(lambda x: (x[0], 1))\n", "        | 'GroupAndSum' >> beam.CombinePerKey(sum)\n", "        | 'Print' >> beam.Map(print_results)\n", "        | 'Write' >> WriteToText('./temp_files/count-results.out')\n", "    )\n", "\n", "! cat ./temp_files/count-results.out-00000-of-00001" ] },
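{ "cell_type": "markdown", "id": "added-count-perkey-md", "metadata": { "tags": [] }, "source": [ "### Counting with a Built-In Combiner\n", "\n", "Pairing each element with 1 and then applying CombinePerKey(sum) is the classic counting pattern. The cell below is a minimal sketch showing Beam's built-in Count.PerKey combiner, which produces the same (species, count) pairs directly." ] }, { "cell_type": "code", "execution_count": null, "id": "added-count-perkey-code", "metadata": { "tags": [] }, "outputs": [], "source": [ "import apache_beam as beam\n", "from apache_beam.io import ReadFromText\n", "\n", "filename = './temp_files/pets.txt'\n", "with beam.Pipeline() as p:\n", "    (\n", "        p | 'Read' >> ReadFromText(filename)\n", "        # Parse each line into a (species, name) tuple.\n", "        | 'Parse' >> beam.Map(lambda line: tuple(line.split(',')))\n", "        # Count.PerKey emits one (key, count) pair per distinct key.\n", "        | 'CountPerSpecies' >> beam.combiners.Count.PerKey()\n", "        | 'Print' >> beam.Map(print)\n", "    )" ] },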
{ "cell_type": "code", "execution_count": null, "id": "664d2da9-afd0-497c-90a2-cfe31923fd1a", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "colab": { "name": "drehnstrom (Jul 12, 2024, 1:16:46 PM)", "provenance": [] }, "environment": { "kernel": "python3", "name": ".m116", "type": "gcloud", "uri": "gcr.io/deeplearning-platform-release/:m116" }, "kernelspec": { "display_name": "Python 3 (ipykernel) (Local)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.14" } }, "nbformat": 4, "nbformat_minor": 5 }