courses/DSL/dataflow-examples/apache-beam-schemas.ipynb (480 lines of code) (raw):
{
"cells": [
{
"cell_type": "markdown",
"id": "b6856ad0-1a3c-4ccf-81e9-5bc46b67b80f",
"metadata": {},
"source": [
"# Apache Beam Schemas"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "pnRH5srPn8LLdiGS2xfTN9Lz",
"metadata": {
"id": "pnRH5srPn8LLdiGS2xfTN9Lz",
"tags": []
},
"outputs": [],
"source": [
"! pip install apache_beam apache-beam[gcp] --quiet"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "cRs_CxwZAhxC",
"metadata": {
"id": "cRs_CxwZAhxC"
},
"outputs": [],
"source": [
"import IPython\n",
"from IPython.display import display\n",
"\n",
"app = IPython.Application.instance()\n",
"app.kernel.do_shutdown(True)"
]
},
{
"cell_type": "markdown",
"id": "SFrDA0iFHJCU",
"metadata": {
"id": "SFrDA0iFHJCU"
},
"source": [
"# Create the Pet schema and some test data"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9-XW6ngc_QZK",
"metadata": {
"id": "9-XW6ngc_QZK",
"tags": []
},
"outputs": [],
"source": [
"import apache_beam as beam\n",
"from typing import NamedTuple\n",
"\n",
"class Pet(NamedTuple):\n",
" name: str\n",
" pet_type: str\n",
" breed: str\n",
"\n",
"# Creating 20 pets\n",
"pets = [\n",
" Pet(name=\"Buddy\", pet_type=\"Dog\", breed=\"Golden Retriever\"),\n",
" Pet(name=\"Mittens\", pet_type=\"Cat\", breed=\"Siamese\"),\n",
" Pet(name=\"Max\", pet_type=\"Dog\", breed=\"Beagle\"),\n",
" Pet(name=\"Bella\", pet_type=\"Dog\", breed=\"Labrador\"),\n",
" Pet(name=\"Charlie\", pet_type=\"Dog\", breed=\"Poodle\"),\n",
" Pet(name=\"Lucy\", pet_type=\"Cat\", breed=\"Persian\"),\n",
" Pet(name=\"Daisy\", pet_type=\"Dog\", breed=\"Bulldog\"),\n",
" Pet(name=\"Luna\", pet_type=\"Cat\", breed=\"Maine Coon\"),\n",
" Pet(name=\"Rocky\", pet_type=\"Dog\", breed=\"Rottweiler\"),\n",
" Pet(name=\"Lola\", pet_type=\"Cat\", breed=\"Bengal\"),\n",
" Pet(name=\"Jack\", pet_type=\"Dog\", breed=\"Boxer\"),\n",
" Pet(name=\"Nala\", pet_type=\"Cat\", breed=\"Ragdoll\"),\n",
" Pet(name=\"Zeus\", pet_type=\"Dog\", breed=\"German Shepherd\"),\n",
" Pet(name=\"Chloe\", pet_type=\"Cat\", breed=\"British Shorthair\"),\n",
" Pet(name=\"Buster\", pet_type=\"Dog\", breed=\"Dachshund\"),\n",
" Pet(name=\"Simba\", pet_type=\"Cat\", breed=\"Sphynx\"),\n",
" Pet(name=\"Cooper\", pet_type=\"Dog\", breed=\"Cocker Spaniel\"),\n",
" Pet(name=\"Sasha\", pet_type=\"Cat\", breed=\"Scottish Fold\"),\n",
" Pet(name=\"Milo\", pet_type=\"Dog\", breed=\"Shih Tzu\"),\n",
" Pet(name=\"Oreo\", pet_type=\"Cat\", breed=\"Abyssinian\")\n",
"]\n",
"\n",
"# Show the first 5 pets.\n",
"print(pets[:5])"
]
},
{
"cell_type": "markdown",
"id": "yXGue2i4HQw-",
"metadata": {
"id": "yXGue2i4HQw-"
},
"source": [
"## Simple Pipeline\n",
"\n",
"Create the pets and print them"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "aQ6-j6Y7DSr5",
"metadata": {
"id": "aQ6-j6Y7DSr5",
"tags": []
},
"outputs": [],
"source": [
"# Beam pipeline\n",
"with beam.Pipeline() as p:\n",
" pet_collection = (\n",
" p\n",
" | 'Create pets' >> beam.Create(pets)\n",
" | 'Print pets' >> beam.Map(print)\n",
" )\n"
]
},
{
"cell_type": "markdown",
"id": "5zfHZ9fHHZKj",
"metadata": {
"id": "5zfHZ9fHHZKj"
},
"source": [
"## Use Filter() to return only the dogs"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ik0-kT__DbXf",
"metadata": {
"id": "ik0-kT__DbXf",
"tags": []
},
"outputs": [],
"source": [
"# Beam pipeline\n",
"with beam.Pipeline() as p:\n",
" pet_collection = (\n",
" p\n",
" | 'Create pets' >> beam.Create(pets)\n",
" | 'Get only the Dogs' >> beam.Filter(lambda pet: pet.pet_type == 'Dog')\n",
" | 'Print pets' >> beam.Map(print)\n",
" )"
]
},
{
"cell_type": "markdown",
"id": "VUEWPXrB2zY1",
"metadata": {
"id": "VUEWPXrB2zY1"
},
"source": [
"## Filter with DoFn"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1D2y03ds22lz",
"metadata": {
"id": "1D2y03ds22lz",
"tags": []
},
"outputs": [],
"source": [
"class OnlyCats(beam.DoFn):\n",
" def process(self, pet):\n",
" if pet.pet_type == 'Cat':\n",
" yield pet\n",
"\n",
"\n",
"# Beam pipeline\n",
"with beam.Pipeline() as p:\n",
" pet_collection = (\n",
" p\n",
" | 'Create pets' >> beam.Create(pets)\n",
" | 'Get only the Dogs' >> beam.ParDo(OnlyCats())\n",
" | 'Print pets' >> beam.Map(print)\n",
" )"
]
},
{
"cell_type": "markdown",
"id": "4AHQIKLZ3eIn",
"metadata": {
"id": "4AHQIKLZ3eIn"
},
"source": [
"## Filter with FlatMap()\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7LvJyyMG3kfW",
"metadata": {
"id": "7LvJyyMG3kfW",
"tags": []
},
"outputs": [],
"source": [
"def NoCatsFilter(pet):\n",
" if pet.pet_type == 'Dog':\n",
" return pet\n",
"\n",
"\n",
"# Beam pipeline\n",
"with beam.Pipeline() as p:\n",
" pet_collection = (\n",
" p\n",
" | 'Create pets' >> beam.Create(pets)\n",
" | 'Get only the Dogs' >> beam.FlatMap(NoCatsFilter)\n",
" | 'Print pets' >> beam.Map(print)\n",
" )"
]
},
{
"cell_type": "markdown",
"id": "wULJaeRPHfSZ",
"metadata": {
"id": "wULJaeRPHfSZ"
},
"source": [
"# Branch the pets to return dogs and cats."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6ZSuZdRBFUCd",
"metadata": {
"id": "6ZSuZdRBFUCd",
"tags": []
},
"outputs": [],
"source": [
"class FilterPetsDoFn(beam.DoFn):\n",
" def process(self, pet):\n",
" if pet.pet_type == 'Dog':\n",
" yield beam.pvalue.TaggedOutput('dogs', pet)\n",
" elif pet.pet_type == 'Cat':\n",
" yield beam.pvalue.TaggedOutput('cats', pet)\n",
"\n",
"\n",
"with beam.Pipeline() as p:\n",
" pet_collection = p | 'Create pets' >> beam.Create(pets)\n",
"\n",
" filtered_pets = (\n",
" pet_collection\n",
" | 'Filter pets' >> beam.ParDo(FilterPetsDoFn()).with_outputs('dogs', 'cats')\n",
" )\n",
"\n",
" dogs = filtered_pets.dogs | 'Print dogs' >> beam.Map(print)\n",
" # cats = filtered_pets.cats | 'Print cats' >> beam.Map(print)"
]
},
{
"cell_type": "markdown",
"id": "o4Qd7TuTHvqc",
"metadata": {
"id": "o4Qd7TuTHvqc"
},
"source": [
"## Output the dogs and cats into seperate CSV files"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8aZEh4D-GF1p",
"metadata": {
"id": "8aZEh4D-GF1p",
"tags": []
},
"outputs": [],
"source": [
"class FilterPetsDoFn(beam.DoFn):\n",
" def process(self, pet):\n",
" if pet.pet_type == 'Dog':\n",
" yield beam.pvalue.TaggedOutput('dogs', pet)\n",
" elif pet.pet_type == 'Cat':\n",
" yield beam.pvalue.TaggedOutput('cats', pet)\n",
"\n",
"def format_csv(pet):\n",
" return f'{pet.name},{pet.pet_type},{pet.breed}'\n",
"\n",
"# Beam pipeline\n",
"with beam.Pipeline() as p:\n",
" pet_collection = p | 'Create pets' >> beam.Create(pets)\n",
"\n",
" filtered_pets = (\n",
" pet_collection\n",
" | 'Filter pets' >> beam.ParDo(FilterPetsDoFn()).with_outputs('dogs', 'cats')\n",
" )\n",
"\n",
" dog_collection = filtered_pets.dogs | 'Format dogs to CSV' >> beam.Map(format_csv)\n",
" cat_collection = filtered_pets.cats | 'Format cats to CSV' >> beam.Map(format_csv)\n",
"\n",
" dog_collection | 'Write dogs to CSV' >> beam.io.WriteToText('./temp_files/only_dogs', file_name_suffix='.csv')\n",
" cat_collection | 'Write cats to CSV' >> beam.io.WriteToText('./temp_files/only_cats', file_name_suffix='.csv')\n",
"\n",
"\n",
"\n",
"\n",
"! cat ./temp_files/only_dogs-00000-of-00001.csv\n",
"! cat ./temp_files/only_cats-00000-of-00001.csv"
]
},
{
"cell_type": "markdown",
"id": "uclsrxArIejz",
"metadata": {
"id": "uclsrxArIejz"
},
"source": [
"# Nested data - Owners and Pets"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "sLvIM_QbIZc1",
"metadata": {
"id": "sLvIM_QbIZc1",
"tags": []
},
"outputs": [],
"source": [
"import random\n",
"from ctypes import Array\n",
"import apache_beam as beam\n",
"from typing import NamedTuple, List\n",
"\n",
"class Pet(NamedTuple):\n",
" name: str\n",
" pet_type: str\n",
" breed: str\n",
"\n",
"class Owner(NamedTuple):\n",
" name: str\n",
" pets: List[Pet]\n",
"\n",
"# Sample pet data\n",
"pet_names = [\"Buddy\", \"Mittens\", \"Max\", \"Bella\", \"Charlie\", \"Lucy\", \"Daisy\", \"Luna\", \"Rocky\", \"Lola\", \"Jack\", \"Nala\", \"Zeus\", \"Chloe\", \"Buster\", \"Simba\", \"Cooper\", \"Sasha\", \"Milo\", \"Oreo\"]\n",
"pet_types = [\"Dog\", \"Cat\"]\n",
"dog_breeds = [\"Golden Retriever\", \"Beagle\", \"Labrador\", \"Poodle\", \"Bulldog\", \"Rottweiler\", \"Boxer\", \"German Shepherd\", \"Dachshund\", \"Cocker Spaniel\", \"Shih Tzu\"]\n",
"cat_breeds = [\"Siamese\", \"Persian\", \"Maine Coon\", \"Bengal\", \"Ragdoll\", \"British Shorthair\", \"Sphynx\", \"Scottish Fold\", \"Abyssinian\"]\n",
"\n",
"def random_pet() -> Pet:\n",
" pet_type = random.choice(pet_types)\n",
" breed = random.choice(dog_breeds if pet_type == \"Dog\" else cat_breeds)\n",
" name = random.choice(pet_names)\n",
" return Pet(name=name, pet_type=pet_type, breed=breed)\n",
"\n",
"def random_pets() -> List[Pet]:\n",
" return [random_pet() for _ in range(random.randint(1, 2))]\n",
"\n",
"# Sample owner data\n",
"owner_names = [\"Alice\", \"Bob\", \"Charlie\", \"Diana\", \"Doug\", \"Edward\", \"Fiona\", \"George\", \"Hannah\", \"Ivan\", \"Julia\", \"Michael\", \"Patrick\"]\n",
"\n",
"owners = [Owner(name=name, pets=random_pets()) for name in owner_names]\n",
"\n",
"# Just show one Owner\n",
"print(owners[0])"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "eOPu2wPXJnUD",
"metadata": {
"id": "eOPu2wPXJnUD",
"tags": []
},
"outputs": [],
"source": [
"class PrintOwnerPets(beam.DoFn):\n",
" def process(self, owner):\n",
" print(f'Owner: {owner.name}')\n",
" for pet in owner.pets:\n",
" print(f' Pet: {pet.name}, Type: {pet.pet_type}, Breed: {pet.breed}')\n",
"\n",
"# Beam pipeline\n",
"with beam.Pipeline() as p:\n",
" owner_collection = p | 'Create owners' >> beam.Create(owners)\n",
" owner_collection | 'Print owners and pets' >> beam.ParDo(PrintOwnerPets())"
]
},
{
"cell_type": "markdown",
"id": "easD1ivo5EGi",
"metadata": {
"id": "easD1ivo5EGi"
},
"source": [
"## Number of Pets by Owner"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "aItiQwzqKlaO",
"metadata": {
"id": "aItiQwzqKlaO",
"tags": []
},
"outputs": [],
"source": [
"from typing import NamedTuple, List, Tuple\n",
"\n",
"class CountPetsDoFn(beam.DoFn):\n",
" def process(self, owner) -> Tuple[str, int]:\n",
" num_pets = len(owner.pets)\n",
" yield (owner.name, num_pets)\n",
"\n",
"\n",
"with beam.Pipeline() as p:\n",
" owner_collection = p | 'Create owners' >> beam.Create(owners)\n",
"\n",
" pet_counts = (\n",
" owner_collection\n",
" | 'Count pets' >> beam.ParDo(CountPetsDoFn())\n",
" )\n",
"\n",
" pet_counts | 'Print pet counts' >> beam.Map(print)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9d482121-891b-442b-8276-52ea6b0b1eaa",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"colab": {
"name": "drehnstrom (Jul 12, 2024, 6:04:57 PM)",
"provenance": []
},
"environment": {
"kernel": "python3",
"name": ".m116",
"type": "gcloud",
"uri": "gcr.io/deeplearning-platform-release/:m116"
},
"kernelspec": {
"display_name": "Python 3 (ipykernel) (Local)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.14"
}
},
"nbformat": 4,
"nbformat_minor": 5
}