courses/DSL/dataflow-examples/apache-beam-schemas.ipynb (480 lines of code) (raw):

{ "cells": [ { "cell_type": "markdown", "id": "b6856ad0-1a3c-4ccf-81e9-5bc46b67b80f", "metadata": {}, "source": [ "# Apache Beam Schemas" ] }, { "cell_type": "code", "execution_count": null, "id": "pnRH5srPn8LLdiGS2xfTN9Lz", "metadata": { "id": "pnRH5srPn8LLdiGS2xfTN9Lz", "tags": [] }, "outputs": [], "source": [ "! pip install apache_beam apache-beam[gcp] --quiet" ] }, { "cell_type": "code", "execution_count": null, "id": "cRs_CxwZAhxC", "metadata": { "id": "cRs_CxwZAhxC" }, "outputs": [], "source": [ "import IPython\n", "from IPython.display import display\n", "\n", "app = IPython.Application.instance()\n", "app.kernel.do_shutdown(True)" ] }, { "cell_type": "markdown", "id": "SFrDA0iFHJCU", "metadata": { "id": "SFrDA0iFHJCU" }, "source": [ "# Create the Pet schema and some test data" ] }, { "cell_type": "code", "execution_count": null, "id": "9-XW6ngc_QZK", "metadata": { "id": "9-XW6ngc_QZK", "tags": [] }, "outputs": [], "source": [ "import apache_beam as beam\n", "from typing import NamedTuple\n", "\n", "class Pet(NamedTuple):\n", " name: str\n", " pet_type: str\n", " breed: str\n", "\n", "# Creating 20 pets\n", "pets = [\n", " Pet(name=\"Buddy\", pet_type=\"Dog\", breed=\"Golden Retriever\"),\n", " Pet(name=\"Mittens\", pet_type=\"Cat\", breed=\"Siamese\"),\n", " Pet(name=\"Max\", pet_type=\"Dog\", breed=\"Beagle\"),\n", " Pet(name=\"Bella\", pet_type=\"Dog\", breed=\"Labrador\"),\n", " Pet(name=\"Charlie\", pet_type=\"Dog\", breed=\"Poodle\"),\n", " Pet(name=\"Lucy\", pet_type=\"Cat\", breed=\"Persian\"),\n", " Pet(name=\"Daisy\", pet_type=\"Dog\", breed=\"Bulldog\"),\n", " Pet(name=\"Luna\", pet_type=\"Cat\", breed=\"Maine Coon\"),\n", " Pet(name=\"Rocky\", pet_type=\"Dog\", breed=\"Rottweiler\"),\n", " Pet(name=\"Lola\", pet_type=\"Cat\", breed=\"Bengal\"),\n", " Pet(name=\"Jack\", pet_type=\"Dog\", breed=\"Boxer\"),\n", " Pet(name=\"Nala\", pet_type=\"Cat\", breed=\"Ragdoll\"),\n", " Pet(name=\"Zeus\", pet_type=\"Dog\", breed=\"German Shepherd\"),\n", " 
Pet(name=\"Chloe\", pet_type=\"Cat\", breed=\"British Shorthair\"),\n", " Pet(name=\"Buster\", pet_type=\"Dog\", breed=\"Dachshund\"),\n", " Pet(name=\"Simba\", pet_type=\"Cat\", breed=\"Sphynx\"),\n", " Pet(name=\"Cooper\", pet_type=\"Dog\", breed=\"Cocker Spaniel\"),\n", " Pet(name=\"Sasha\", pet_type=\"Cat\", breed=\"Scottish Fold\"),\n", " Pet(name=\"Milo\", pet_type=\"Dog\", breed=\"Shih Tzu\"),\n", " Pet(name=\"Oreo\", pet_type=\"Cat\", breed=\"Abyssinian\")\n", "]\n", "\n", "# Show the first 5 pets.\n", "print(pets[:5])" ] }, { "cell_type": "markdown", "id": "yXGue2i4HQw-", "metadata": { "id": "yXGue2i4HQw-" }, "source": [ "## Simple Pipeline\n", "\n", "Create the pets and print them" ] }, { "cell_type": "code", "execution_count": null, "id": "aQ6-j6Y7DSr5", "metadata": { "id": "aQ6-j6Y7DSr5", "tags": [] }, "outputs": [], "source": [ "# Beam pipeline\n", "with beam.Pipeline() as p:\n", " pet_collection = (\n", " p\n", " | 'Create pets' >> beam.Create(pets)\n", " | 'Print pets' >> beam.Map(print)\n", " )\n" ] }, { "cell_type": "markdown", "id": "5zfHZ9fHHZKj", "metadata": { "id": "5zfHZ9fHHZKj" }, "source": [ "## Use Filter() to return only the dogs" ] }, { "cell_type": "code", "execution_count": null, "id": "ik0-kT__DbXf", "metadata": { "id": "ik0-kT__DbXf", "tags": [] }, "outputs": [], "source": [ "# Beam pipeline\n", "with beam.Pipeline() as p:\n", " pet_collection = (\n", " p\n", " | 'Create pets' >> beam.Create(pets)\n", " | 'Get only the Dogs' >> beam.Filter(lambda pet: pet.pet_type == 'Dog')\n", " | 'Print pets' >> beam.Map(print)\n", " )" ] }, { "cell_type": "markdown", "id": "VUEWPXrB2zY1", "metadata": { "id": "VUEWPXrB2zY1" }, "source": [ "## Filter with DoFn" ] }, { "cell_type": "code", "execution_count": null, "id": "1D2y03ds22lz", "metadata": { "id": "1D2y03ds22lz", "tags": [] }, "outputs": [], "source": [ "class OnlyCats(beam.DoFn):\n", " def process(self, pet):\n", " if pet.pet_type == 'Cat':\n", " yield pet\n", "\n", "\n", "# Beam pipeline\n", 
"with beam.Pipeline() as p:\n", " pet_collection = (\n", " p\n", " | 'Create pets' >> beam.Create(pets)\n", " | 'Get only the Dogs' >> beam.ParDo(OnlyCats())\n", " | 'Print pets' >> beam.Map(print)\n", " )" ] }, { "cell_type": "markdown", "id": "4AHQIKLZ3eIn", "metadata": { "id": "4AHQIKLZ3eIn" }, "source": [ "## Filter with FlatMap()\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "id": "7LvJyyMG3kfW", "metadata": { "id": "7LvJyyMG3kfW", "tags": [] }, "outputs": [], "source": [ "def NoCatsFilter(pet):\n", " if pet.pet_type == 'Dog':\n", " return pet\n", "\n", "\n", "# Beam pipeline\n", "with beam.Pipeline() as p:\n", " pet_collection = (\n", " p\n", " | 'Create pets' >> beam.Create(pets)\n", " | 'Get only the Dogs' >> beam.FlatMap(NoCatsFilter)\n", " | 'Print pets' >> beam.Map(print)\n", " )" ] }, { "cell_type": "markdown", "id": "wULJaeRPHfSZ", "metadata": { "id": "wULJaeRPHfSZ" }, "source": [ "# Branch the pets to return dogs and cats." ] }, { "cell_type": "code", "execution_count": null, "id": "6ZSuZdRBFUCd", "metadata": { "id": "6ZSuZdRBFUCd", "tags": [] }, "outputs": [], "source": [ "class FilterPetsDoFn(beam.DoFn):\n", " def process(self, pet):\n", " if pet.pet_type == 'Dog':\n", " yield beam.pvalue.TaggedOutput('dogs', pet)\n", " elif pet.pet_type == 'Cat':\n", " yield beam.pvalue.TaggedOutput('cats', pet)\n", "\n", "\n", "with beam.Pipeline() as p:\n", " pet_collection = p | 'Create pets' >> beam.Create(pets)\n", "\n", " filtered_pets = (\n", " pet_collection\n", " | 'Filter pets' >> beam.ParDo(FilterPetsDoFn()).with_outputs('dogs', 'cats')\n", " )\n", "\n", " dogs = filtered_pets.dogs | 'Print dogs' >> beam.Map(print)\n", " # cats = filtered_pets.cats | 'Print cats' >> beam.Map(print)" ] }, { "cell_type": "markdown", "id": "o4Qd7TuTHvqc", "metadata": { "id": "o4Qd7TuTHvqc" }, "source": [ "## Output the dogs and cats into seperate CSV files" ] }, { "cell_type": "code", "execution_count": null, "id": "8aZEh4D-GF1p", "metadata": { "id": 
"8aZEh4D-GF1p", "tags": [] }, "outputs": [], "source": [ "class FilterPetsDoFn(beam.DoFn):\n", " def process(self, pet):\n", " if pet.pet_type == 'Dog':\n", " yield beam.pvalue.TaggedOutput('dogs', pet)\n", " elif pet.pet_type == 'Cat':\n", " yield beam.pvalue.TaggedOutput('cats', pet)\n", "\n", "def format_csv(pet):\n", " return f'{pet.name},{pet.pet_type},{pet.breed}'\n", "\n", "# Beam pipeline\n", "with beam.Pipeline() as p:\n", " pet_collection = p | 'Create pets' >> beam.Create(pets)\n", "\n", " filtered_pets = (\n", " pet_collection\n", " | 'Filter pets' >> beam.ParDo(FilterPetsDoFn()).with_outputs('dogs', 'cats')\n", " )\n", "\n", " dog_collection = filtered_pets.dogs | 'Format dogs to CSV' >> beam.Map(format_csv)\n", " cat_collection = filtered_pets.cats | 'Format cats to CSV' >> beam.Map(format_csv)\n", "\n", " dog_collection | 'Write dogs to CSV' >> beam.io.WriteToText('./temp_files/only_dogs', file_name_suffix='.csv')\n", " cat_collection | 'Write cats to CSV' >> beam.io.WriteToText('./temp_files/only_cats', file_name_suffix='.csv')\n", "\n", "\n", "\n", "\n", "! cat ./temp_files/only_dogs-00000-of-00001.csv\n", "! 
cat ./temp_files/only_cats-00000-of-00001.csv" ] }, { "cell_type": "markdown", "id": "uclsrxArIejz", "metadata": { "id": "uclsrxArIejz" }, "source": [ "# Nested data - Owners and Pets" ] }, { "cell_type": "code", "execution_count": null, "id": "sLvIM_QbIZc1", "metadata": { "id": "sLvIM_QbIZc1", "tags": [] }, "outputs": [], "source": [ "import random\n", "import apache_beam as beam\n", "from typing import NamedTuple, List\n", "\n", "# Seed the generator so every run produces the same owners and pets.\n", "random.seed(42)\n", "\n", "# Same Pet schema as above, repeated so this section can run on its own.\n", "class Pet(NamedTuple):\n", "    name: str\n", "    pet_type: str\n", "    breed: str\n", "\n", "# Each Owner element nests a list of Pet records.\n", "class Owner(NamedTuple):\n", "    name: str\n", "    pets: List[Pet]\n", "\n", "# Sample pet data\n", "pet_names = [\"Buddy\", \"Mittens\", \"Max\", \"Bella\", \"Charlie\", \"Lucy\", \"Daisy\", \"Luna\", \"Rocky\", \"Lola\", \"Jack\", \"Nala\", \"Zeus\", \"Chloe\", \"Buster\", \"Simba\", \"Cooper\", \"Sasha\", \"Milo\", \"Oreo\"]\n", "pet_types = [\"Dog\", \"Cat\"]\n", "dog_breeds = [\"Golden Retriever\", \"Beagle\", \"Labrador\", \"Poodle\", \"Bulldog\", \"Rottweiler\", \"Boxer\", \"German Shepherd\", \"Dachshund\", \"Cocker Spaniel\", \"Shih Tzu\"]\n", "cat_breeds = [\"Siamese\", \"Persian\", \"Maine Coon\", \"Bengal\", \"Ragdoll\", \"British Shorthair\", \"Sphynx\", \"Scottish Fold\", \"Abyssinian\"]\n", "\n", "def random_pet() -> Pet:\n", "    \"\"\"Build a Pet with a random type, a breed matching that type, and a random name.\"\"\"\n", "    pet_type = random.choice(pet_types)\n", "    breed = random.choice(dog_breeds if pet_type == \"Dog\" else cat_breeds)\n", "    name = random.choice(pet_names)\n", "    return Pet(name=name, pet_type=pet_type, breed=breed)\n", "\n", "def random_pets(min_pets: int = 1, max_pets: int = 2) -> List[Pet]:\n", "    \"\"\"Return between min_pets and max_pets (inclusive) random pets.\"\"\"\n", "    return [random_pet() for _ in range(random.randint(min_pets, max_pets))]\n", "\n", "# Sample owner data\n", "owner_names = [\"Alice\", \"Bob\", \"Charlie\", \"Diana\", \"Doug\", \"Edward\", \"Fiona\", \"George\", \"Hannah\", \"Ivan\", \"Julia\", \"Michael\", \"Patrick\"]\n", "\n", "owners = [Owner(name=name, pets=random_pets()) for name in owner_names]\n", "\n", "# Just show one Owner\n", "print(owners[0])" ] }, { "cell_type": "code", 
"execution_count": null, "id": "eOPu2wPXJnUD", "metadata": { "id": "eOPu2wPXJnUD", "tags": [] }, "outputs": [], "source": [ "class PrintOwnerPets(beam.DoFn):\n", " def process(self, owner):\n", " print(f'Owner: {owner.name}')\n", " for pet in owner.pets:\n", " print(f' Pet: {pet.name}, Type: {pet.pet_type}, Breed: {pet.breed}')\n", "\n", "# Beam pipeline\n", "with beam.Pipeline() as p:\n", " owner_collection = p | 'Create owners' >> beam.Create(owners)\n", " owner_collection | 'Print owners and pets' >> beam.ParDo(PrintOwnerPets())" ] }, { "cell_type": "markdown", "id": "easD1ivo5EGi", "metadata": { "id": "easD1ivo5EGi" }, "source": [ "## Number of Pets by Owner" ] }, { "cell_type": "code", "execution_count": null, "id": "aItiQwzqKlaO", "metadata": { "id": "aItiQwzqKlaO", "tags": [] }, "outputs": [], "source": [ "from typing import NamedTuple, List, Tuple\n", "\n", "class CountPetsDoFn(beam.DoFn):\n", " def process(self, owner) -> Tuple[str, int]:\n", " num_pets = len(owner.pets)\n", " yield (owner.name, num_pets)\n", "\n", "\n", "with beam.Pipeline() as p:\n", " owner_collection = p | 'Create owners' >> beam.Create(owners)\n", "\n", " pet_counts = (\n", " owner_collection\n", " | 'Count pets' >> beam.ParDo(CountPetsDoFn())\n", " )\n", "\n", " pet_counts | 'Print pet counts' >> beam.Map(print)" ] }, { "cell_type": "code", "execution_count": null, "id": "9d482121-891b-442b-8276-52ea6b0b1eaa", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "colab": { "name": "drehnstrom (Jul 12, 2024, 6:04:57 PM)", "provenance": [] }, "environment": { "kernel": "python3", "name": ".m116", "type": "gcloud", "uri": "gcr.io/deeplearning-platform-release/:m116" }, "kernelspec": { "display_name": "Python 3 (ipykernel) (Local)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": 
"ipython3", "version": "3.10.14" } }, "nbformat": 4, "nbformat_minor": 5 }