courses/DSL/dataflow-examples/apache-beam-sql.ipynb (623 lines of code) (raw):
{
"cells": [
{
"cell_type": "markdown",
"id": "a4147d72-b719-4ca4-ba42-8d998c8c1b52",
"metadata": {},
"source": [
"# Apache Beam SQL"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "pnRH5srPn8LLdiGS2xfTN9Lz",
"metadata": {
"executionInfo": {
"elapsed": 12692,
"status": "ok",
"timestamp": 1720877608754,
"user": {
"displayName": "",
"userId": ""
},
"user_tz": 240
},
"id": "pnRH5srPn8LLdiGS2xfTN9Lz",
"tags": []
},
"outputs": [],
"source": [
"# Use the %pip magic so the packages are installed into the running kernel's\n",
"# environment. The requirement is quoted so the shell does not try to expand the\n",
"# square brackets, and both extras are requested in a single specifier.\n",
"%pip install --quiet \"apache-beam[gcp,interactive]\""
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "cRs_CxwZAhxC",
"metadata": {
"id": "cRs_CxwZAhxC"
},
"outputs": [],
"source": [
"# Restart the kernel through the running IPython application\n",
"import IPython\n",
"from IPython.display import display\n",
"\n",
"kernel_app = IPython.Application.instance()\n",
"kernel_app.kernel.do_shutdown(True)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "Ydfy5aUDTZlH",
"metadata": {
"executionInfo": {
"elapsed": 141,
"status": "ok",
"timestamp": 1720878643793,
"user": {
"displayName": "",
"userId": ""
},
"user_tz": 240
},
"id": "Ydfy5aUDTZlH",
"tags": []
},
"outputs": [],
"source": [
"# Register the Beam SQL magics with the current IPython session\n",
"from IPython import get_ipython\n",
"from apache_beam.runners.interactive.sql.beam_sql_magics import BeamSqlMagics\n",
"\n",
"ipython = get_ipython()\n",
"ipython.register_magics(BeamSqlMagics)"
]
},
{
"cell_type": "markdown",
"id": "C2pB_Uh2ENqc",
"metadata": {
"id": "C2pB_Uh2ENqc"
},
"source": [
"# Create test records\n",
"\n",
"Write owners and pets into CSV files."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "lBsk_N4hAuds",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
},
"executionInfo": {
"elapsed": 290,
"status": "ok",
"timestamp": 1720877613716,
"user": {
"displayName": "",
"userId": ""
},
"user_tz": 240
},
"id": "lBsk_N4hAuds",
"outputId": "910aca36-218a-4493-f8c9-073f9f93d191",
"tags": []
},
"outputs": [],
"source": [
"%%bash\n",
"\n",
"# Generates two CSV files of random test data: owners, and pets that refer to\n",
"# those owners through owner_id.\n",
"\n",
"# Define the variables\n",
"output_dir=\"./temp_files\"\n",
"owners_file_name=\"$output_dir/owners-io.csv\"\n",
"pets_file_name=\"$output_dir/pets-io.csv\"\n",
"num_pets=200\n",
"num_owners=100\n",
"\n",
"# Create the output directory if needed; redirecting into a missing directory fails\n",
"mkdir -p \"$output_dir\"\n",
"\n",
"# Make sure files don't exist (-f: no error on a first run when there is nothing to delete)\n",
"rm -f \"$owners_file_name\" \"$pets_file_name\"\n",
"\n",
"###########################\n",
"# Create the Owners first #\n",
"###########################\n",
"\n",
"# Create the file and add the headers\n",
"echo \"id,first_name,last_name,phone\" > \"$owners_file_name\"\n",
"\n",
"# Define lists of first names and last names\n",
"first_names=(\"John\" \"Jane\" \"Alice\" \"Bob\" \"Carol\" \"David\" \"Eva\" \"Frank\" \"Grace\" \"Hank\")\n",
"last_names=(\"Smith\" \"Johnson\" \"Williams\" \"Brown\" \"Jones\" \"Garcia\" \"Miller\" \"Davis\" \"Rodriguez\" \"Martinez\")\n",
"\n",
"# Generate num_owners test records; the loop counter doubles as the owner id\n",
"for (( i=1; i<=num_owners; i++ ))\n",
"do\n",
"    # Randomly select a first name and last name\n",
"    first_name=${first_names[$RANDOM % ${#first_names[@]}]}\n",
"    last_name=${last_names[$RANDOM % ${#last_names[@]}]}\n",
"    phone=$(printf \"555-%03d-%04d\" $((RANDOM % 1000)) $((RANDOM % 10000)))\n",
"\n",
"    # Add the record to the file\n",
"    echo \"$i,$first_name,$last_name,$phone\" >> \"$owners_file_name\"\n",
"done\n",
"\n",
"\n",
"echo \"File '$owners_file_name' has been created with $num_owners test records.\"\n",
"head \"$owners_file_name\"\n",
"\n",
"###########################\n",
"# Create the Pets         #\n",
"###########################\n",
"\n",
"# Create the file and add the headers\n",
"echo \"id,owner_id,pet_name,pet_type,breed,weight\" > \"$pets_file_name\"\n",
"\n",
"# Define lists of pet names, pet types, and breeds\n",
"pet_names=(\"Noir\" \"Bree\" \"Duke\" \"Joy\" \"Gigi\" \"Buddy\" \"Bella\" \"Charlie\" \"Max\" \"Luna\" \"Rocky\" \"Molly\" \"Daisy\" \"Bailey\" \"Sadie\" \"Oliver\" \"Coco\" \"Lucy\" \"Toby\" \"Chloe\" \"Jake\" \"Milo\" \"Lola\" \"Jack\" \"Nala\")\n",
"pet_types=(\"Dog\" \"Dog\" \"Dog\" \"Dog\" \"Cat\" \"Cat\" \"Cat\" \"Bird\" \"Fish\" \"Rabbit\") # More dogs and cats\n",
"dog_breeds=(\"Labrador\" \"German Shepherd\" \"Golden Retriever\" \"French Bulldog\" \"Poodle\")\n",
"cat_breeds=(\"Alley\" \"Siamese\" \"Maine Coon\" \"Persian\" \"Ragdoll\" \"Bengal\")\n",
"bird_breeds=(\"Parakeet\" \"Canary\" \"Finch\" \"Cockatiel\" \"Lovebird\")\n",
"fish_breeds=(\"Goldfish\" \"Betta\" \"Guppy\" \"Molly\" \"Tetra\")\n",
"rabbit_breeds=(\"Holland Lop\" \"Netherland Dwarf\" \"Lionhead\" \"Flemish Giant\" \"Mini Rex\")\n",
"\n",
"# Generate num_pets test records\n",
"for (( i=1; i<=num_pets; i++ ))\n",
"do\n",
"    # Randomly select a pet name, a pet type, and an existing owner id (1..num_owners)\n",
"    pet_name=${pet_names[$RANDOM % ${#pet_names[@]}]}\n",
"    pet_type=${pet_types[$RANDOM % ${#pet_types[@]}]}\n",
"    owner_id=$((RANDOM % num_owners + 1))\n",
"\n",
"    # Pick a breed and a weight that fit the pet type.\n",
"    # RANDOM % N + MIN yields MIN..MIN+N-1, so N is (max - min + 1) for an inclusive range.\n",
"    case $pet_type in\n",
"        \"Dog\")\n",
"            breed=${dog_breeds[$RANDOM % ${#dog_breeds[@]}]}\n",
"            weight=$((RANDOM % 41 + 10)) # Dogs: 10 to 50 kg inclusive\n",
"            ;;\n",
"        \"Cat\")\n",
"            breed=${cat_breeds[$RANDOM % ${#cat_breeds[@]}]}\n",
"            weight=$((RANDOM % 8 + 3)) # Cats: 3 to 10 kg inclusive\n",
"            ;;\n",
"        \"Bird\")\n",
"            breed=${bird_breeds[$RANDOM % ${#bird_breeds[@]}]}\n",
"            weight=$((RANDOM % 3 + 1)) # Birds: 1 to 3 kg inclusive\n",
"            ;;\n",
"        \"Fish\")\n",
"            breed=${fish_breeds[$RANDOM % ${#fish_breeds[@]}]}\n",
"            weight=$((RANDOM % 3 + 1)) # Fish: 1 to 3 kg inclusive\n",
"            ;;\n",
"        \"Rabbit\")\n",
"            breed=${rabbit_breeds[$RANDOM % ${#rabbit_breeds[@]}]}\n",
"            weight=$((RANDOM % 6 + 1)) # Rabbits: 1 to 6 kg inclusive\n",
"            ;;\n",
"    esac\n",
"\n",
"    # Add the record to the file\n",
"    echo \"$i,$owner_id,$pet_name,$pet_type,$breed,$weight\" >> \"$pets_file_name\"\n",
"done\n",
"echo \"-------------------------------\"\n",
"echo \"File '$pets_file_name' has been created with $num_pets test records.\"\n",
"head \"$pets_file_name\""
]
},
{
"cell_type": "markdown",
"id": "5c0b7670-33f5-4a38-b4b9-d2a18490f819",
"metadata": {},
"source": [
"# Add the Schemas and Parsers"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c30ac4c9-1bec-4403-8662-79ad7e5ef052",
"metadata": {
"executionInfo": {
"elapsed": 168,
"status": "ok",
"timestamp": 1720879280823,
"user": {
"displayName": "",
"userId": ""
},
"user_tz": 240
},
"id": "Uv7bxbeuEH2o",
"tags": []
},
"outputs": [],
"source": [
"import csv\n",
"from typing import NamedTuple\n",
"\n",
"import apache_beam as beam\n",
"from apache_beam import coders\n",
"\n",
"\n",
"class Pet(NamedTuple):\n",
"    \"\"\"One record of the pets CSV file.\"\"\"\n",
"    id: int\n",
"    owner_id: int\n",
"    pet_name: str\n",
"    pet_type: str\n",
"    breed: str\n",
"    weight: int\n",
"\n",
"\n",
"class Owner(NamedTuple):\n",
"    \"\"\"One record of the owners CSV file.\"\"\"\n",
"    id: int\n",
"    first_name: str\n",
"    last_name: str\n",
"    phone: str\n",
"\n",
"\n",
"# Register RowCoder for both record types\n",
"coders.registry.register_coder(Pet, coders.RowCoder)\n",
"coders.registry.register_coder(Owner, coders.RowCoder)\n",
"\n",
"\n",
"def parse_pets_row(row):\n",
"    \"\"\"Parse one CSV line of the pets file into a Pet.\n",
"\n",
"    The id, owner_id and weight columns are converted to int; the remaining\n",
"    columns are kept as strings.\n",
"    \"\"\"\n",
"    fields = next(csv.reader([row]))\n",
"    return Pet(\n",
"        int(fields[0]),\n",
"        int(fields[1]),\n",
"        fields[2],\n",
"        fields[3],\n",
"        fields[4],\n",
"        int(fields[5]),\n",
"    )\n",
"\n",
"\n",
"def parse_owners_row(row):\n",
"    \"\"\"Parse one CSV line of the owners file into an Owner.\n",
"\n",
"    The id column is converted to int; the remaining columns are kept as strings.\n",
"    \"\"\"\n",
"    fields = next(csv.reader([row]))\n",
"    return Owner(int(fields[0]), fields[1], fields[2], fields[3])"
]
},
{
"cell_type": "markdown",
"id": "9625dc40-9f91-4d45-9f96-2e89d3f9307a",
"metadata": {},
"source": [
"# Query just the Pets for now"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "BUniMBzPE-Em",
"metadata": {
"executionInfo": {
"elapsed": 330,
"status": "ok",
"timestamp": 1720880488874,
"user": {
"displayName": "",
"userId": ""
},
"user_tz": 240
},
"id": "BUniMBzPE-Em",
"tags": []
},
"outputs": [],
"source": [
"import csv\n",
"from typing import NamedTuple\n",
"\n",
"import apache_beam as beam\n",
"from apache_beam.runners.interactive import interactive_beam as ib\n",
"from apache_beam.runners.interactive.interactive_runner import InteractiveRunner\n",
"\n",
"# Build the pipeline on the InteractiveRunner\n",
"pipeline = beam.Pipeline(runner=InteractiveRunner())\n",
"\n",
"# Read the pets CSV (skipping its header line) and parse every line into a Pet\n",
"pets = (\n",
"    pipeline\n",
"    | 'Read CSV File' >> beam.io.ReadFromText('./temp_files/pets-io.csv', skip_header_lines=1)\n",
"    | 'Parse CSV Rows' >> beam.Map(parse_pets_row).with_output_types(Pet)\n",
")\n",
"\n",
"# Show the contents of the PCollection in the notebook\n",
"ib.show(pets)"
]
},
{
"cell_type": "markdown",
"id": "cfb877e2-237f-4b68-98b5-77adbab40cf4",
"metadata": {},
"source": [
"# Run a Query to get the Dogs using the beam_sql cell magic"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "KQ7j-9bJX8cv",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 1000
},
"executionInfo": {
"elapsed": 8317,
"status": "error",
"timestamp": 1720880468138,
"user": {
"displayName": "",
"userId": ""
},
"user_tz": 240
},
"id": "KQ7j-9bJX8cv",
"outputId": "248f135b-ad59-4c7b-dd1a-ec45a4637c9a",
"tags": []
},
"outputs": [],
"source": [
"%%beam_sql -o query_results_dogs\n",
"SELECT * FROM pets\n",
"WHERE pet_type = 'Dog'\n"
]
},
{
"cell_type": "markdown",
"id": "d3051c40-5652-46f7-a353-8e47f950d07d",
"metadata": {},
"source": [
"# ib.collect() will convert the Query Results to a Pandas DataFrame\n",
"\n",
"Below, info() and to_csv() are DataFrame methods"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2c73cec4-5890-4a9a-9d6e-4330d64dbbbf",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# Convert the query results to a pandas DataFrame\n",
"dogs = ib.collect(query_results_dogs)\n",
"\n",
"# Show information about the data\n",
"dogs.info()\n",
"\n",
"# Write the data to a file. The result is deliberately not assigned to `csv`:\n",
"# to_csv() returns None when it is given a path, and rebinding `csv` would shadow\n",
"# the csv module that parse_pets_row / parse_owners_row use in later pipeline runs.\n",
"dogs.to_csv(\"./temp_files/dogs-results.csv\")"
]
},
{
"cell_type": "markdown",
"id": "03efe803-0c35-40ac-bacc-cf19b29f4a1e",
"metadata": {},
"source": [
"# Another query: Count the pets per owner"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "-N4b9SSTN99J",
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/",
"height": 1000
},
"executionInfo": {
"elapsed": 9474,
"status": "error",
"timestamp": 1720879790061,
"user": {
"displayName": "",
"userId": ""
},
"user_tz": 240
},
"id": "-N4b9SSTN99J",
"outputId": "e3267c40-b3e4-4724-cc9b-85a063305087",
"tags": []
},
"outputs": [],
"source": [
"%%beam_sql -o query_results_owner_pet_count\n",
"SELECT\n",
"    owner_id,\n",
"    count(id) as pet_count\n",
"FROM pets\n",
"GROUP BY owner_id"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f98d58c4-8888-4117-aeb1-2f93924288c0",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"def convert_to_csv(row):\n",
"    \"\"\"Format a result row as the text line '<owner_id>, <pet_count>'.\"\"\"\n",
"    return \"{}, {}\".format(row.owner_id, row.pet_count)\n",
"\n",
"\n",
"def print_and_return(row):\n",
"    \"\"\"Print the element and hand it on unchanged.\"\"\"\n",
"    print(row)\n",
"    return row\n",
"\n",
"\n",
"print(query_results_owner_pet_count)\n",
"\n",
"# Append three steps to the query output: format each row, echo it, write it to text files\n",
"(\n",
"    query_results_owner_pet_count\n",
"    | \"Convert\" >> beam.Map(convert_to_csv)\n",
"    | \"Print\" >> beam.Map(print_and_return)\n",
"    | \"Write to File\" >> beam.io.textio.WriteToText('./temp_files/query_results_pet_count')\n",
")\n",
"\n",
"# Run the pipeline the query output belongs to and block until it has finished\n",
"query_results_owner_pet_count.pipeline.run().wait_until_finish()"
]
},
{
"cell_type": "markdown",
"id": "cd022224-de57-434b-87b7-1699ea039f29",
"metadata": {},
"source": [
"# Run Queries using the Beam SqlTransform\n",
"\n",
"At this point, just query a single table"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ZXxD9lp4GcV4",
"metadata": {
"id": "ZXxD9lp4GcV4",
"tags": []
},
"outputs": [],
"source": [
"import apache_beam as beam\n",
"from apache_beam.transforms.sql import SqlTransform\n",
"\n",
"\n",
"# Leaving the `with` block runs the pipeline and waits for it to finish\n",
"with beam.Pipeline() as pipeline:\n",
"    # Read the pets CSV (skipping its header line) and parse every line into a Pet\n",
"    pets = (\n",
"        pipeline\n",
"        | 'Read CSV File' >> beam.io.ReadFromText('./temp_files/pets-io.csv', skip_header_lines=1)\n",
"        | 'Parse CSV Rows' >> beam.Map(parse_pets_row).with_output_types(Pet)\n",
"    )\n",
"\n",
"    # When SqlTransform is applied to a single PCollection, the query refers to\n",
"    # it as PCOLLECTION; `pets` after it is only a table alias inside the SQL.\n",
"    fish = (\n",
"        pets\n",
"        | 'Filter Fish with SQL' >> SqlTransform(\n",
"            \"\"\"\n",
"            SELECT owner_id, pet_name\n",
"            FROM PCOLLECTION pets\n",
"            WHERE pet_type = 'Fish'\n",
"            \"\"\"\n",
"        )\n",
"        | 'Print fish' >> beam.Map(print)\n",
"    )\n",
"\n",
"    # A second, independent query over the same input: number of pets per owner id\n",
"    pet_counts = (\n",
"        pets\n",
"        | 'Count Pets by Owner' >> SqlTransform(\n",
"            \"\"\"\n",
"            SELECT owner_id, count(id) as pet_count\n",
"            FROM PCOLLECTION pets\n",
"            GROUP BY owner_id\n",
"            \"\"\"\n",
"        )\n",
"        | 'Print pet counts' >> beam.Map(print)\n",
"    )"
]
},
{
"cell_type": "markdown",
"id": "fbb92bb5-f36b-464d-8891-d1fbbf987a11",
"metadata": {},
"source": [
"## Join two tables"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "95993ad0-f640-4d81-879b-17983058343b",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"import apache_beam as beam\n",
"from apache_beam.transforms.sql import SqlTransform\n",
"\n",
"\n",
"def format_result(result):\n",
"    \"\"\"Format one joined result row as '<first_name>, <last_name>, <pet_count>'.\"\"\"\n",
"    return f\"{result.first_name}, {result.last_name}, {result.pet_count}\"\n",
"\n",
"\n",
"# Leaving the `with` block runs the pipeline and waits for it to finish\n",
"with beam.Pipeline() as p:\n",
"    pets = (\n",
"        p\n",
"        | 'ReadPets' >> beam.io.ReadFromText('./temp_files/pets-io.csv', skip_header_lines=1)\n",
"        | 'ParsePets' >> beam.Map(parse_pets_row).with_output_types(Pet)\n",
"    )\n",
"\n",
"    owners = (\n",
"        p\n",
"        | 'ReadOwners' >> beam.io.ReadFromText('./temp_files/owners-io.csv', skip_header_lines=1)\n",
"        | 'ParseOwners' >> beam.Map(parse_owners_row).with_output_types(Owner)\n",
"    )\n",
"\n",
"    # Define the SQL query.\n",
"    # owners.id is part of the grouping key so that two different owners who\n",
"    # happen to share a first and last name are counted separately instead of\n",
"    # being merged into a single row.\n",
"    query = '''\n",
"    SELECT\n",
"        owners.first_name as first_name,\n",
"        owners.last_name as last_name,\n",
"        COUNT(pets.id) AS pet_count\n",
"    FROM\n",
"        owners\n",
"    JOIN\n",
"        pets\n",
"    ON\n",
"        owners.id = pets.owner_id\n",
"    GROUP BY\n",
"        owners.id,\n",
"        owners.first_name,\n",
"        owners.last_name\n",
"    '''\n",
"\n",
"    # The dict keys are the table names the query uses for each input PCollection\n",
"    results = (\n",
"        {'owners': owners, 'pets': pets}\n",
"        | 'JoinAndCount' >> SqlTransform(query)\n",
"    )\n",
"\n",
"    formatted_results = (\n",
"        results\n",
"        | 'format' >> beam.Map(format_result)\n",
"        | 'WriteOutput' >> beam.io.WriteToText('./temp_files/joined-output.txt')\n",
"    )"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c24c86b3-cf5d-497a-845b-39a1fe29ac89",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"colab": {
"name": "drehnstrom (Jul 13, 2024, 7:47:16 AM)",
"provenance": []
},
"environment": {
"kernel": "apache-beam-2.56.0",
"name": ".m116",
"type": "gcloud",
"uri": "gcr.io/deeplearning-platform-release/:m116"
},
"kernelspec": {
"display_name": "Apache Beam 2.56.0 (Local)",
"language": "python",
"name": "apache-beam-2.56.0"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.14"
}
},
"nbformat": 4,
"nbformat_minor": 5
}