# Apache Beam Dataflow Runner

In [None]:
! pip install apache_beam apache-beam[gcp] --quiet

In [None]:
import IPython
from IPython.display import display

# Ask the running kernel to shut itself down so the packages installed by the
# previous cell are picked up on the next import. The True argument is
# presumably the "restart" flag — confirm against the ipykernel documentation.
app = IPython.Application.instance()
app.kernel.do_shutdown(True)

# Set Required Variables

In [None]:
from datetime import datetime

# Project-specific settings: change these to match your own environment.
project_id = 'dsl-dar'
dataset_id = 'beam_dataset'
table_id = 'pets_output_table2'
bucket_name = 'dataflow-temp-bucket-dar'
region = "us-central1"

# Derive the input path from bucket_name so that changing the bucket in one
# place cannot leave the pipeline reading from a different bucket.
filename = f'gs://{bucket_name}/input/pets.csv'

# Create the unique job name by appending the timestamp
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
job_name = f"storage-to-bq-{timestamp}"

# Simple Pipeline: Read, Parse, Write to BQ

In [None]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
import csv

def parse_pets_csv(line):
    """Convert one CSV data line from the pets file into a BigQuery row dict.

    The line is parsed with the csv module rather than ``str.split(',')`` so
    that quoted fields containing commas (for example ``"Rex, Jr."``) stay in
    a single column.

    Args:
        line: One data row with six columns in the order
            id, owner_id, pet_name, pet_type, breed, weight.

    Returns:
        A dict keyed by column name, with ``id`` and ``owner_id`` converted to
        int and ``weight`` converted to float.

    Raises:
        ValueError: If id, owner_id or weight cannot be converted to a number.
        IndexError: If the line has fewer than six columns.
    """
    # Function-local import keeps the function self-contained if Beam
    # serializes it and runs it away from this notebook's globals.
    import csv

    fields = next(csv.reader([line]))
    return {
        'id': int(fields[0]),
        'owner_id': int(fields[1]),
        'pet_name': fields[2],
        'pet_type': fields[3],
        'breed': fields[4],
        'weight': float(fields[5])
    }


# BigQuery schema for the destination table, built from
# (column name, BigQuery type, mode) triples. The two id columns are REQUIRED;
# every other column is NULLABLE.
pets_table_schema = {
    'fields': [
        {'name': column, 'type': bq_type, 'mode': mode}
        for column, bq_type, mode in (
            ('id', 'INTEGER', 'REQUIRED'),
            ('owner_id', 'INTEGER', 'REQUIRED'),
            ('pet_name', 'STRING', 'NULLABLE'),
            ('pet_type', 'STRING', 'NULLABLE'),
            ('breed', 'STRING', 'NULLABLE'),
            ('weight', 'FLOAT', 'NULLABLE'),
        )
    ]
}

# Pipeline options for this run: only the project and a GCS temp location are
# supplied, and no runner is set explicitly.
options = PipelineOptions(
    project=project_id,
    temp_location=f'gs://{bucket_name}/temp',
)

# Build the pipeline inside a context manager: read the CSV (skipping its
# header line), parse each line into a row dict, and write the rows to BigQuery.
with beam.Pipeline(options=options) as p:
    pets = (
        p
        | 'Read Pets CSV' >> beam.io.ReadFromText(filename, skip_header_lines=1)
        | 'Parse Pets CSV' >> beam.Map(parse_pets_csv)
    )

    # The destination table is created if it is missing and its existing
    # contents are replaced on every run.
    pets | 'Write Pets to BigQuery' >> WriteToBigQuery(
        table=f'{project_id}:{dataset_id}.{table_id}',
        schema=pets_table_schema,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    )

print("Done")

# The Above Pipeline with Changes Required to Run in Dataflow

The pipeline steps are unchanged; only the pipeline options differ. They now select the Dataflow runner and set the job name, region, and staging and temp locations.

In the Google Cloud Console, go to the Dataflow service to monitor your job. 

In [None]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
import csv

def parse_pets_csv(line):
    """Convert one CSV data line from the pets file into a BigQuery row dict.

    The line is parsed with the csv module rather than ``str.split(',')`` so
    that quoted fields containing commas (for example ``"Rex, Jr."``) stay in
    a single column.

    Args:
        line: One data row with six columns in the order
            id, owner_id, pet_name, pet_type, breed, weight.

    Returns:
        A dict keyed by column name, with ``id`` and ``owner_id`` converted to
        int and ``weight`` converted to float.

    Raises:
        ValueError: If id, owner_id or weight cannot be converted to a number.
        IndexError: If the line has fewer than six columns.
    """
    # Function-local import: this function is shipped to remote workers, so it
    # must not rely on modules imported at notebook level being available there.
    import csv

    fields = next(csv.reader([line]))
    return {
        'id': int(fields[0]),
        'owner_id': int(fields[1]),
        'pet_name': fields[2],
        'pet_type': fields[3],
        'breed': fields[4],
        'weight': float(fields[5])
    }


# BigQuery schema for the destination table, built from
# (column name, BigQuery type, mode) triples. The two id columns are REQUIRED;
# every other column is NULLABLE.
pets_table_schema = {
    'fields': [
        {'name': column, 'type': bq_type, 'mode': mode}
        for column, bq_type, mode in (
            ('id', 'INTEGER', 'REQUIRED'),
            ('owner_id', 'INTEGER', 'REQUIRED'),
            ('pet_name', 'STRING', 'NULLABLE'),
            ('pet_type', 'STRING', 'NULLABLE'),
            ('breed', 'STRING', 'NULLABLE'),
            ('weight', 'FLOAT', 'NULLABLE'),
        )
    ]
}

# Start from empty options and select the Dataflow runner.
options = PipelineOptions()
options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner'

# Google Cloud settings for the job: project, region, job name, and the GCS
# locations used for temporary and staged files.
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = project_id
google_cloud_options.region = region
google_cloud_options.job_name = job_name
google_cloud_options.temp_location = f'gs://{bucket_name}/temp'
google_cloud_options.staging_location = f'gs://{bucket_name}/staging'

# Same three steps as the local pipeline: read the CSV (skipping its header
# line), parse each line into a row dict, and write the rows to BigQuery.
with beam.Pipeline(options=options) as p:
    pets = (
        p
        | 'Read Pets CSV' >> beam.io.ReadFromText(filename, skip_header_lines=1)
        | 'Parse Pets CSV' >> beam.Map(parse_pets_csv)
    )

    # The destination table is created if it is missing and its existing
    # contents are replaced on every run.
    pets | 'Write Pets to BigQuery' >> WriteToBigQuery(
        table=f'{project_id}:{dataset_id}.{table_id}',
        schema=pets_table_schema,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    )


# Run the following to create a file main.py with the above Pipeline code

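You might want to run the pipeline as a standalone Python script instead of from a notebook. The cell below writes a version of the pipeline above to a file, main.py. Instead of notebook variables, the script takes the project, bucket, input file, dataset, table, and region as command-line arguments.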

In [None]:
# Source of the standalone script. It is the Dataflow pipeline from the cell
# above, with the notebook variables replaced by required command-line
# arguments. The text is kept in a string and written to main.py below.
code = """
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
import argparse
from datetime import datetime

def parse_pets_csv(line):
    # Parse with the csv module instead of str.split(',') so quoted fields
    # that contain commas (for example "Rex, Jr.") stay in a single column.
    # The import is local so the function stays self-contained when it is
    # serialized and run on remote workers.
    import csv

    fields = next(csv.reader([line]))
    return {
        'id': int(fields[0]),
        'owner_id': int(fields[1]),
        'pet_name': fields[2],
        'pet_type': fields[3],
        'breed': fields[4],
        'weight': float(fields[5])
    }

pets_table_schema = {
    'fields': [
        {'name': 'id', 'type': 'INTEGER', 'mode': 'REQUIRED'},
        {'name': 'owner_id', 'type': 'INTEGER', 'mode': 'REQUIRED'},
        {'name': 'pet_name', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'pet_type', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'breed', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'weight', 'type': 'FLOAT', 'mode': 'NULLABLE'},
    ]
}

def run(argv=None):
    parser = argparse.ArgumentParser()

    parser.add_argument(
        '--project_id',
        required=True,
        help='Google Cloud project ID'
    )
    parser.add_argument(
        '--bucket_name',
        required=True,
        help='Google Cloud Storage bucket name'
    )
    parser.add_argument(
        '--filename',
        required=True,
        help='GCS path to the CSV file'
    )
    parser.add_argument(
        '--dataset_id',
        required=True,
        help='BigQuery dataset ID'
    )
    parser.add_argument(
        '--table_id',
        required=True,
        help='BigQuery table ID'
    )
    parser.add_argument(
        '--region',
        required=True,
        help='Google Cloud region'
    )

    args, beam_args = parser.parse_known_args(argv)

    # Get the current timestamp and format it
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    # Create the unique job name by appending the timestamp
    job_name = f"storage-to-bq-{timestamp}"

    # Define the pipeline options
    options = PipelineOptions(beam_args)
    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = args.project_id
    google_cloud_options.job_name = job_name
    google_cloud_options.staging_location = f'gs://{args.bucket_name}/staging'
    google_cloud_options.temp_location = f'gs://{args.bucket_name}/temp'
    google_cloud_options.region = args.region
    options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner'

    # Create and run the pipeline
    with beam.Pipeline(options=options) as p:
        pets = (
            p
            | 'Read Pets CSV' >> beam.io.ReadFromText(args.filename, skip_header_lines=1)
            | 'Parse Pets CSV' >> beam.Map(parse_pets_csv)
        )
        
        pets | 'Write Pets to BigQuery' >> WriteToBigQuery(
            f'{args.project_id}:{args.dataset_id}.{args.table_id}',
            schema=pets_table_schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
        )

if __name__ == '__main__':
    run()
"""

# Write the code to a file named main.py
with open('main.py', 'w') as f:
    f.write(code)


# Run main.py passing in the appropriate arguments

The example below runs main.py with its required arguments. Change the values to match your own project, bucket, input file, BigQuery dataset and table, and region.

In the Google Cloud Console, go to the Dataflow service to monitor your job. 

In [None]:
%%bash

# Launch the generated script; each flag below is declared as required by
# main.py's argument parser.
python main.py \
    --project_id=dsl-dar \
    --bucket_name=dataflow-temp-bucket-dar \
    --filename=gs://dataflow-temp-bucket-dar/input/pets.csv \
    --dataset_id=beam_dataset \
    --table_id=pets_output_table3 \
    --region=us-central1