courses/DSL/dataflow-examples/apache-beam-sql.ipynb (623 lines of code) (raw):

{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "a4147d72-b719-4ca4-ba42-8d998c8c1b52",
   "metadata": {},
   "source": [
    "# Apache Beam SQL\n",
    "\n",
    "This notebook generates small owner and pet CSV files, then queries them with Beam SQL: first with the `%%beam_sql` cell magic on an interactive pipeline, then with `SqlTransform` in regular pipelines, including a join of the two tables."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "pnRH5srPn8LLdiGS2xfTN9Lz",
   "metadata": {
    "id": "pnRH5srPn8LLdiGS2xfTN9Lz",
    "tags": []
   },
   "outputs": [],
   "source": [
    "# Install Beam with the GCP and interactive extras, pinned to the version of this notebook's kernel.\n",
    "# %pip (unlike !pip) installs into the environment of the running kernel.\n",
    "%pip install --quiet \"apache-beam[gcp,interactive]==2.56.0\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "cRs_CxwZAhxC",
   "metadata": {
    "id": "cRs_CxwZAhxC"
   },
   "outputs": [],
   "source": [
    "# Restart the kernel so the next cells import the freshly installed packages.\n",
    "import IPython\n",
    "\n",
    "app = IPython.Application.instance()\n",
    "app.kernel.do_shutdown(True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "Ydfy5aUDTZlH",
   "metadata": {
    "id": "Ydfy5aUDTZlH",
    "tags": []
   },
   "outputs": [],
   "source": [
    "# All imports used by this notebook (run after the kernel restart above)\n",
    "import csv\n",
    "from typing import NamedTuple\n",
    "\n",
    "import apache_beam as beam\n",
    "from apache_beam import coders\n",
    "from apache_beam.runners.interactive import interactive_beam as ib\n",
    "from apache_beam.runners.interactive.interactive_runner import InteractiveRunner\n",
    "from apache_beam.runners.interactive.sql.beam_sql_magics import BeamSqlMagics\n",
    "from apache_beam.transforms.sql import SqlTransform\n",
    "from IPython import get_ipython\n",
    "\n",
    "# Enable the Beam SQL magics (%%beam_sql)\n",
    "ipython = get_ipython()\n",
    "ipython.register_magics(BeamSqlMagics)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "C2pB_Uh2ENqc",
   "metadata": {
    "id": "C2pB_Uh2ENqc"
   },
   "source": [
    "## Create test records\n",
    "\n",
    "Generate random owners and pets and write them to CSV files in `./temp_files/`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "lBsk_N4hAuds",
   "metadata": {
    "id": "lBsk_N4hAuds",
    "tags": []
   },
   "outputs": [],
   "source": [
    "%%bash\n",
    "\n",
    "# Define the variables\n",
    "output_dir=\"./temp_files\"\n",
    "owners_file_name=\"$output_dir/owners-io.csv\"\n",
    "pets_file_name=\"$output_dir/pets-io.csv\"\n",
    "num_pets=200\n",
    "num_owners=100\n",
    "\n",
    "# Seed bash's RANDOM so that every run generates the same test data\n",
    "RANDOM=42\n",
    "\n",
    "# Make sure the output directory exists and no files from a previous run are left over\n",
    "# (-f keeps rm quiet when the files don't exist yet, e.g. on the first run)\n",
    "mkdir -p \"$output_dir\"\n",
    "rm -f \"$owners_file_name\" \"$pets_file_name\"\n",
    "\n",
    "###########################\n",
    "# Create the Owners first #\n",
    "###########################\n",
    "\n",
    "# Create the file and add the headers\n",
    "echo \"id,first_name,last_name,phone\" > \"$owners_file_name\"\n",
    "\n",
    "# Define lists of first names and last names\n",
    "first_names=(\"John\" \"Jane\" \"Alice\" \"Bob\" \"Carol\" \"David\" \"Eva\" \"Frank\" \"Grace\" \"Hank\")\n",
    "last_names=(\"Smith\" \"Johnson\" \"Williams\" \"Brown\" \"Jones\" \"Garcia\" \"Miller\" \"Davis\" \"Rodriguez\" \"Martinez\")\n",
    "\n",
    "# Generate num_owners test records\n",
    "for (( i=1; i<=num_owners; i++ ))\n",
    "do\n",
    "    # Randomly select a first name, last name, and phone number\n",
    "    first_name=${first_names[$RANDOM % ${#first_names[@]}]}\n",
    "    last_name=${last_names[$RANDOM % ${#last_names[@]}]}\n",
    "    # printf -v assigns in this shell; a $(...) subshell would reseed RANDOM and ignore the seed\n",
    "    printf -v phone \"555-%03d-%04d\" $((RANDOM % 1000)) $((RANDOM % 10000))\n",
    "\n",
    "    # Add the record to the file\n",
    "    echo \"$i,$first_name,$last_name,$phone\" >> \"$owners_file_name\"\n",
    "done\n",
    "\n",
    "echo \"File '$owners_file_name' has been created with $num_owners test records.\"\n",
    "head \"$owners_file_name\"\n",
    "\n",
    "###########################\n",
    "# Create the Pets         #\n",
    "###########################\n",
    "\n",
    "# Create the file and add the headers\n",
    "echo \"id,owner_id,pet_name,pet_type,breed,weight\" > \"$pets_file_name\"\n",
    "\n",
    "# Define lists of pet names, pet types, and breeds\n",
    "pet_names=(\"Noir\" \"Bree\" \"Duke\" \"Joy\" \"Gigi\" \"Buddy\" \"Bella\" \"Charlie\" \"Max\" \"Luna\" \"Rocky\" \"Molly\" \"Daisy\" \"Bailey\" \"Sadie\" \"Oliver\" \"Coco\" \"Lucy\" \"Toby\" \"Chloe\" \"Jake\" \"Milo\" \"Lola\" \"Jack\" \"Nala\")\n",
    "pet_types=(\"Dog\" \"Dog\" \"Dog\" \"Dog\" \"Cat\" \"Cat\" \"Cat\" \"Bird\" \"Fish\" \"Rabbit\") # Repeats make dogs and cats more common\n",
    "dog_breeds=(\"Labrador\" \"German Shepherd\" \"Golden Retriever\" \"French Bulldog\" \"Poodle\")\n",
    "cat_breeds=(\"Alley\" \"Siamese\" \"Maine Coon\" \"Persian\" \"Ragdoll\" \"Bengal\")\n",
    "bird_breeds=(\"Parakeet\" \"Canary\" \"Finch\" \"Cockatiel\" \"Lovebird\")\n",
    "fish_breeds=(\"Goldfish\" \"Betta\" \"Guppy\" \"Molly\" \"Tetra\")\n",
    "rabbit_breeds=(\"Holland Lop\" \"Netherland Dwarf\" \"Lionhead\" \"Flemish Giant\" \"Mini Rex\")\n",
    "\n",
    "# Generate num_pets test records\n",
    "for (( i=1; i<=num_pets; i++ ))\n",
    "do\n",
    "    # Randomly select a pet name, pet type, and owner\n",
    "    pet_name=${pet_names[$RANDOM % ${#pet_names[@]}]}\n",
    "    pet_type=${pet_types[$RANDOM % ${#pet_types[@]}]}\n",
    "    owner_id=$((RANDOM % num_owners + 1))\n",
    "\n",
    "    # Pick a breed and a whole-number weight (kg) for the pet type\n",
    "    case $pet_type in\n",
    "        \"Dog\")\n",
    "            breed=${dog_breeds[$RANDOM % ${#dog_breeds[@]}]}\n",
    "            weight=$((RANDOM % 40 + 10)) # Generates 10 to 49 kg\n",
    "            ;;\n",
    "        \"Cat\")\n",
    "            breed=${cat_breeds[$RANDOM % ${#cat_breeds[@]}]}\n",
    "            weight=$((RANDOM % 8 + 3)) # Generates 3 to 10 kg\n",
    "            ;;\n",
    "        \"Bird\")\n",
    "            breed=${bird_breeds[$RANDOM % ${#bird_breeds[@]}]}\n",
    "            weight=$((RANDOM % 2 + 1)) # Generates 1 to 2 kg\n",
    "            ;;\n",
    "        \"Fish\")\n",
    "            breed=${fish_breeds[$RANDOM % ${#fish_breeds[@]}]}\n",
    "            weight=$((RANDOM % 2 + 1)) # Generates 1 to 2 kg\n",
    "            ;;\n",
    "        \"Rabbit\")\n",
    "            breed=${rabbit_breeds[$RANDOM % ${#rabbit_breeds[@]}]}\n",
    "            weight=$((RANDOM % 5 + 1)) # Generates 1 to 5 kg\n",
    "            ;;\n",
    "    esac\n",
    "\n",
    "    # Add the record to the file\n",
    "    echo \"$i,$owner_id,$pet_name,$pet_type,$breed,$weight\" >> \"$pets_file_name\"\n",
    "done\n",
    "echo \"-------------------------------\"\n",
    "echo \"File '$pets_file_name' has been created with $num_pets test records.\"\n",
    "head \"$pets_file_name\""
   ]
  },
  {
   "cell_type": "markdown",
   "id": "5c0b7670-33f5-4a38-b4b9-d2a18490f819",
   "metadata": {},
   "source": [
    "## Add the schemas and parsers\n",
    "\n",
    "Registering `RowCoder` for the `NamedTuple` classes gives their PCollections a Beam schema, which Beam SQL uses as the table columns."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c30ac4c9-1bec-4403-8662-79ad7e5ef052",
   "metadata": {
    "id": "Uv7bxbeuEH2o",
    "tags": []
   },
   "outputs": [],
   "source": [
    "class Pet(NamedTuple):\n",
    "    \"\"\"One row of pets-io.csv.\"\"\"\n",
    "    id: int\n",
    "    owner_id: int\n",
    "    pet_name: str\n",
    "    pet_type: str\n",
    "    breed: str\n",
    "    weight: int\n",
    "\n",
    "\n",
    "class Owner(NamedTuple):\n",
    "    \"\"\"One row of owners-io.csv.\"\"\"\n",
    "    id: int\n",
    "    first_name: str\n",
    "    last_name: str\n",
    "    phone: str\n",
    "\n",
    "\n",
    "beam.coders.registry.register_coder(Pet, coders.RowCoder)\n",
    "beam.coders.registry.register_coder(Owner, coders.RowCoder)\n",
    "\n",
    "\n",
    "def parse_pets_row(row: str) -> Pet:\n",
    "    \"\"\"Parse one data line of pets-io.csv into a Pet.\n",
    "\n",
    "    csv.reader (rather than str.split) handles quoted fields; int() raises\n",
    "    ValueError if an id or weight column is not numeric.\n",
    "    \"\"\"\n",
    "    fields = next(csv.reader([row]))\n",
    "    return Pet(\n",
    "        id=int(fields[0]),\n",
    "        owner_id=int(fields[1]),\n",
    "        pet_name=fields[2],\n",
    "        pet_type=fields[3],\n",
    "        breed=fields[4],\n",
    "        weight=int(fields[5]),\n",
    "    )\n",
    "\n",
    "\n",
    "def parse_owners_row(row: str) -> Owner:\n",
    "    \"\"\"Parse one data line of owners-io.csv into an Owner.\"\"\"\n",
    "    fields = next(csv.reader([row]))\n",
    "    return Owner(\n",
    "        id=int(fields[0]),\n",
    "        first_name=fields[1],\n",
    "        last_name=fields[2],\n",
    "        phone=fields[3],\n",
    "    )"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "9625dc40-9f91-4d45-9f96-2e89d3f9307a",
   "metadata": {},
   "source": [
    "## Query just the pets for now\n",
    "\n",
    "Build an interactive pipeline that reads the pets. The `%%beam_sql` cells below refer to this PCollection by its variable name, `pets`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "BUniMBzPE-Em",
   "metadata": {
    "id": "BUniMBzPE-Em",
    "tags": []
   },
   "outputs": [],
   "source": [
    "# Create an interactive Beam pipeline that reads and parses the pets\n",
    "pipeline = beam.Pipeline(runner=InteractiveRunner())\n",
    "pets = (\n",
    "    pipeline\n",
    "    | 'Read CSV File' >> beam.io.ReadFromText('./temp_files/pets-io.csv', skip_header_lines=1)\n",
    "    | 'Parse CSV Rows' >> beam.Map(parse_pets_row).with_output_types(Pet)\n",
    ")\n",
    "\n",
    "# Compute and display the PCollection\n",
    "ib.show(pets)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "cfb877e2-237f-4b68-98b5-77adbab40cf4",
   "metadata": {},
   "source": [
    "## Query the dogs with the `%%beam_sql` cell magic\n",
    "\n",
    "`-o query_results_dogs` stores the query result in a variable named `query_results_dogs`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "KQ7j-9bJX8cv",
   "metadata": {
    "id": "KQ7j-9bJX8cv",
    "tags": []
   },
   "outputs": [],
   "source": [
    "%%beam_sql -o query_results_dogs\n",
    "SELECT *\n",
    "FROM pets\n",
    "WHERE pet_type = 'Dog'\n"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "d3051c40-5652-46f7-a353-8e47f950d07d",
   "metadata": {},
   "source": [
    "## Convert the query results to a pandas DataFrame\n",
    "\n",
    "`ib.collect()` returns the query results as a pandas DataFrame; `info()` and `to_csv()` below are ordinary DataFrame methods."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "2c73cec4-5890-4a9a-9d6e-4330d64dbbbf",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "# Convert the query results to a DataFrame\n",
    "dogs = ib.collect(query_results_dogs)\n",
    "\n",
    "# Show information about the data\n",
    "dogs.info()\n",
    "\n",
    "# Write the data to a file. to_csv(path) returns None, so don't assign the result\n",
    "# (binding it to `csv` would shadow the csv module used by the parsers above).\n",
    "dogs.to_csv(\"./temp_files/dogs-results.csv\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "03efe803-0c35-40ac-bacc-cf19b29f4a1e",
   "metadata": {},
   "source": [
    "## Another query: count the pets per owner"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "-N4b9SSTN99J",
   "metadata": {
    "id": "-N4b9SSTN99J",
    "tags": []
   },
   "outputs": [],
   "source": [
    "%%beam_sql -o query_results_owner_pet_count\n",
    "SELECT owner_id, count(id) as pet_count\n",
    "FROM pets\n",
    "GROUP BY owner_id"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f98d58c4-8888-4117-aeb1-2f93924288c0",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "def convert_to_csv(row):\n",
    "    \"\"\"Format one query result row as an owner_id,pet_count CSV line.\"\"\"\n",
    "    return f\"{row.owner_id},{row.pet_count}\"\n",
    "\n",
    "\n",
    "def print_and_return(row):\n",
    "    \"\"\"Print a row and pass it through unchanged, so the pipeline's output is visible.\"\"\"\n",
    "    print(row)\n",
    "    return row\n",
    "\n",
    "\n",
    "# Add the conversion and write steps to the pipeline that produced the query results.\n",
    "# Note: re-running this cell adds the same labels again, and Beam rejects duplicate labels.\n",
    "(\n",
    "    query_results_owner_pet_count\n",
    "    | \"Convert\" >> beam.Map(convert_to_csv)\n",
    "    | \"Print\" >> beam.Map(print_and_return)\n",
    "    | \"Write to File\" >> beam.io.WriteToText('./temp_files/query_results_pet_count')\n",
    ")\n",
    "\n",
    "query_results_owner_pet_count.pipeline.run().wait_until_finish()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "cd022224-de57-434b-87b7-1699ea039f29",
   "metadata": {},
   "source": [
    "## Run queries using the Beam `SqlTransform`\n",
    "\n",
    "At this point, query just a single table. With a single input PCollection, `SqlTransform` exposes it as the table `PCOLLECTION` (aliased as `pets` in the queries below)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ZXxD9lp4GcV4",
   "metadata": {
    "id": "ZXxD9lp4GcV4",
    "tags": []
   },
   "outputs": [],
   "source": [
    "# New variable names keep the interactive `pipeline` and `pets` used by %%beam_sql above intact.\n",
    "with beam.Pipeline() as sql_pipeline:\n",
    "    pet_rows = (\n",
    "        sql_pipeline\n",
    "        | 'Read CSV File' >> beam.io.ReadFromText('./temp_files/pets-io.csv', skip_header_lines=1)\n",
    "        | 'Parse CSV Rows' >> beam.Map(parse_pets_row).with_output_types(Pet)\n",
    "    )\n",
    "\n",
    "    fish = (\n",
    "        pet_rows\n",
    "        | 'Filter Fish with SQL' >> SqlTransform(\n",
    "            \"\"\"\n",
    "            SELECT owner_id, pet_name\n",
    "            FROM PCOLLECTION pets\n",
    "            WHERE pet_type = 'Fish'\n",
    "            \"\"\"\n",
    "        )\n",
    "        | 'Print fish' >> beam.Map(print)\n",
    "    )\n",
    "\n",
    "    pet_counts = (\n",
    "        pet_rows\n",
    "        | 'Count Pets by Owner' >> SqlTransform(\n",
    "            \"\"\"\n",
    "            SELECT owner_id, count(id) as pet_count\n",
    "            FROM PCOLLECTION pets\n",
    "            GROUP BY owner_id\n",
    "            \"\"\"\n",
    "        )\n",
    "        | 'Print pet counts' >> beam.Map(print)\n",
    "    )"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "fbb92bb5-f36b-464d-8891-d1fbbf987a11",
   "metadata": {},
   "source": [
    "### Join two tables\n",
    "\n",
    "With several inputs, pass a dict to `SqlTransform`: each key becomes a table name in the query. Because this is an inner join, owners without any pets do not appear in the output."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "95993ad0-f640-4d81-879b-17983058343b",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "def format_result(result):\n",
    "    \"\"\"Format one joined row as 'owner_id, first_name, last_name, pet_count'.\"\"\"\n",
    "    return f\"{result.owner_id}, {result.first_name}, {result.last_name}, {result.pet_count}\"\n",
    "\n",
    "\n",
    "with beam.Pipeline() as join_pipeline:\n",
    "    pet_rows = (\n",
    "        join_pipeline\n",
    "        | 'ReadPets' >> beam.io.ReadFromText('./temp_files/pets-io.csv', skip_header_lines=1)\n",
    "        | 'ParsePets' >> beam.Map(parse_pets_row).with_output_types(Pet)\n",
    "    )\n",
    "\n",
    "    owner_rows = (\n",
    "        join_pipeline\n",
    "        | 'ReadOwners' >> beam.io.ReadFromText('./temp_files/owners-io.csv', skip_header_lines=1)\n",
    "        | 'ParseOwners' >> beam.Map(parse_owners_row).with_output_types(Owner)\n",
    "    )\n",
    "\n",
    "    # Group by owners.id as well as the name: the test data reuses 10 first names and\n",
    "    # 10 last names across 100 owners, so grouping by name alone merges different owners.\n",
    "    query = '''\n",
    "        SELECT\n",
    "            owners.id AS owner_id,\n",
    "            owners.first_name AS first_name,\n",
    "            owners.last_name AS last_name,\n",
    "            COUNT(pets.id) AS pet_count\n",
    "        FROM\n",
    "            owners\n",
    "        JOIN\n",
    "            pets\n",
    "        ON\n",
    "            owners.id = pets.owner_id\n",
    "        GROUP BY\n",
    "            owners.id,\n",
    "            owners.first_name,\n",
    "            owners.last_name\n",
    "    '''\n",
    "\n",
    "    (\n",
    "        {'owners': owner_rows, 'pets': pet_rows}\n",
    "        | 'JoinAndCount' >> SqlTransform(query)\n",
    "        | 'format' >> beam.Map(format_result)\n",
    "        | 'WriteOutput' >> beam.io.WriteToText('./temp_files/joined-output.txt')\n",
    "    )"
   ]
  }
 ],
 "metadata": {
  "colab": {
   "name": "drehnstrom (Jul 13, 2024, 7:47:16 AM)",
   "provenance": []
  },
  "environment": {
   "kernel": "apache-beam-2.56.0",
   "name": ".m116",
   "type": "gcloud",
   "uri": "gcr.io/deeplearning-platform-release/:m116"
  },
  "kernelspec": {
   "display_name": "Apache Beam 2.56.0 (Local)",
   "language": "python",
   "name": "apache-beam-2.56.0"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.14"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}