courses/DSL/dataflow-examples/apache-beam-io.ipynb (620 lines of code) (raw):

{ "cells": [ { "cell_type": "markdown", "id": "4b32ec51-e06b-4a82-bf5d-611d8aa9787a", "metadata": {}, "source": [ "# Apache Beam IO" ] }, { "cell_type": "code", "execution_count": null, "id": "pnRH5srPn8LLdiGS2xfTN9Lz", "metadata": { "id": "pnRH5srPn8LLdiGS2xfTN9Lz", "tags": [] }, "outputs": [], "source": [ "# Install Beam with the GCP extras into the kernel's own environment.\n", "# The requirement is quoted so the shell does not treat [gcp] as a glob pattern.\n", "%pip install --quiet \"apache-beam[gcp]\"" ] }, { "cell_type": "code", "execution_count": null, "id": "cRs_CxwZAhxC", "metadata": { "id": "cRs_CxwZAhxC" }, "outputs": [], "source": [ "import IPython\n", "from IPython.display import display\n", "\n", "# Ask the running kernel to shut down with restart=True\n", "# (presumably so the packages installed above become importable).\n", "app = IPython.Application.instance()\n", "app.kernel.do_shutdown(True)" ] }, { "cell_type": "markdown", "id": "_NIlmBbfA89K", "metadata": { "id": "_NIlmBbfA89K" }, "source": [ "# Create a File for Testing\n", "\n", "No big deal here. Just creating a file to read from." ] }, { "cell_type": "code", "execution_count": null, "id": "lBsk_N4hAuds", "metadata": { "id": "lBsk_N4hAuds", "tags": [] }, "outputs": [], "source": [ "%%bash\n", "\n", "filename=\"./temp_files/dogs.txt\"\n", "\n", "# Make sure the temp directory exists (-p: no error if it is already there)\n", "mkdir -p temp_files\n", "\n", "# Start from a clean file (-f: no error if it does not exist yet)\n", "rm -f \"$filename\"\n", "\n", "# Write the dog names to a file dogs.txt, one name per line\n", "for dog in Noir Bree Gigi Gretyl Duchess Rusty\n", "do\n", " echo \"$dog\" >> \"$filename\"\n", "done\n", "\n", "# Print the file to confirm its contents\n", "cat \"$filename\"" ] }, { "cell_type": "markdown", "id": "SENNtHCMBG83", "metadata": { "id": "SENNtHCMBG83" }, "source": [ "# Use Beam.IO to Read From a File" ] }, { "cell_type": "code", "execution_count": null, "id": "nbcW7QjHBLEe", "metadata": { "id": "nbcW7QjHBLEe", "tags": [] }, "outputs": [], "source": [ "import apache_beam as beam\n", "from apache_beam.io import ReadFromText, WriteToText\n", "\n", "filename=\"./temp_files/dogs.txt\"\n", "with beam.Pipeline() as p:\n", " (\n", " p | 'Read' >> ReadFromText(filename)\n", " | 'Transform' >> beam.Map(str.upper)\n", " | 'Print' >> beam.Map(print)\n", " )" ] 
}, { "cell_type": "markdown", "id": "ocVuG5oLCNKi", "metadata": { "id": "ocVuG5oLCNKi" }, "source": [ "# Use Beam.IO to Write a File" ] }, { "cell_type": "code", "execution_count": null, "id": "wdenoi-5CIbn", "metadata": { "id": "wdenoi-5CIbn", "tags": [] }, "outputs": [], "source": [ "import apache_beam as beam\n", "from apache_beam.io import ReadFromText, WriteToText\n", "\n", "def makeUppercase(element):\n", " return element.upper()\n", "\n", "filename=\"./temp_files/dogs.txt\"\n", "with beam.Pipeline() as p:\n", " (\n", " p | 'Read' >> ReadFromText(filename)\n", " | 'Transform' >> beam.Map(makeUppercase)\n", " | 'Write' >> WriteToText('./temp_files/uppercase-dogs.out')\n", " )\n", "\n", "\n", "# Use ls to see if the file was created and\n", "# cat to view the contents of the file.\n", "!ls ./temp_files/uppercase-dogs.*\n", "! cat ./temp_files/uppercase-dogs.out-00000-of-00001" ] }, { "cell_type": "markdown", "id": "978790b4-8796-4270-b722-0ce9c8c40b25", "metadata": {}, "source": [ "# Use Beam.IO to Write to BigQuery" ] }, { "cell_type": "code", "execution_count": null, "id": "ba165038-04f3-4a70-a0b6-9dd0e82294e5", "metadata": { "tags": [] }, "outputs": [], "source": [ "%%bash\n", "\n", "# Make sure a Cloud Storage Bucket and BQ Dataset are created. \n", "# The Bucket is used for Temp files. 
\n", "# The Dataset is required to create the BQ table.\n", "\n", "project_id='dsl-dar'\n", "dataset_id='beam_dataset'\n", "table_id='dogs_table'\n", "bucket_name='dataflow-temp-bucket-dar'\n", "\n", "# Create the BigQuery dataset only if it is missing, so this cell can be re-run\n", "if bq show --dataset \"$project_id:$dataset_id\" >/dev/null 2>&1; then\n", " echo \"Dataset $dataset_id already exists.\"\n", "else\n", " bq mk --dataset \"$project_id:$dataset_id\"\n", " echo \"Dataset $dataset_id created.\"\n", "fi\n", "\n", "# Check if the Cloud Storage bucket exists\n", "if gsutil ls -b gs://$bucket_name >/dev/null 2>&1; then\n", " echo \"Bucket $bucket_name already exists.\"\n", "else\n", " gsutil mb --location=US gs://$bucket_name\n", " echo \"Bucket $bucket_name created.\"\n", "fi" ] }, { "cell_type": "markdown", "id": "847452f2-fc75-4adf-bbd8-043d051c193f", "metadata": {}, "source": [ "## Read from text file, transform, write to BigQuery" ] }, { "cell_type": "code", "execution_count": null, "id": "cf74af25-e831-43da-bcc4-b491e142a66f", "metadata": { "tags": [] }, "outputs": [], "source": [ "import apache_beam as beam\n", "from apache_beam.io import ReadFromText, WriteToBigQuery\n", "from apache_beam.options.pipeline_options import PipelineOptions\n", "\n", "project_id = 'dsl-dar'\n", "dataset_id = 'beam_dataset'\n", "table_id = 'dogs_table'\n", "bucket_name = 'dataflow-temp-bucket-dar'\n", "\n", "\n", "def makeUppercase(element):\n", " return {'dog_names': element.upper()}\n", "\n", "# Define the BigQuery table schema\n", "table_schema = {\n", " 'fields': [\n", " {'name': 'dog_names', 'type': 'STRING', 'mode': 'NULLABLE'}\n", " ]\n", "}\n", "\n", "\n", "filename=\"./temp_files/dogs.txt\"\n", "\n", "# Define the pipeline options\n", "options = PipelineOptions(\n", " project=project_id,\n", " temp_location='gs://{0}/temp'.format(bucket_name)\n", ")\n", "\n", "with beam.Pipeline(options=options) as p:\n", " (\n", " p | 'Read' >> ReadFromText(filename)\n", " | 'Transform' >> beam.Map(makeUppercase)\n", " | 'WriteToBigQuery' >> WriteToBigQuery(\n", " f'{project_id}:{dataset_id}.{table_id}',\n", " schema=table_schema,\n", " write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,\n", " 
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED\n", " )\n", " )\n", " \n", "# Note: Options for BigQueryDisposition include: \n", "# WRITE_APPEND, WRITE_TRUNCATE, WRITE_EMPTY, CREATE_IF_NEEDED, CREATE_NEVER\n", "\n", "print(\"BigQuery Table Created\")" ] }, { "cell_type": "markdown", "id": "32323663-2fb2-41cb-a530-74cd32102f2f", "metadata": {}, "source": [ "# Use BigQuery IO Read to run a query" ] }, { "cell_type": "code", "execution_count": null, "id": "62704c66-7ecb-49cf-a440-95fd649ee2a9", "metadata": { "tags": [] }, "outputs": [], "source": [ "import apache_beam as beam\n", "from apache_beam.io.gcp.bigquery import ReadFromBigQuery\n", "from apache_beam.options.pipeline_options import PipelineOptions\n", "\n", "# Define the BigQuery query\n", "query = 'SELECT * FROM `{}.{}.dogs_table`;'.format(project_id, dataset_id)\n", "print(query)\n", "\n", "# Define the pipeline options\n", "options = PipelineOptions(\n", " project=project_id,\n", " temp_location='gs://{0}/temp'.format(bucket_name)\n", ")\n", "\n", "def process_row(row):\n", " # Process the row here if needed\n", " print(row)\n", " return row\n", "\n", "with beam.Pipeline(options=options) as p:\n", " (\n", " p | 'ReadFromBigQuery' >> ReadFromBigQuery(query=query,\n", " use_standard_sql=True)\n", " | 'ProcessRows' >> beam.Map(process_row)\n", " )\n", "\n", "print(\"Done\")" ] }, { "cell_type": "markdown", "id": "05131311-78b2-476b-af13-9bac4586e038", "metadata": {}, "source": [ "# Let's make it harder using a couple of tables: Owners and their Pets. 
\n", "\n", "First, create some test records for Owners and Pets" ] }, { "cell_type": "code", "execution_count": null, "id": "69ceb20a-f3f9-4f91-8501-8c5f98c66596", "metadata": { "tags": [] }, "outputs": [], "source": [ "%%bash\n", "\n", "# Define the variables\n", "owners_file_name=\"./temp_files/owners-io.csv\"\n", "pets_file_name=\"./temp_files/pets-io.csv\"\n", "num_pets=200\n", "num_owners=100\n", "\n", "# Make sure the output directory exists so this cell also works on its own\n", "mkdir -p ./temp_files\n", "\n", "# Remove files left over from a previous run (-f: no error if they are missing)\n", "rm -f \"$owners_file_name\" \"$pets_file_name\"\n", "\n", "###########################\n", "# Create the Owners first #\n", "###########################\n", "\n", "# Create the file and add the headers\n", "echo \"id,first_name,last_name,phone\" > $owners_file_name\n", "\n", "# Define lists of first names and last names\n", "first_names=(\"John\" \"Jane\" \"Alice\" \"Bob\" \"Carol\" \"David\" \"Eva\" \"Frank\" \"Grace\" \"Hank\")\n", "last_names=(\"Smith\" \"Johnson\" \"Williams\" \"Brown\" \"Jones\" \"Garcia\" \"Miller\" \"Davis\" \"Rodriguez\" \"Martinez\")\n", "\n", "# Generate num_owners test records\n", "for (( i=1; i<=num_owners; i++ ))\n", "do\n", " # Randomly select a first name and last name\n", " first_name=${first_names[$RANDOM % ${#first_names[@]}]}\n", " last_name=${last_names[$RANDOM % ${#last_names[@]}]}\n", " phone=$(printf \"555-%03d-%04d\\n\" $((RANDOM%1000)) $((RANDOM%10000)))\n", "\n", " # Add the record to the file\n", " echo \"$i,$first_name,$last_name,$phone\" >> $owners_file_name\n", "done\n", "\n", "\n", "echo \"File '$owners_file_name' has been created with $num_owners test records.\"\n", "head $owners_file_name\n", "\n", "###########################\n", "# Create the Pets #\n", "###########################\n", "\n", "# Create the file and add the headers\n", "echo \"id,owner_id,pet_name,pet_type,breed,weight\" > $pets_file_name\n", "\n", "# Define lists of pet names, pet types, and breeds\n", "pet_names=(\"Noir\" \"Bree\" \"Duke\" \"Joy\" \"Gigi\" \"Buddy\" \"Bella\" \"Charlie\" \"Max\" \"Luna\" \"Rocky\" 
\"Molly\" \"Daisy\" \"Bailey\" \"Sadie\" \"Oliver\" \"Coco\" \"Lucy\" \"Toby\" \"Chloe\" \"Jake\" \"Milo\" \"Lola\" \"Jack\" \"Nala\")\n", "pet_types=(\"Dog\" \"Dog\" \"Dog\" \"Dog\" \"Cat\" \"Cat\" \"Cat\" \"Bird\" \"Fish\" \"Rabbit\") # More dogs and cats\n", "dog_breeds=(\"Labrador\" \"German Shepherd\" \"Golden Retriever\" \"French Bulldog\" \"Poodle\")\n", "cat_breeds=(\"Alley\" \"Siamese\" \"Maine Coon\" \"Persian\" \"Ragdoll\" \"Bengal\")\n", "bird_breeds=(\"Parakeet\" \"Canary\" \"Finch\" \"Cockatiel\" \"Lovebird\")\n", "fish_breeds=(\"Goldfish\" \"Betta\" \"Guppy\" \"Molly\" \"Tetra\")\n", "rabbit_breeds=(\"Holland Lop\" \"Netherland Dwarf\" \"Lionhead\" \"Flemish Giant\" \"Mini Rex\")\n", "\n", "# Generate num_pets test records\n", "for (( i=1; i<=num_pets; i++ ))\n", "do\n", " # Randomly select a pet name, a pet type, and an owner id (1..num_owners)\n", " pet_name=${pet_names[$RANDOM % ${#pet_names[@]}]}\n", " pet_type=${pet_types[$RANDOM % ${#pet_types[@]}]}\n", " owner_id=$((RANDOM % num_owners + 1))\n", "\n", "\n", " case $pet_type in\n", " \"Dog\")\n", " breed=${dog_breeds[$RANDOM % ${#dog_breeds[@]}]}\n", " weight=$((RANDOM % 40 + 10)) # Dogs: random whole-number weight from 10 to 49 kg\n", " ;;\n", " \"Cat\")\n", " breed=${cat_breeds[$RANDOM % ${#cat_breeds[@]}]}\n", " weight=$((RANDOM % 8 + 3)) # Cats: random whole-number weight from 3 to 10 kg\n", " ;;\n", " \"Bird\")\n", " breed=${bird_breeds[$RANDOM % ${#bird_breeds[@]}]}\n", " weight=$((RANDOM % 2 + 1)) # Birds: random whole-number weight of 1 or 2 kg\n", " ;;\n", " \"Fish\")\n", " breed=${fish_breeds[$RANDOM % ${#fish_breeds[@]}]}\n", " weight=$((RANDOM % 2 + 1)) # Fish: random whole-number weight of 1 or 2 kg\n", " ;;\n", " \"Rabbit\")\n", " breed=${rabbit_breeds[$RANDOM % ${#rabbit_breeds[@]}]}\n", " weight=$((RANDOM % 5 + 1)) # Rabbits: random whole-number weight from 1 to 5 kg\n", " ;;\n", " esac\n", "\n", " # Add the record to the file\n", " echo \"$i,$owner_id,$pet_name,$pet_type,$breed,$weight\" >> $pets_file_name\n", "done\n", "echo 
\"-------------------------------\"\n", "echo \"File '$pets_file_name' has been created with $num_pets test records.\"\n", "head $pets_file_name\n" ] }, { "cell_type": "markdown", "id": "c1b32262-088b-44f2-a61f-e855901ee59d", "metadata": {}, "source": [ "# Read the 2 CSV tables, Parse them, and then Write them to BigQuery" ] }, { "cell_type": "code", "execution_count": null, "id": "d896f4af-1f2b-4044-ae96-b63ebba4a740", "metadata": { "tags": [] }, "outputs": [], "source": [ "import apache_beam as beam\n", "from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions\n", "from apache_beam.io.gcp.bigquery import WriteToBigQuery\n", "import csv\n", "\n", "owners_table = \"./temp_files/owners-io.csv\"\n", "pets_table = \"./temp_files/pets-io.csv\"\n", "\n", "# Define parsing functions\n", "def parse_owners_csv(line):\n", "    \"\"\"Turn one owners CSV line (id,first_name,last_name,phone) into a dict.\n", "\n", "    The keys match owners_table_schema; 'id' is converted to int, so a\n", "    non-numeric id raises ValueError.\n", "    \"\"\"\n", "    # csv.reader (unlike str.split) keeps quoted fields that contain commas intact\n", "    fields = next(csv.reader([line]))\n", "    return {\n", "        'id': int(fields[0]),\n", "        'first_name': fields[1],\n", "        'last_name': fields[2],\n", "        'phone': fields[3]\n", "    }\n", "\n", "def parse_pets_csv(line):\n", "    \"\"\"Turn one pets CSV line (id,owner_id,pet_name,pet_type,breed,weight) into a dict.\n", "\n", "    The keys match pets_table_schema; 'id' and 'owner_id' are converted to\n", "    int and 'weight' to float, so malformed numbers raise ValueError.\n", "    \"\"\"\n", "    # csv.reader (unlike str.split) keeps quoted fields that contain commas intact\n", "    fields = next(csv.reader([line]))\n", "    return {\n", "        'id': int(fields[0]),\n", "        'owner_id': int(fields[1]),\n", "        'pet_name': fields[2],\n", "        'pet_type': fields[3],\n", "        'breed': fields[4],\n", "        'weight': float(fields[5])\n", "    }\n", "\n", "# Define BigQuery schemas\n", "owners_table_schema = {\n", " 'fields': [\n", " {'name': 'id', 'type': 'INTEGER', 'mode': 'REQUIRED'},\n", " {'name': 'first_name', 'type': 'STRING', 'mode': 'NULLABLE'},\n", " {'name': 'last_name', 'type': 'STRING', 'mode': 'NULLABLE'},\n", " {'name': 'phone', 'type': 'STRING', 'mode': 'NULLABLE'},\n", " ]\n", "}\n", "\n", "pets_table_schema = {\n", " 'fields': [\n", " {'name': 'id', 'type': 'INTEGER', 'mode': 'REQUIRED'},\n", " {'name': 'owner_id', 'type': 'INTEGER', 'mode': 'REQUIRED'},\n", " {'name': 'pet_name', 'type': 'STRING', 'mode': 'NULLABLE'},\n", " {'name': 'pet_type', 'type': 'STRING', 'mode': 'NULLABLE'},\n", " {'name': 'breed', 'type': 'STRING', 
'mode': 'NULLABLE'},\n", " {'name': 'weight', 'type': 'FLOAT', 'mode': 'NULLABLE'},\n", " ]\n", "}\n", "\n", "# Define the pipeline options\n", "options = PipelineOptions(\n", " project=project_id,\n", " temp_location='gs://{0}/temp'.format(bucket_name)\n", ")\n", "\n", "\n", "# Create and run the pipeline\n", "with beam.Pipeline(options=options) as p:\n", " owners = (\n", " p\n", " | 'Read Owners CSV' >> beam.io.ReadFromText(owners_table, skip_header_lines=1)\n", " | 'Parse Owners CSV' >> beam.Map(parse_owners_csv)\n", "\n", " )\n", "\n", " pets = (\n", " p\n", " | 'Read Pets CSV' >> beam.io.ReadFromText(pets_table, skip_header_lines=1)\n", " | 'Parse Pets CSV' >> beam.Map(parse_pets_csv)\n", " )\n", " \n", " owners | 'Write Owners to BigQuery' >> WriteToBigQuery(\n", " f'{project_id}:{dataset_id}.owners_table',\n", " schema=owners_table_schema,\n", " write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,\n", " create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED\n", " )\n", "\n", " pets | 'Write Pets to BigQuery' >> WriteToBigQuery(\n", " f'{project_id}:{dataset_id}.pets_table',\n", " schema=pets_table_schema,\n", " write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,\n", " create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED\n", " )\n", "\n", "\n", "print(\"Done\")\n" ] }, { "cell_type": "markdown", "id": "534febd6-1321-42b9-8809-fd1539f8c648", "metadata": {}, "source": [ "# Now, let's run a Query" ] }, { "cell_type": "code", "execution_count": null, "id": "992d6e05-51d2-4013-aa95-d94c89e898bb", "metadata": { "tags": [] }, "outputs": [], "source": [ "import apache_beam as beam\n", "from apache_beam.io.gcp.bigquery import ReadFromBigQuery\n", "from apache_beam.options.pipeline_options import PipelineOptions\n", "\n", "# Define the BigQuery query\n", "query = f\"\"\"\n", "SELECT owners.id as owner_id, Concat(first_name,\" \", last_name) as owner, \n", "ARRAY_AGG(STRUCT(pet_name, pet_type)) as pets\n", "FROM 
`{project_id}.{dataset_id}.owners_table` owners\n", "JOIN `{project_id}.{dataset_id}.pets_table` pets\n", "on owners.id = pets.owner_id\n", "GROUP BY owner_id, owner\n", "ORDER BY owners.id;\n", "\"\"\"\n", "print(query)\n", "\n", "# Define the pipeline options\n", "options = PipelineOptions(\n", " project=project_id,\n", " temp_location='gs://{0}/temp'.format(bucket_name)\n", ")\n", "\n", "def process_row(row):\n", " # Process the row here if needed\n", " print(row)\n", " return row\n", "\n", "with beam.Pipeline(options=options) as p:\n", " (\n", " p | 'ReadFromBigQuery' >> ReadFromBigQuery(query=query,\n", " use_standard_sql=True)\n", " | 'ProcessRows' >> beam.Map(process_row)\n", " )\n", " \n", " \n", "print(\"done\")" ] }, { "cell_type": "code", "execution_count": null, "id": "c2ba9245-606e-4bb5-a92e-2ab964dfb33c", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "colab": { "name": "drehnstrom (Jul 12, 2024, 1:16:46 PM)", "provenance": [] }, "environment": { "kernel": "python3", "name": ".m116", "type": "gcloud", "uri": "gcr.io/deeplearning-platform-release/:m116" }, "kernelspec": { "display_name": "Python 3 (ipykernel) (Local)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.14" } }, "nbformat": 4, "nbformat_minor": 5 }