{
"cells": [
{
"cell_type": "markdown",
"id": "4b32ec51-e06b-4a82-bf5d-611d8aa9787a",
"metadata": {},
"source": [
"# Apache Beam IO"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "pnRH5srPn8LLdiGS2xfTN9Lz",
"metadata": {
"id": "pnRH5srPn8LLdiGS2xfTN9Lz",
"tags": []
},
"outputs": [],
"source": [
"! pip install apache_beam apache-beam[gcp] --quiet"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "cRs_CxwZAhxC",
"metadata": {
"id": "cRs_CxwZAhxC"
},
"outputs": [],
"source": [
"import IPython\n",
"from IPython.display import display\n",
"\n",
"app = IPython.Application.instance()\n",
"app.kernel.do_shutdown(True)"
]
},
{
"cell_type": "markdown",
"id": "_NIlmBbfA89K",
"metadata": {
"id": "_NIlmBbfA89K"
},
"source": [
"# Create a File for Testing\n",
"\n",
"No big deal here. Just creating a file to read from."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "lBsk_N4hAuds",
"metadata": {
"id": "lBsk_N4hAuds",
"tags": []
},
"outputs": [],
"source": [
"%%bash\n",
"\n",
"filename=\"./temp_files/dogs.txt\"\n",
"\n",
"# Make sure the temp directory exists\n",
"mkdir temp_files\n",
"\n",
"# First make sure the file doesn't exist\n",
"rm $filename\n",
"\n",
"#Write the dog names to a file dogs.txt\n",
"for dog in Noir Bree Gigi Gretyl Duchess Rusty\n",
"do\n",
" echo $dog >> $filename\n",
"done\n",
"\n",
"# This is a great line of code :)\n",
"cat $filename"
]
},
{
"cell_type": "markdown",
"id": "SENNtHCMBG83",
"metadata": {
"id": "SENNtHCMBG83"
},
"source": [
"# Use Beam.IO to Read From a File"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "nbcW7QjHBLEe",
"metadata": {
"id": "nbcW7QjHBLEe",
"tags": []
},
"outputs": [],
"source": [
"import apache_beam as beam\n",
"from apache_beam.io import ReadFromText, WriteToText\n",
"\n",
"filename=\"./temp_files/dogs.txt\"\n",
"with beam.Pipeline() as p:\n",
" (\n",
" p | 'Read' >> ReadFromText(filename)\n",
" | 'Transform' >> beam.Map(str.upper)\n",
" | 'Print' >> beam.Map(print)\n",
" )"
]
},
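{
"cell_type": "markdown",
"id": "added-read-pattern-md",
"metadata": {},
"source": [
"`ReadFromText` also accepts glob-style file patterns, so one transform can read every matching file. Below is a minimal sketch; it assumes the pattern `./temp_files/*.txt` matches only `dogs.txt` at this point in the notebook."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "added-read-pattern-code",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"import apache_beam as beam\n",
"from apache_beam.io import ReadFromText\n",
"\n",
"# ReadFromText expands the glob and reads every matching file.\n",
"with beam.Pipeline() as p:\n",
"    (\n",
"        p | 'ReadPattern' >> ReadFromText('./temp_files/*.txt')\n",
"          | 'Print' >> beam.Map(print)\n",
"    )"
]
},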
{
"cell_type": "markdown",
"id": "ocVuG5oLCNKi",
"metadata": {
"id": "ocVuG5oLCNKi"
},
"source": [
"# Use Beam.IO Write a File"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "wdenoi-5CIbn",
"metadata": {
"id": "wdenoi-5CIbn",
"tags": []
},
"outputs": [],
"source": [
"import apache_beam as beam\n",
"from apache_beam.io import ReadFromText, WriteToText\n",
"\n",
"def makeUppercase(element):\n",
" return element.upper()\n",
"\n",
"filename=\"./temp_files/dogs.txt\"\n",
"with beam.Pipeline() as p:\n",
" (\n",
" p | 'Read' >> ReadFromText(filename)\n",
" | 'Transform' >> beam.Map(makeUppercase)\n",
" | 'Write' >> WriteToText('./temp_files/uppercase-dogs.out')\n",
" )\n",
"\n",
"\n",
"# Use ls to see if the file was created and\n",
"# cat to view the contents of the file.\n",
"!ls ./temp_files/uppercase-dogs.*\n",
"! cat ./temp_files/uppercase-dogs.out-00000-of-00001"
]
},
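{
"cell_type": "markdown",
"id": "added-writetotext-shards-md",
"metadata": {},
"source": [
"By default the runner decides how many output shards to write, which is where the `-00000-of-00001` suffix comes from. Below is a minimal sketch of pinning the output to one predictably named file; note that forcing a single shard limits parallelism on large datasets."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "added-writetotext-shards-code",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"import apache_beam as beam\n",
"from apache_beam.io import ReadFromText, WriteToText\n",
"\n",
"filename = \"./temp_files/dogs.txt\"\n",
"with beam.Pipeline() as p:\n",
"    (\n",
"        p | 'Read' >> ReadFromText(filename)\n",
"          | 'Transform' >> beam.Map(str.upper)\n",
"          # num_shards=1 writes a single file; shard_name_template=''\n",
"          # drops the -SSSSS-of-NNNNN shard suffix entirely.\n",
"          | 'Write' >> WriteToText('./temp_files/single-dogs',\n",
"                                   file_name_suffix='.txt',\n",
"                                   num_shards=1,\n",
"                                   shard_name_template='')\n",
"    )\n",
"\n",
"!cat ./temp_files/single-dogs.txt"
]
},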
{
"cell_type": "markdown",
"id": "978790b4-8796-4270-b722-0ce9c8c40b25",
"metadata": {},
"source": [
"# Use Beam.IO Write to BigQuery"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ba165038-04f3-4a70-a0b6-9dd0e82294e5",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"%%bash\n",
"\n",
"# Make sure a Cloud Storage Bucket and BQ Dataset is created. \n",
"# The Bucket is used for Temp files. \n",
"# The Dataset is required to create the BQ table.\n",
"\n",
"project_id='dsl-dar'\n",
"dataset_id='beam_dataset'\n",
"table_id='dogs_table'\n",
"bucket_name='dataflow-temp-bucket-dar'\n",
"\n",
"bq mk --dataset $project_id:$dataset_id\n",
"\n",
"# Check if the Cloud Storage bucket exists\n",
"if gsutil ls -b gs://$bucket_name >/dev/null 2>&1; then\n",
" echo \"Bucket $bucket_name already exists.\"\n",
"else\n",
" gsutil mb --location=US gs://$bucket_name\n",
" echo \"Bucket $bucket_name created.\"\n",
"fi"
]
},
{
"cell_type": "markdown",
"id": "847452f2-fc75-4adf-bbd8-043d051c193f",
"metadata": {},
"source": [
"## Read from text file, transform, write to BigQuery"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "cf74af25-e831-43da-bcc4-b491e142a66f",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"import apache_beam as beam\n",
"from apache_beam.io import ReadFromText, WriteToBigQuery\n",
"from apache_beam.options.pipeline_options import PipelineOptions\n",
"\n",
"project_id = 'dsl-dar'\n",
"dataset_id = 'beam_dataset'\n",
"table_id = 'dogs_table'\n",
"bucket_name = 'dataflow-temp-bucket-dar'\n",
"\n",
"\n",
"def makeUppercase(element):\n",
" return {'dog_names': element.upper()}\n",
"\n",
"# Define the BigQuery table schema\n",
"table_schema = {\n",
" 'fields': [\n",
" {'name': 'dog_names', 'type': 'STRING', 'mode': 'NULLABLE'}\n",
" ]\n",
"}\n",
"\n",
"\n",
"filename=\"./temp_files/dogs.txt\"\n",
"\n",
"# Define the pipeline options\n",
"options = PipelineOptions(\n",
" project=project_id,\n",
" temp_location='gs://{0}/temp'.format(bucket_name)\n",
")\n",
"\n",
"with beam.Pipeline(options=options) as p:\n",
" (\n",
" p | 'Read' >> ReadFromText(filename)\n",
" | 'Transform' >> beam.Map(makeUppercase)\n",
" | 'WriteToBigQuery' >> WriteToBigQuery(\n",
" f'{project_id}:{dataset_id}.{table_id}',\n",
" schema=table_schema,\n",
" write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,\n",
" create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED\n",
" )\n",
" )\n",
" \n",
"# Note: Options for BigQueryDEisposistion include: \n",
"# WRITE_APPEND, WRITE_TRUNCATE, WRITE_EMPTY, CREATE_IF_NEEDED, CREATE_NEVER\n",
"\n",
"print(\"BigQuery Table Created\")"
]
},
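{
"cell_type": "markdown",
"id": "added-bq-append-md",
"metadata": {},
"source": [
"`WriteToBigQuery` also accepts a compact string schema of the form `'name:TYPE,name:TYPE'`. The sketch below appends a couple of extra rows to the same table using that form with `WRITE_APPEND`; the two dog names are made up for illustration, and the variables come from the cells above."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "added-bq-append-code",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"import apache_beam as beam\n",
"from apache_beam.io import WriteToBigQuery\n",
"from apache_beam.options.pipeline_options import PipelineOptions\n",
"\n",
"options = PipelineOptions(\n",
"    project=project_id,\n",
"    temp_location='gs://{0}/temp'.format(bucket_name)\n",
")\n",
"\n",
"with beam.Pipeline(options=options) as p:\n",
"    (\n",
"        # The two rows here are made-up examples.\n",
"        p | 'Create' >> beam.Create([{'dog_names': 'REX'},\n",
"                                     {'dog_names': 'FIDO'}])\n",
"          | 'Append' >> WriteToBigQuery(\n",
"                f'{project_id}:{dataset_id}.{table_id}',\n",
"                schema='dog_names:STRING',  # compact string schema\n",
"                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,\n",
"                create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER)\n",
"    )\n",
"\n",
"print(\"Rows appended\")"
]
},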
{
"cell_type": "markdown",
"id": "32323663-2fb2-41cb-a530-74cd32102f2f",
"metadata": {},
"source": [
"# Use BigQuery IO Read to run a query"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "62704c66-7ecb-49cf-a440-95fd649ee2a9",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"import apache_beam as beam\n",
"from apache_beam.io.gcp.bigquery import ReadFromBigQuery\n",
"from apache_beam.options.pipeline_options import PipelineOptions\n",
"\n",
"# Define the BigQuery query\n",
"query = 'SELECT * FROM `{}.{}.dogs_table`;'.format(project_id, dataset_id)\n",
"print(query)\n",
"\n",
"# Define the pipeline options\n",
"options = PipelineOptions(\n",
" project=project_id,\n",
" temp_location='gs://{0}/temp'.format(bucket_name)\n",
")\n",
"\n",
"def process_row(row):\n",
" # Process the row here if needed\n",
" print(row)\n",
" return row\n",
"\n",
"with beam.Pipeline(options=options) as p:\n",
" (\n",
" p | 'ReadFromBigQuery' >> ReadFromBigQuery(query=query,\n",
" use_standard_sql=True)\n",
" | 'ProcessRows' >> beam.Map(process_row)\n",
" )\n",
"\n",
"print(\"Done\")"
]
},
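{
"cell_type": "markdown",
"id": "added-bq-read-table-md",
"metadata": {},
"source": [
"Instead of a query, `ReadFromBigQuery` can read an entire table directly via the `table` argument, which skips the query job. A minimal sketch:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "added-bq-read-table-code",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"import apache_beam as beam\n",
"from apache_beam.io.gcp.bigquery import ReadFromBigQuery\n",
"from apache_beam.options.pipeline_options import PipelineOptions\n",
"\n",
"options = PipelineOptions(\n",
"    project=project_id,\n",
"    temp_location='gs://{0}/temp'.format(bucket_name)\n",
")\n",
"\n",
"with beam.Pipeline(options=options) as p:\n",
"    (\n",
"        # table= takes 'PROJECT:DATASET.TABLE'; no SQL query is run.\n",
"        p | 'ReadTable' >> ReadFromBigQuery(\n",
"                table=f'{project_id}:{dataset_id}.{table_id}')\n",
"          | 'Print' >> beam.Map(print)\n",
"    )"
]
},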
{
"cell_type": "markdown",
"id": "05131311-78b2-476b-af13-9bac4586e038",
"metadata": {},
"source": [
"# Let's make it harder using a couple tables Owners and their Pets. \n",
"\n",
"First, create some test record for Owners and Pets"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "69ceb20a-f3f9-4f91-8501-8c5f98c66596",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"%%bash\n",
"\n",
"# Define the variables\n",
"owners_file_name=\"./temp_files/owners-io.csv\"\n",
"pets_file_name=\"./temp_files/pets-io.csv\"\n",
"num_pets=200\n",
"num_owners=100\n",
"\n",
"# Make sure files don't exist\n",
"rm $owners_file_name\n",
"rm $pets_file_name\n",
"\n",
"###########################\n",
"# Create the Owners first #\n",
"###########################\n",
"\n",
"# Create the file and add the headers\n",
"echo \"id,first_name,last_name,phone\" > $owners_file_name\n",
"\n",
"# Define lists of first names and last names\n",
"first_names=(\"John\" \"Jane\" \"Alice\" \"Bob\" \"Carol\" \"David\" \"Eva\" \"Frank\" \"Grace\" \"Hank\")\n",
"last_names=(\"Smith\" \"Johnson\" \"Williams\" \"Brown\" \"Jones\" \"Garcia\" \"Miller\" \"Davis\" \"Rodriguez\" \"Martinez\")\n",
"\n",
"# Generate 10 test records\n",
"for (( i=1; i<=num_owners; i++ ))\n",
"do\n",
" # Randomly select a first name and last name\n",
" first_name=${first_names[$RANDOM % ${#first_names[@]}]}\n",
" last_name=${last_names[$RANDOM % ${#last_names[@]}]}\n",
" phone=$(printf \"555-%03d-%04d\\n\" $((RANDOM%1000)) $((RANDOM%10000)))\n",
"\n",
" # Add the record to the file\n",
" echo \"$i,$first_name,$last_name,$phone\" >> $owners_file_name\n",
"done\n",
"\n",
"\n",
"echo \"File '$owners_file_name' has been created with $num_owners test records.\"\n",
"head $owners_file_name\n",
"\n",
"###########################\n",
"# Create the Pets #\n",
"###########################\n",
"\n",
"# Create the file and add the headers\n",
"echo \"id,owner_id,pet_name,pet_type,breed,weight\" > $pets_file_name\n",
"\n",
"# Define lists of pet names, pet types, and breeds\n",
"pet_names=(\"Noir\" \"Bree\" \"Duke\" \"Joy\" \"Gigi\" \"Buddy\" \"Bella\" \"Charlie\" \"Max\" \"Luna\" \"Rocky\" \"Molly\" \"Daisy\" \"Bailey\" \"Sadie\" \"Oliver\" \"Coco\" \"Lucy\" \"Toby\" \"Chloe\" \"Jake\" \"Milo\" \"Lola\" \"Jack\" \"Nala\")\n",
"pet_types=(\"Dog\" \"Dog\" \"Dog\" \"Dog\" \"Cat\" \"Cat\" \"Cat\" \"Bird\" \"Fish\" \"Rabbit\") # More dogs and cats\n",
"dog_breeds=(\"Labrador\" \"German Shepherd\" \"Golden Retriever\" \"French Bulldog\" \"Poodle\")\n",
"cat_breeds=(\"Alley\" \"Siamese\" \"Maine Coon\" \"Persian\" \"Ragdoll\" \"Bengal\")\n",
"bird_breeds=(\"Parakeet\" \"Canary\" \"Finch\" \"Cockatiel\" \"Lovebird\")\n",
"fish_breeds=(\"Goldfish\" \"Betta\" \"Guppy\" \"Molly\" \"Tetra\")\n",
"rabbit_breeds=(\"Holland Lop\" \"Netherland Dwarf\" \"Lionhead\" \"Flemish Giant\" \"Mini Rex\")\n",
"\n",
"# Generate 200 test records\n",
"for (( i=1; i<=num_pets; i++ ))\n",
"do\n",
" # Randomly select a pet name, pet type, breed, and weight\n",
" pet_name=${pet_names[$RANDOM % ${#pet_names[@]}]}\n",
" pet_type=${pet_types[$RANDOM % ${#pet_types[@]}]}\n",
" owner_id=$((RANDOM % num_owners + 1))\n",
"\n",
"\n",
" case $pet_type in\n",
" \"Dog\")\n",
" breed=${dog_breeds[$RANDOM % ${#dog_breeds[@]}]}\n",
" weight=$((RANDOM % 40 + 10)) # Dogs typically weigh between 10 to 50 kg\n",
" ;;\n",
" \"Cat\")\n",
" breed=${cat_breeds[$RANDOM % ${#cat_breeds[@]}]}\n",
" weight=$((RANDOM % 8 + 3)) # Cats typically weigh between 3 to 10 kg\n",
" ;;\n",
" \"Bird\")\n",
" breed=${bird_breeds[$RANDOM % ${#bird_breeds[@]}]}\n",
" weight=$((RANDOM % 2 + 1)) # Birds typically weigh between 1 to 3 kg\n",
" ;;\n",
" \"Fish\")\n",
" breed=${fish_breeds[$RANDOM % ${#fish_breeds[@]}]}\n",
" weight=$((RANDOM % 2 + 1)) # Fish typically weigh between 1 to 3 kg\n",
" ;;\n",
" \"Rabbit\")\n",
" breed=${rabbit_breeds[$RANDOM % ${#rabbit_breeds[@]}]}\n",
" weight=$((RANDOM % 5 + 1)) # Rabbits typically weigh between 1 to 6 kg\n",
" ;;\n",
" esac\n",
"\n",
" # Add the record to the file\n",
" echo \"$i,$owner_id,$pet_name,$pet_type,$breed,$weight\" >> $pets_file_name\n",
"done\n",
"echo \"-------------------------------\"\n",
"echo \"File '$pets_file_name' has been created with $num_pets test records.\"\n",
"head $pets_file_name\n"
]
},
{
"cell_type": "markdown",
"id": "c1b32262-088b-44f2-a61f-e855901ee59d",
"metadata": {},
"source": [
"# Read the 2 CSV tables, Parse them, and then Write them to BigQuery"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d896f4af-1f2b-4044-ae96-b63ebba4a740",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"import apache_beam as beam\n",
"from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions\n",
"from apache_beam.io.gcp.bigquery import WriteToBigQuery\n",
"import csv\n",
"\n",
"owners_table = \"./temp_files/owners-io.csv\"\n",
"pets_table = \"./temp_files/pets-io.csv\"\n",
"\n",
"# Define parsing functions\n",
"def parse_owners_csv(line):\n",
" fields = line.split(',')\n",
" return {\n",
" 'id': int(fields[0]),\n",
" 'first_name': fields[1],\n",
" 'last_name': fields[2],\n",
" 'phone': fields[3]\n",
" }\n",
"\n",
"def parse_pets_csv(line):\n",
" fields = line.split(',')\n",
" return {\n",
" 'id': int(fields[0]),\n",
" 'owner_id': int(fields[1]),\n",
" 'pet_name': fields[2],\n",
" 'pet_type': fields[3],\n",
" 'breed': fields[4],\n",
" 'weight': float(fields[5])\n",
" }\n",
"\n",
"# Define BigQuery schemas\n",
"owners_table_schema = {\n",
" 'fields': [\n",
" {'name': 'id', 'type': 'INTEGER', 'mode': 'REQUIRED'},\n",
" {'name': 'first_name', 'type': 'STRING', 'mode': 'NULLABLE'},\n",
" {'name': 'last_name', 'type': 'STRING', 'mode': 'NULLABLE'},\n",
" {'name': 'phone', 'type': 'STRING', 'mode': 'NULLABLE'},\n",
" ]\n",
"}\n",
"\n",
"pets_table_schema = {\n",
" 'fields': [\n",
" {'name': 'id', 'type': 'INTEGER', 'mode': 'REQUIRED'},\n",
" {'name': 'owner_id', 'type': 'INTEGER', 'mode': 'REQUIRED'},\n",
" {'name': 'pet_name', 'type': 'STRING', 'mode': 'NULLABLE'},\n",
" {'name': 'pet_type', 'type': 'STRING', 'mode': 'NULLABLE'},\n",
" {'name': 'breed', 'type': 'STRING', 'mode': 'NULLABLE'},\n",
" {'name': 'weight', 'type': 'FLOAT', 'mode': 'NULLABLE'},\n",
" ]\n",
"}\n",
"\n",
"# Define the pipeline options\n",
"options = PipelineOptions(\n",
" project=project_id,\n",
" temp_location='gs://{0}/temp'.format(bucket_name)\n",
")\n",
"\n",
"\n",
"# Create and run the pipeline\n",
"with beam.Pipeline(options=options) as p:\n",
" owners = (\n",
" p\n",
" | 'Read Owners CSV' >> beam.io.ReadFromText(owners_table, skip_header_lines=1)\n",
" | 'Parse Owners CSV' >> beam.Map(parse_owners_csv)\n",
"\n",
" )\n",
"\n",
" pets = (\n",
" p\n",
" | 'Read Pets CSV' >> beam.io.ReadFromText(pets_table, skip_header_lines=1)\n",
" | 'Parse Pets CSV' >> beam.Map(parse_pets_csv)\n",
" )\n",
" \n",
" owners | 'Write Owners to BigQuery' >> WriteToBigQuery(\n",
" f'{project_id}:{dataset_id}.owners_table',\n",
" schema=owners_table_schema,\n",
" write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,\n",
" create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED\n",
" )\n",
"\n",
" pets | 'Write Pets to BigQuery' >> WriteToBigQuery(\n",
" f'{project_id}:{dataset_id}.pets_table',\n",
" schema=pets_table_schema,\n",
" write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,\n",
" create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED\n",
" )\n",
"\n",
"\n",
"print(\"Done\")\n"
]
},
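{
"cell_type": "markdown",
"id": "added-csv-reader-md",
"metadata": {},
"source": [
"Splitting on commas breaks as soon as a field contains a quoted comma (e.g. `\"Smith, Jr.\"`). A more robust sketch uses Python's `csv` module inside the parse function. This is an assumption about how you might harden the parsers above; the generated data in this notebook never quotes fields, so the simple split works here."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "added-csv-reader-code",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"import csv\n",
"\n",
"def parse_owners_csv_robust(line):\n",
"    # csv.reader handles quoted fields and embedded commas correctly.\n",
"    fields = next(csv.reader([line]))\n",
"    return {\n",
"        'id': int(fields[0]),\n",
"        'first_name': fields[1],\n",
"        'last_name': fields[2],\n",
"        'phone': fields[3]\n",
"    }\n",
"\n",
"# Quick sanity check on a line with a quoted, embedded comma:\n",
"print(parse_owners_csv_robust('1,John,\"Smith, Jr.\",555-123-4567'))"
]
},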
{
"cell_type": "markdown",
"id": "534febd6-1321-42b9-8809-fd1539f8c648",
"metadata": {},
"source": [
"# Now, let's run a Query"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "992d6e05-51d2-4013-aa95-d94c89e898bb",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"import apache_beam as beam\n",
"from apache_beam.io.gcp.bigquery import ReadFromBigQuery\n",
"from apache_beam.options.pipeline_options import PipelineOptions\n",
"\n",
"# Define the BigQuery query\n",
"query = f\"\"\"\n",
"SELECT owners.id as owner_id, Concat(first_name,\" \", last_name) as owner, \n",
"ARRAY_AGG(STRUCT(pet_name, pet_type)) as pets\n",
"FROM `{project_id}.{dataset_id}.owners_table` owners\n",
"JOIN `{project_id}.{dataset_id}.pets_table` pets\n",
"on owners.id = pets.owner_id\n",
"GROUP BY owner_id, owner\n",
"ORDER BY owners.id;\n",
"\"\"\"\n",
"print(query)\n",
"\n",
"# Define the pipeline options\n",
"options = PipelineOptions(\n",
" project=project_id,\n",
" temp_location='gs://{0}/temp'.format(bucket_name)\n",
")\n",
"\n",
"def process_row(row):\n",
" # Process the row here if needed\n",
" print(row)\n",
" return row\n",
"\n",
"with beam.Pipeline(options=options) as p:\n",
" (\n",
" p | 'ReadFromBigQuery' >> ReadFromBigQuery(query=query,\n",
" use_standard_sql=True)\n",
" | 'ProcessRows' >> beam.Map(process_row)\n",
" )\n",
" \n",
" \n",
"print(\"done\")"
]
},
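{
"cell_type": "markdown",
"id": "added-cogbk-md",
"metadata": {},
"source": [
"The join above happens inside BigQuery. The same owner-to-pets grouping can also be done inside Beam itself with `CoGroupByKey`, the standard Beam pattern for relational joins. Below is a minimal sketch that re-reads the two CSV files and joins them in the pipeline; the key-extraction helpers are defined here just for this cell."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "added-cogbk-code",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"import apache_beam as beam\n",
"\n",
"def owner_as_kv(line):\n",
"    f = line.split(',')\n",
"    return (int(f[0]), f'{f[1]} {f[2]}')  # (owner_id, owner name)\n",
"\n",
"def pet_as_kv(line):\n",
"    f = line.split(',')\n",
"    return (int(f[1]), f[2])  # (owner_id, pet name)\n",
"\n",
"with beam.Pipeline() as p:\n",
"    owners = (\n",
"        p | 'Read Owners' >> beam.io.ReadFromText('./temp_files/owners-io.csv', skip_header_lines=1)\n",
"          | 'Key Owners' >> beam.Map(owner_as_kv)\n",
"    )\n",
"    pets = (\n",
"        p | 'Read Pets' >> beam.io.ReadFromText('./temp_files/pets-io.csv', skip_header_lines=1)\n",
"          | 'Key Pets' >> beam.Map(pet_as_kv)\n",
"    )\n",
"\n",
"    # CoGroupByKey groups both PCollections by owner_id, yielding\n",
"    # (owner_id, {'owner': [...], 'pets': [...]}) tuples.\n",
"    (\n",
"        {'owner': owners, 'pets': pets}\n",
"        | 'Join' >> beam.CoGroupByKey()\n",
"        | 'Print' >> beam.Map(print)\n",
"    )"
]
}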
],
"metadata": {
"colab": {
"name": "drehnstrom (Jul 12, 2024, 1:16:46 PM)",
"provenance": []
},
"environment": {
"kernel": "python3",
"name": ".m116",
"type": "gcloud",
"uri": "gcr.io/deeplearning-platform-release/:m116"
},
"kernelspec": {
"display_name": "Python 3 (ipykernel) (Local)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.14"
}
},
"nbformat": 4,
"nbformat_minor": 5
}