# Apache Beam IO

In [None]:
! pip install apache_beam apache-beam[gcp] --quiet

In [None]:
import IPython
from IPython.display import display

# Shut the running kernel down with restart=True, presumably so the packages
# installed by the previous cell are picked up on the next import.
IPython.Application.instance().kernel.do_shutdown(True)

# Create a File for Testing

No big deal here. Just creating a file to read from.

In [None]:
%%bash

filename="./temp_files/dogs.txt"

# Make sure the temp directory exists (-p: no error if it is already there,
# so the cell can be re-run)
mkdir -p "$(dirname "$filename")"

# Remove any previous copy of the file (-f: no error if it does not exist yet)
rm -f "$filename"

# Write the dog names to the file, one name per line
for dog in Noir Bree Gigi Gretyl Duchess Rusty
do
  echo "$dog" >> "$filename"
done

# Show what was written
cat "$filename"

# Use Beam.IO to Read From a File

In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText

filename = "./temp_files/dogs.txt"

# Read the file line by line, upper-case every line, and print the results.
with beam.Pipeline() as p:
    lines = p | 'Read' >> ReadFromText(filename)
    upper_lines = lines | 'Transform' >> beam.Map(str.upper)
    upper_lines | 'Print' >> beam.Map(print)

# Use Beam.IO to Write a File

In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText

def makeUppercase(element):
    """Return `element` converted to upper case via its ``upper()`` method."""
    shouted = element.upper()
    return shouted

filename="./temp_files/dogs.txt"
with beam.Pipeline() as p:
    (
        p | 'Read' >> ReadFromText(filename)
          | 'Transform' >> beam.Map(makeUppercase)
          | 'Write' >> WriteToText('./temp_files/uppercase-dogs.out')
    )


# Use ls to see if the file was created and
# cat to view the contents of the file.
!ls ./temp_files/uppercase-dogs.*
! cat ./temp_files/uppercase-dogs.out-00000-of-00001

# Use Beam.IO to Write to BigQuery

In [None]:
%%bash

# Make sure a Cloud Storage Bucket and a BQ Dataset are created.
# The Bucket is used for Temp files.
# The Dataset is required to create the BQ table.

project_id='dsl-dar'
dataset_id='beam_dataset'
table_id='dogs_table'
bucket_name='dataflow-temp-bucket-dar'

# Check if the BigQuery dataset exists before creating it, so re-running the
# cell does not report an "already exists" error
if bq show "$project_id:$dataset_id" >/dev/null 2>&1; then
  echo "Dataset $dataset_id already exists."
else
  # Only report success when the create command actually succeeded
  bq mk --dataset "$project_id:$dataset_id" && echo "Dataset $dataset_id created."
fi

# Check if the Cloud Storage bucket exists
if gsutil ls -b "gs://$bucket_name" >/dev/null 2>&1; then
  echo "Bucket $bucket_name already exists."
else
  # -l is gsutil's documented location flag
  gsutil mb -l US "gs://$bucket_name" && echo "Bucket $bucket_name created."
fi

## Read from text file, transform, write to BigQuery

In [None]:
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions

# Google Cloud settings used by the BigQuery cells: project, dataset, table,
# and the Cloud Storage bucket that holds temp files.
project_id, dataset_id = 'dsl-dar', 'beam_dataset'
table_id, bucket_name = 'dogs_table', 'dataflow-temp-bucket-dar'


def makeUppercase(element):
    """Wrap the upper-cased `element` in a row dict keyed by 'dog_names'.

    Note: this reuses the name of the string-returning makeUppercase defined
    in an earlier cell, so running this cell replaces that version.
    """
    row = {}
    row['dog_names'] = element.upper()
    return row

# Define the BigQuery table schema
# BigQuery schema for the destination table: a single nullable STRING column
table_schema = dict(
    fields=[
        dict(name='dog_names', type='STRING', mode='NULLABLE'),
    ]
)


filename = "./temp_files/dogs.txt"

# Pipeline options: the GCP project plus a Cloud Storage temp location
options = PipelineOptions(
    project=project_id,
    temp_location='gs://{0}/temp'.format(bucket_name),
)

# Read the names, turn each into a row dict, and load the rows into BigQuery.
with beam.Pipeline(options=options) as p:
    dog_rows = (
        p
        | 'Read' >> ReadFromText(filename)
        | 'Transform' >> beam.Map(makeUppercase)
    )
    dog_rows | 'WriteToBigQuery' >> WriteToBigQuery(
        f'{project_id}:{dataset_id}.{table_id}',
        schema=table_schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    )

# Note: options for BigQueryDisposition include:
#   write_disposition:  WRITE_APPEND, WRITE_TRUNCATE, WRITE_EMPTY
#   create_disposition: CREATE_IF_NEEDED, CREATE_NEVER

print("BigQuery Table Created")

# Use BigQuery IO Read to run a query

In [None]:
import apache_beam as beam
from apache_beam.io.gcp.bigquery import ReadFromBigQuery
from apache_beam.options.pipeline_options import PipelineOptions

# Define the BigQuery query. The table name comes from table_id (set together
# with project_id and dataset_id in an earlier cell) instead of being
# hard-coded, so the query reads the same table the write step targeted.
query = f'SELECT * FROM `{project_id}.{dataset_id}.{table_id}`;'
print(query)

# Define the pipeline options: the GCP project and a Cloud Storage temp location
options = PipelineOptions(
    project=project_id,
    temp_location='gs://{0}/temp'.format(bucket_name)
)

def process_row(row):
    """Print `row` and hand the same object back unchanged.

    This is the hook where per-row processing would go if any were needed.
    """
    print(row)
    return row

# Run the query through BigQuery and pass every result row to process_row.
with beam.Pipeline(options=options) as p:
    rows = p | 'ReadFromBigQuery' >> ReadFromBigQuery(
        query=query,
        use_standard_sql=True,
    )
    rows | 'ProcessRows' >> beam.Map(process_row)

print("Done")

# Let's make it harder using a couple of tables: Owners and their Pets.

First, create some test records for Owners and Pets.

In [None]:
%%bash

# Define the variables
owners_file_name="./temp_files/owners-io.csv"
pets_file_name="./temp_files/pets-io.csv"
num_pets=200
num_owners=100

# Make sure the output directory exists and any old copies of the files are
# removed. -p and -f keep a re-run (directory already there) and a first run
# (nothing to delete) from printing errors.
mkdir -p "$(dirname "$owners_file_name")"
rm -f "$owners_file_name" "$pets_file_name"

###########################
# Create the Owners first #
###########################

# Create the file and add the headers
echo "id,first_name,last_name,phone" > "$owners_file_name"

# Define lists of first names and last names
first_names=("John" "Jane" "Alice" "Bob" "Carol" "David" "Eva" "Frank" "Grace" "Hank")
last_names=("Smith" "Johnson" "Williams" "Brown" "Jones" "Garcia" "Miller" "Davis" "Rodriguez" "Martinez")

# Generate num_owners test records, with ids 1..num_owners
for (( i=1; i<=num_owners; i++ ))
do
  # Randomly select a first name and last name
  first_name=${first_names[$RANDOM % ${#first_names[@]}]}
  last_name=${last_names[$RANDOM % ${#last_names[@]}]}
  # Phone numbers have the form 555-XXX-XXXX, zero-padded
  phone=$(printf "555-%03d-%04d\n" $((RANDOM%1000)) $((RANDOM%10000)))

  # Add the record to the file
  echo "$i,$first_name,$last_name,$phone" >> "$owners_file_name"
done


echo "File '$owners_file_name' has been created with $num_owners test records."
head "$owners_file_name"

###########################
#    Create the Pets      #
###########################

# Create the file and add the headers
echo "id,owner_id,pet_name,pet_type,breed,weight" > "$pets_file_name"

# Define lists of pet names, pet types, and breeds
pet_names=("Noir" "Bree" "Duke" "Joy" "Gigi" "Buddy" "Bella" "Charlie" "Max" "Luna" "Rocky" "Molly" "Daisy" "Bailey" "Sadie" "Oliver" "Coco" "Lucy" "Toby" "Chloe" "Jake" "Milo" "Lola" "Jack" "Nala")
pet_types=("Dog" "Dog" "Dog" "Dog" "Cat" "Cat" "Cat" "Bird" "Fish" "Rabbit") # More dogs and cats
dog_breeds=("Labrador" "German Shepherd" "Golden Retriever" "French Bulldog" "Poodle")
cat_breeds=("Alley" "Siamese" "Maine Coon" "Persian" "Ragdoll" "Bengal")
bird_breeds=("Parakeet" "Canary" "Finch" "Cockatiel" "Lovebird")
fish_breeds=("Goldfish" "Betta" "Guppy" "Molly" "Tetra")
rabbit_breeds=("Holland Lop" "Netherland Dwarf" "Lionhead" "Flemish Giant" "Mini Rex")

# Generate num_pets test records, with ids 1..num_pets
for (( i=1; i<=num_pets; i++ ))
do
  # Randomly select a pet name, pet type, and owner.
  # owner_id falls in 1..num_owners, matching the ids written to the owners file.
  pet_name=${pet_names[$RANDOM % ${#pet_names[@]}]}
  pet_type=${pet_types[$RANDOM % ${#pet_types[@]}]}
  owner_id=$((RANDOM % num_owners + 1))

  # Pick a breed and a weight that fit the pet type
  case $pet_type in
    "Dog")
      breed=${dog_breeds[$RANDOM % ${#dog_breeds[@]}]}
      weight=$((RANDOM % 40 + 10))  # Dogs: 10 to 49 kg
      ;;
    "Cat")
      breed=${cat_breeds[$RANDOM % ${#cat_breeds[@]}]}
      weight=$((RANDOM % 8 + 3))    # Cats: 3 to 10 kg
      ;;
    "Bird")
      breed=${bird_breeds[$RANDOM % ${#bird_breeds[@]}]}
      weight=$((RANDOM % 2 + 1))    # Birds: 1 to 2 kg
      ;;
    "Fish")
      breed=${fish_breeds[$RANDOM % ${#fish_breeds[@]}]}
      weight=$((RANDOM % 2 + 1))    # Fish: 1 to 2 kg
      ;;
    "Rabbit")
      breed=${rabbit_breeds[$RANDOM % ${#rabbit_breeds[@]}]}
      weight=$((RANDOM % 5 + 1))    # Rabbits: 1 to 5 kg
      ;;
  esac

  # Add the record to the file
  echo "$i,$owner_id,$pet_name,$pet_type,$breed,$weight" >> "$pets_file_name"
done
echo "-------------------------------"
echo "File '$pets_file_name' has been created with $num_pets test records."
head "$pets_file_name"


# Read the 2 CSV tables, Parse them, and then Write them to BigQuery

In [None]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
import csv

# Paths of the two CSV input files (despite the names, these are file paths,
# not BigQuery tables)
owners_table, pets_table = (
    "./temp_files/owners-io.csv",
    "./temp_files/pets-io.csv",
)

# Define parsing functions
def parse_owners_csv(line):
    """Parse one owners CSV data line into a row dict.

    Args:
        line: A single CSV record with the columns id, first_name, last_name,
            phone, in that order (a data line, not the header).

    Returns:
        A dict with 'id' as an int and 'first_name', 'last_name' and 'phone'
        as strings.

    Raises:
        ValueError: If the id column cannot be converted to an int.
        IndexError: If the line has fewer than four columns.
    """
    # csv.reader (rather than str.split) keeps a quoted field that contains a
    # comma in one piece and strips the surrounding quotes.
    fields = next(csv.reader([line]))
    return {
        'id': int(fields[0]),
        'first_name': fields[1],
        'last_name': fields[2],
        'phone': fields[3]
    }

def parse_pets_csv(line):
    """Parse one pets CSV data line into a row dict.

    Args:
        line: A single CSV record with the columns id, owner_id, pet_name,
            pet_type, breed, weight, in that order (a data line, not the
            header).

    Returns:
        A dict with 'id' and 'owner_id' as ints, 'weight' as a float, and
        'pet_name', 'pet_type' and 'breed' as strings.

    Raises:
        ValueError: If id, owner_id or weight cannot be converted to a number.
        IndexError: If the line has fewer than six columns.
    """
    # csv.reader (rather than str.split) keeps a quoted field that contains a
    # comma in one piece and strips the surrounding quotes.
    fields = next(csv.reader([line]))
    return {
        'id': int(fields[0]),
        'owner_id': int(fields[1]),
        'pet_name': fields[2],
        'pet_type': fields[3],
        'breed': fields[4],
        'weight': float(fields[5])
    }

# Define BigQuery schemas
# BigQuery schema for the owners table, built from (name, type, mode) triples
owners_table_schema = {
    'fields': [
        {'name': column, 'type': column_type, 'mode': column_mode}
        for column, column_type, column_mode in (
            ('id', 'INTEGER', 'REQUIRED'),
            ('first_name', 'STRING', 'NULLABLE'),
            ('last_name', 'STRING', 'NULLABLE'),
            ('phone', 'STRING', 'NULLABLE'),
        )
    ]
}

# BigQuery schema for the pets table, built from (name, type, mode) triples
pets_table_schema = {
    'fields': [
        {'name': column, 'type': column_type, 'mode': column_mode}
        for column, column_type, column_mode in (
            ('id', 'INTEGER', 'REQUIRED'),
            ('owner_id', 'INTEGER', 'REQUIRED'),
            ('pet_name', 'STRING', 'NULLABLE'),
            ('pet_type', 'STRING', 'NULLABLE'),
            ('breed', 'STRING', 'NULLABLE'),
            ('weight', 'FLOAT', 'NULLABLE'),
        )
    ]
}

# Pipeline options: the GCP project and a Cloud Storage temp location
options = PipelineOptions(
    project=project_id,
    temp_location='gs://{0}/temp'.format(bucket_name),
)

# Both tables are written with the same dispositions: replace existing rows,
# and create the table if it is not there yet.
bq_write_kwargs = dict(
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
)

# Create and run the pipeline. Each CSV is read (skipping its header line),
# parsed into row dicts, and written to its own BigQuery table.
with beam.Pipeline(options=options) as p:
    owners = (
        p
        | 'Read Owners CSV' >> beam.io.ReadFromText(owners_table, skip_header_lines=1)
        | 'Parse Owners CSV' >> beam.Map(parse_owners_csv)
    )
    pets = (
        p
        | 'Read Pets CSV' >> beam.io.ReadFromText(pets_table, skip_header_lines=1)
        | 'Parse Pets CSV' >> beam.Map(parse_pets_csv)
    )

    owners | 'Write Owners to BigQuery' >> WriteToBigQuery(
        f'{project_id}:{dataset_id}.owners_table',
        schema=owners_table_schema,
        **bq_write_kwargs,
    )
    pets | 'Write Pets to BigQuery' >> WriteToBigQuery(
        f'{project_id}:{dataset_id}.pets_table',
        schema=pets_table_schema,
        **bq_write_kwargs,
    )


print("Done")


# Now, let's run a Query

In [None]:
import apache_beam as beam
from apache_beam.io.gcp.bigquery import ReadFromBigQuery
from apache_beam.options.pipeline_options import PipelineOptions

# Define the BigQuery query.
# It joins owners_table to pets_table on the owner id and, for each
# (owner_id, owner) group, aggregates the matching pets into an array of
# (pet_name, pet_type) structs.
# project_id, dataset_id and bucket_name are not defined in this cell; they
# must already have been set by an earlier cell.
query = f"""
SELECT owners.id as owner_id, Concat(first_name," ", last_name) as owner, 
ARRAY_AGG(STRUCT(pet_name, pet_type)) as pets
FROM `{project_id}.{dataset_id}.owners_table` owners
JOIN `{project_id}.{dataset_id}.pets_table` pets
on owners.id = pets.owner_id
GROUP BY owner_id, owner
ORDER BY owners.id;
"""
# Echo the rendered SQL so the substituted project and dataset are visible
print(query)

# Define the pipeline options: the GCP project and a Cloud Storage temp location
options = PipelineOptions(
    project=project_id,
    temp_location='gs://{0}/temp'.format(bucket_name)
)

def process_row(row):
    """Print `row` and hand the same object back unchanged.

    This is the hook where per-row processing would go if any were needed.
    """
    print(row)
    return row

# Run the join query through BigQuery and pass every result row to process_row.
with beam.Pipeline(options=options) as p:
    owner_rows = p | 'ReadFromBigQuery' >> ReadFromBigQuery(
        query=query,
        use_standard_sql=True,
    )
    owner_rows | 'ProcessRows' >> beam.Map(process_row)


print("done")