# Apache Beam SQL

In [None]:
! pip install apache_beam apache-beam[gcp] apache-beam[interactive] --quiet

In [None]:
import IPython
from IPython.display import display

# Ask the running kernel to shut down with restart=True, so packages installed
# in the previous cell are picked up by a fresh interpreter.
kernel = IPython.Application.instance().kernel
kernel.do_shutdown(True)

In [None]:
# Enable the Beam magics
from apache_beam.runners.interactive.sql.beam_sql_magics import BeamSqlMagics
from IPython import get_ipython

# Register the magics class on the active shell; this is what makes the
# `%%beam_sql` cells further down usable.
get_ipython().register_magics(BeamSqlMagics)

# Create test records

Write owners and pets into CSV files.

In [None]:
%%bash

# Define the variables
owners_file_name="./temp_files/owners-io.csv"
pets_file_name="./temp_files/pets-io.csv"
num_pets=200
num_owners=100

# Make sure the output directory exists; otherwise the redirects that create
# the CSV files fail on a fresh checkout
mkdir -p "$(dirname "$owners_file_name")" "$(dirname "$pets_file_name")"

# Make sure files don't exist (-f: no error when there is nothing to remove)
rm -f "$owners_file_name" "$pets_file_name"

###########################
# Create the Owners first #
###########################

# Create the file and add the headers
echo "id,first_name,last_name,phone" > "$owners_file_name"

# Define lists of first names and last names
first_names=("John" "Jane" "Alice" "Bob" "Carol" "David" "Eva" "Frank" "Grace" "Hank")
last_names=("Smith" "Johnson" "Williams" "Brown" "Jones" "Garcia" "Miller" "Davis" "Rodriguez" "Martinez")

# Generate $num_owners test records; the loop counter doubles as the owner id
for (( i=1; i<=num_owners; i++ ))
do
  # Randomly select a first name and last name
  first_name=${first_names[$RANDOM % ${#first_names[@]}]}
  last_name=${last_names[$RANDOM % ${#last_names[@]}]}
  # Phone numbers have the shape 555-NNN-NNNN with zero-padded random digits
  phone=$(printf "555-%03d-%04d\n" $((RANDOM%1000)) $((RANDOM%10000)))

  # Add the record to the file
  echo "$i,$first_name,$last_name,$phone" >> "$owners_file_name"
done


echo "File '$owners_file_name' has been created with $num_owners test records."
head "$owners_file_name"

###########################
#    Create the Pets      #
###########################

# Create the file and add the headers
echo "id,owner_id,pet_name,pet_type,breed,weight" > "$pets_file_name"

# Define lists of pet names, pet types, and breeds
pet_names=("Noir" "Bree" "Duke" "Joy" "Gigi" "Buddy" "Bella" "Charlie" "Max" "Luna" "Rocky" "Molly" "Daisy" "Bailey" "Sadie" "Oliver" "Coco" "Lucy" "Toby" "Chloe" "Jake" "Milo" "Lola" "Jack" "Nala")
pet_types=("Dog" "Dog" "Dog" "Dog" "Cat" "Cat" "Cat" "Bird" "Fish" "Rabbit") # More dogs and cats
dog_breeds=("Labrador" "German Shepherd" "Golden Retriever" "French Bulldog" "Poodle")
cat_breeds=("Alley" "Siamese" "Maine Coon" "Persian" "Ragdoll" "Bengal")
bird_breeds=("Parakeet" "Canary" "Finch" "Cockatiel" "Lovebird")
fish_breeds=("Goldfish" "Betta" "Guppy" "Molly" "Tetra")
rabbit_breeds=("Holland Lop" "Netherland Dwarf" "Lionhead" "Flemish Giant" "Mini Rex")

# Generate $num_pets test records; the loop counter doubles as the pet id
for (( i=1; i<=num_pets; i++ ))
do
  # Randomly select a pet name, pet type and an owner id in 1..num_owners
  pet_name=${pet_names[$RANDOM % ${#pet_names[@]}]}
  pet_type=${pet_types[$RANDOM % ${#pet_types[@]}]}
  owner_id=$((RANDOM % num_owners + 1))

  # Breed list and weight range depend on the pet type
  case $pet_type in
    "Dog")
      breed=${dog_breeds[$RANDOM % ${#dog_breeds[@]}]}
      weight=$((RANDOM % 40 + 10))  # Dogs: 10 to 49 kg
      ;;
    "Cat")
      breed=${cat_breeds[$RANDOM % ${#cat_breeds[@]}]}
      weight=$((RANDOM % 8 + 3))    # Cats: 3 to 10 kg
      ;;
    "Bird")
      breed=${bird_breeds[$RANDOM % ${#bird_breeds[@]}]}
      weight=$((RANDOM % 2 + 1))    # Birds: 1 to 2 kg
      ;;
    "Fish")
      breed=${fish_breeds[$RANDOM % ${#fish_breeds[@]}]}
      weight=$((RANDOM % 2 + 1))    # Fish: 1 to 2 kg
      ;;
    "Rabbit")
      breed=${rabbit_breeds[$RANDOM % ${#rabbit_breeds[@]}]}
      weight=$((RANDOM % 5 + 1))    # Rabbits: 1 to 5 kg
      ;;
  esac

  # Add the record to the file
  echo "$i,$owner_id,$pet_name,$pet_type,$breed,$weight" >> "$pets_file_name"
done
echo "-------------------------------"
echo "File '$pets_file_name' has been created with $num_pets test records."
head "$pets_file_name"

# Add the Schemas and Parsers

In [None]:
import apache_beam as beam
from typing import NamedTuple
from apache_beam import coders
import csv

class Pet(NamedTuple):
    """Schema for one record of the pets CSV file.

    Field order matches the CSV header written by the generator cell:
    ``id,owner_id,pet_name,pet_type,breed,weight``.
    """
    id: int  # sequential record number assigned by the generator loop
    owner_id: int  # id of the owning Owner record
    pet_name: str
    pet_type: str  # one of Dog, Cat, Bird, Fish, Rabbit in the generated data
    breed: str
    weight: int  # whole number; the generator's comments describe it as kg
    
class Owner(NamedTuple):
    """Schema for one record of the owners CSV file.

    Field order matches the CSV header written by the generator cell:
    ``id,first_name,last_name,phone``.
    """
    id: int  # sequential record number assigned by the generator loop
    first_name: str
    last_name: str
    phone: str  # generated as 555-NNN-NNNN


# Associate both schema classes with RowCoder in Beam's coder registry.
for schema_cls in (Pet, Owner):
    beam.coders.registry.register_coder(schema_cls, coders.RowCoder)

def parse_pets_row(row):
    """Turn one line of the pets CSV into a ``Pet``.

    Args:
        row: A single CSV line (header already skipped by the reader).

    Returns:
        A ``Pet`` built from the first six columns, with ``id``, ``owner_id``
        and ``weight`` converted to ``int``; ``None`` if the CSV reader
        produces no record for the line.
    """
    # csv.reader handles quoting; a one-element list yields at most one record.
    fields = next(csv.reader([row]), None)
    if fields is None:
        return None
    return Pet(
        int(fields[0]),
        int(fields[1]),
        fields[2],
        fields[3],
        fields[4],
        int(fields[5]),
    )
    
def parse_owners_row(row):
    """Turn one line of the owners CSV into an ``Owner``.

    Args:
        row: A single CSV line (header already skipped by the reader).

    Returns:
        An ``Owner`` built from the first four columns, with ``id`` converted
        to ``int``; ``None`` if the CSV reader produces no record for the line.
    """
    # csv.reader handles quoting; a one-element list yields at most one record.
    fields = next(csv.reader([row]), None)
    if fields is None:
        return None
    return Owner(int(fields[0]), fields[1], fields[2], fields[3])

# Query just the Pets for now

In [None]:
import csv
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.runners.interactive import interactive_beam as ib
from typing import NamedTuple


# Labelled transforms: read the pets CSV (header skipped), then parse each
# line into a schema-typed Pet.
read_pet_lines = 'Read CSV File' >> beam.io.ReadFromText('./temp_files/pets-io.csv', skip_header_lines=1)
parse_pet_rows = 'Parse CSV Rows' >> beam.Map(parse_pets_row).with_output_types(Pet)

# Build the pipeline on the InteractiveRunner and attach the transforms
pipeline = beam.Pipeline(runner=InteractiveRunner())
pets = pipeline | read_pet_lines | parse_pet_rows

# Display the contents of the PCollection in the notebook
ib.show(pets)

# Run a query to get the dogs using the `%%beam_sql` cell magic

In [None]:
%%beam_sql -o query_results_dogs
SELECT
    *
FROM
    pets
WHERE
    pet_type = 'Dog'


# ib.collect() will convert the Query Results to a Pandas DataFrame

Below, info() and to_csv() are DataFrame methods

In [None]:
# Convert the query results to a pandas DataFrame
dogs = ib.collect(query_results_dogs)

# Show information about the data
dogs.info()

# Write the data to a file. The return value is deliberately NOT bound to a
# variable named `csv`: that would replace the imported csv module in the
# notebook namespace and break parse_pets_row / parse_owners_row, which look
# up `csv.reader` when later pipelines run.
dogs.to_csv("./temp_files/dogs-results.csv")

# Another query: Count the pets per owner

In [None]:
%%beam_sql -o query_results_owner_pet_count
SELECT
    owner_id,
    COUNT(id) AS pet_count
FROM
    pets
GROUP BY
    owner_id

In [None]:
def convert_to_csv(row):
    """Format a pet-count row as the text line ``"<owner_id>, <pet_count>"``.

    Args:
        row: Any object exposing ``owner_id`` and ``pet_count`` attributes.

    Returns:
        The two values joined by a comma and a space.
    """
    return "{}, {}".format(row.owner_id, row.pet_count)

def print_and_return(row):
    """Print ``row`` and hand it back unchanged.

    Returning the element lets this be dropped into a pipeline as a
    pass-through step that only adds console output.
    """
    print(row)
    return row

print(query_results_owner_pet_count)

# Labelled steps appended to the SQL result: format each row as a CSV line,
# echo it, then write the lines under the given file path prefix.
to_csv_line = "Convert" >> beam.Map(convert_to_csv)
echo_line = "Print" >> beam.Map(print_and_return)
write_lines = "Write to File" >> beam.io.textio.WriteToText('./temp_files/query_results_pet_count')

query_results_owner_pet_count | to_csv_line | echo_line | write_lines

# Run the pipeline that owns the query result and block until it finishes
query_results_owner_pet_count.pipeline.run().wait_until_finish()

# Run queries using the Beam SqlTransform

At this point, just query a single table

In [None]:
import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform


# Build and run a pipeline (it runs when the `with` block exits) that applies
# two independent SQL queries to the same parsed pets PCollection.
with beam.Pipeline() as pipeline:
    # Read the pets CSV (header skipped) and parse each line into a Pet
    pets = (
        pipeline
        | 'Read CSV File' >> beam.io.ReadFromText('./temp_files/pets-io.csv', skip_header_lines=1)
        | 'Parse CSV Rows' >> beam.Map(parse_pets_row).with_output_types(Pet)
    )

    # With a single input, SqlTransform exposes it as the table PCOLLECTION;
    # `pets` in the query is only an alias for it.
    # The step label now names what the query selects (fish, not dogs).
    fish = (
        pets
        | 'Filter Fish with SQL' >> SqlTransform(
            """
            SELECT owner_id, pet_name
            FROM PCOLLECTION pets
            WHERE pet_type = 'Fish'
            """
        )
        | 'Print fish' >> beam.Map(print)
    )

    # Second branch off the same input: number of pets per owner id
    pet_counts = (
        pets
        | 'Count Pets by Owner' >> SqlTransform(
            """
            SELECT owner_id, count(id) as pet_count
            FROM PCOLLECTION pets
            GROUP BY owner_id
            """
        )
        | 'Print pet counts' >> beam.Map(print)
    )

## Join two tables

In [None]:
import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform


def format_result(result):
    """Format a joined row as ``"<first_name>, <last_name>, <pet_count>"``.

    Args:
        result: Any object exposing ``first_name``, ``last_name`` and
            ``pet_count`` attributes.

    Returns:
        The three values joined by a comma and a space.
    """
    return "{}, {}, {}".format(result.first_name, result.last_name, result.pet_count)


# Build and run a pipeline (it runs when the `with` block exits) that joins
# owners with pets in SQL and writes one formatted line per owner.
with beam.Pipeline() as p:
    # Read and parse the pets CSV (header skipped)
    pets = (
        p
        | 'ReadPets' >> beam.io.ReadFromText('./temp_files/pets-io.csv', skip_header_lines=1)
        | 'ParsePets' >> beam.Map(parse_pets_row).with_output_types(Pet)
    )

    # Read and parse the owners CSV (header skipped)
    owners = (
        p
        | 'ReadOwners' >> beam.io.ReadFromText('./temp_files/owners-io.csv', skip_header_lines=1)
        | 'ParseOwners' >> beam.Map(parse_owners_row).with_output_types(Owner)
    )

    # Define the SQL query.
    # owners.id is part of the GROUP BY so that distinct owners who share a
    # first and last name are counted separately. The generated data draws
    # names from 10 x 10 combinations, so grouping by name alone would merge
    # different owners into a single row.
    query = '''
    SELECT
      owners.first_name as first_name,
      owners.last_name as last_name,
      COUNT(pets.id) AS pet_count
    FROM
      owners
    JOIN
      pets
    ON
      owners.id = pets.owner_id
    GROUP BY
      owners.id,
      owners.first_name,
      owners.last_name
    '''

    # The dict keys are the table names the query refers to
    results = (
        {'owners': owners, 'pets': pets}
        | 'JoinAndCount' >> SqlTransform(query)
    )

    # Format each result row as text and write it under the given path prefix
    formatted_results = (
        results | 'format' >> beam.Map(format_result)
                | 'WriteOutput' >> beam.io.WriteToText('./temp_files/joined-output.txt')
    )