courses/DSL/dataflow-examples/apache-beam-dataflow-runner.ipynb
{
"cells": [
{
"cell_type": "markdown",
"id": "6432b24f-1fef-46c3-ad5a-78120cbdbd16",
"metadata": {
"tags": []
},
"source": [
"# Apache Beam Dataflow Runner"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2ce0025a-52ff-4623-b34d-cb952409dd8a",
"metadata": {
"id": "pnRH5srPn8LLdiGS2xfTN9Lz",
"tags": []
},
"outputs": [],
"source": [
"! pip install apache_beam apache-beam[gcp] --quiet"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "db748828-a38a-47b5-8e35-7e2974854718",
"metadata": {
"id": "cRs_CxwZAhxC",
"tags": []
},
"outputs": [],
"source": [
"import IPython\n",
"from IPython.display import display\n",
"\n",
"app = IPython.Application.instance()\n",
"app.kernel.do_shutdown(True)"
]
},
{
"cell_type": "markdown",
"id": "6b7adae2-155f-4c67-b25c-6f9b865ac781",
"metadata": {
"id": "mJlIq_qYAzfF",
"tags": []
},
"source": [
"# Set Required Variables"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c3d53b0b-7158-44ea-944c-f4f1172b7f19",
"metadata": {
"id": "LG4z8a_zCWto",
"tags": []
},
"outputs": [],
"source": [
"from datetime import datetime\n",
"\n",
"project_id='dsl-dar'\n",
"dataset_id='beam_dataset'\n",
"table_id='pets_output_table2'\n",
"bucket_name='dataflow-temp-bucket-dar'\n",
"filename = 'gs://dataflow-temp-bucket-dar/input/pets.csv'\n",
"region = \"us-central1\"\n",
"\n",
"# Create the unique job name by appending the timestamp\n",
"timestamp = datetime.now().strftime(\"%Y%m%d%H%M%S\")\n",
"job_name = f\"storage-to-bq-{timestamp}\""
]
},
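{
"cell_type": "markdown",
"id": "create-input-bucket-md",
"metadata": {
"tags": []
},
"source": [
"# Optional: Create the Bucket and a Sample Input File\n",
"\n",
"The pipeline below expects a CSV file at the path in `filename`. If the bucket or file does not exist yet, the next cell is a minimal sketch that creates them: it writes a few hypothetical sample rows matching the column layout used later (`id,owner_id,pet_name,pet_type,breed,weight`), creates the bucket, and uploads the file. Skip it if your input already exists."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "create-input-bucket-code",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# Only run this if the bucket and input file do not already exist.\n",
"# The rows below are hypothetical sample data matching the expected CSV layout.\n",
"sample_csv = \"\"\"id,owner_id,pet_name,pet_type,breed,weight\n",
"1,101,Rex,dog,labrador,29.5\n",
"2,102,Whiskers,cat,siamese,4.2\n",
"3,101,Bubbles,fish,goldfish,0.1\n",
"\"\"\"\n",
"\n",
"with open('pets.csv', 'w') as f:\n",
"    f.write(sample_csv)\n",
"\n",
"# Create the bucket (this errors harmlessly if it already exists) and upload the file\n",
"! gsutil mb -l {region} gs://{bucket_name}\n",
"! gsutil cp pets.csv {filename}"
]
},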
{
"cell_type": "markdown",
"id": "88825db3-cbaa-4d20-811a-ee8c8a584676",
"metadata": {},
"source": [
"# Simple Pipeline: Read, Parse, Write to BQ"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f642fc46-607a-43b4-80ef-58b9f0efcd0b",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"import apache_beam as beam\n",
"from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions\n",
"from apache_beam.io.gcp.bigquery import WriteToBigQuery\n",
"import csv\n",
"\n",
"def parse_pets_csv(line):\n",
" fields = line.split(',')\n",
" return {\n",
" 'id': int(fields[0]),\n",
" 'owner_id': int(fields[1]),\n",
" 'pet_name': fields[2],\n",
" 'pet_type': fields[3],\n",
" 'breed': fields[4],\n",
" 'weight': float(fields[5])\n",
" }\n",
"\n",
"\n",
"pets_table_schema = {\n",
" 'fields': [\n",
" {'name': 'id', 'type': 'INTEGER', 'mode': 'REQUIRED'},\n",
" {'name': 'owner_id', 'type': 'INTEGER', 'mode': 'REQUIRED'},\n",
" {'name': 'pet_name', 'type': 'STRING', 'mode': 'NULLABLE'},\n",
" {'name': 'pet_type', 'type': 'STRING', 'mode': 'NULLABLE'},\n",
" {'name': 'breed', 'type': 'STRING', 'mode': 'NULLABLE'},\n",
" {'name': 'weight', 'type': 'FLOAT', 'mode': 'NULLABLE'},\n",
" ]\n",
"}\n",
"\n",
"# Define the pipeline options\n",
"options = PipelineOptions(\n",
" project=project_id,\n",
" temp_location='gs://{0}/temp'.format(bucket_name)\n",
")\n",
"\n",
"\n",
"# Create and run the pipeline\n",
"with beam.Pipeline(options=options) as p:\n",
" \n",
" pets = (\n",
" p\n",
" | 'Read Pets CSV' >> beam.io.ReadFromText(filename, skip_header_lines=1)\n",
" | 'Parse Pets CSV' >> beam.Map(parse_pets_csv)\n",
" )\n",
" \n",
"\n",
" pets | 'Write Pets to BigQuery' >> WriteToBigQuery(\n",
" f'{project_id}:{dataset_id}.{table_id}',\n",
" schema=pets_table_schema,\n",
" write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,\n",
" create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED\n",
" )\n",
"\n",
"\n",
"print(\"Done\")"
]
},
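{
"cell_type": "markdown",
"id": "verify-bq-output-md",
"metadata": {
"tags": []
},
"source": [
"# Optional: Verify the Output in BigQuery\n",
"\n",
"A minimal sketch to confirm the load, assuming the google-cloud-bigquery client library is available in this environment (it is installed as a dependency of apache-beam[gcp]): query the destination table and print a few rows."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "verify-bq-output-code",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"from google.cloud import bigquery\n",
"\n",
"# Read back a few rows from the table the pipeline just wrote\n",
"client = bigquery.Client(project=project_id)\n",
"query = f\"SELECT * FROM `{project_id}.{dataset_id}.{table_id}` LIMIT 10\"\n",
"for row in client.query(query).result():\n",
"    print(dict(row))"
]
},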
{
"cell_type": "markdown",
"id": "b95d4817-ef10-47ad-b6b7-16b36271a16b",
"metadata": {},
"source": [
"# The Above Pipeline with Changes Required to Run in Dataflow\n",
"\n",
"You are just passing the required options to the Pipeline to use the Dataflow Runner. \n",
"\n",
"In the Google Cloud Console, go to the Dataflow service to monitor your job. "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "63cf156d-357e-4645-bd08-370afc22cd00",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"import apache_beam as beam\n",
"from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions\n",
"from apache_beam.io.gcp.bigquery import WriteToBigQuery\n",
"import csv\n",
"\n",
"def parse_pets_csv(line):\n",
" fields = line.split(',')\n",
" return {\n",
" 'id': int(fields[0]),\n",
" 'owner_id': int(fields[1]),\n",
" 'pet_name': fields[2],\n",
" 'pet_type': fields[3],\n",
" 'breed': fields[4],\n",
" 'weight': float(fields[5])\n",
" }\n",
"\n",
"\n",
"pets_table_schema = {\n",
" 'fields': [\n",
" {'name': 'id', 'type': 'INTEGER', 'mode': 'REQUIRED'},\n",
" {'name': 'owner_id', 'type': 'INTEGER', 'mode': 'REQUIRED'},\n",
" {'name': 'pet_name', 'type': 'STRING', 'mode': 'NULLABLE'},\n",
" {'name': 'pet_type', 'type': 'STRING', 'mode': 'NULLABLE'},\n",
" {'name': 'breed', 'type': 'STRING', 'mode': 'NULLABLE'},\n",
" {'name': 'weight', 'type': 'FLOAT', 'mode': 'NULLABLE'},\n",
" ]\n",
"}\n",
"\n",
"# Define the pipeline options\n",
"options = PipelineOptions()\n",
"google_cloud_options = options.view_as(GoogleCloudOptions)\n",
"google_cloud_options.project = project_id\n",
"google_cloud_options.job_name = job_name\n",
"google_cloud_options.staging_location = f'gs://{bucket_name}/staging'\n",
"google_cloud_options.temp_location = f'gs://{bucket_name}/temp'\n",
"google_cloud_options.region = region \n",
"options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner'\n",
"\n",
"# Create and run the pipeline\n",
"with beam.Pipeline(options=options) as p:\n",
" \n",
" pets = (\n",
" p\n",
" | 'Read Pets CSV' >> beam.io.ReadFromText(filename, skip_header_lines=1)\n",
" | 'Parse Pets CSV' >> beam.Map(parse_pets_csv)\n",
" )\n",
" \n",
" pets | 'Write Pets to BigQuery' >> WriteToBigQuery(\n",
" f'{project_id}:{dataset_id}.{table_id}',\n",
" schema=pets_table_schema,\n",
" write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,\n",
" create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED\n",
" )\n"
]
},
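{
"cell_type": "markdown",
"id": "list-dataflow-jobs-md",
"metadata": {
"tags": []
},
"source": [
"# Optional: List Recent Dataflow Jobs from the Notebook\n",
"\n",
"Besides the Cloud Console, you can check on the job from here with the gcloud CLI. A minimal sketch, assuming gcloud is installed and authenticated in this environment:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "list-dataflow-jobs-code",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# Show the most recent Dataflow jobs in the region used above\n",
"! gcloud dataflow jobs list --region={region} --limit=5"
]
},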
{
"cell_type": "markdown",
"id": "c477b0cb-dabd-4ffc-8575-d6c4de40342a",
"metadata": {},
"source": [
"# Run the following to create a file main.py with the above Pipeline code\n",
"\n",
"You might want to run the code as a Python program from a script. Below, the code above is written to a file: main.py"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e7ff30e8-f22a-45d7-a21a-e263d5265fc5",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"code = \"\"\"\n",
"import apache_beam as beam\n",
"from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions\n",
"from apache_beam.io.gcp.bigquery import WriteToBigQuery\n",
"import argparse\n",
"from datetime import datetime\n",
"\n",
"def parse_pets_csv(line):\n",
" fields = line.split(',')\n",
" return {\n",
" 'id': int(fields[0]),\n",
" 'owner_id': int(fields[1]),\n",
" 'pet_name': fields[2],\n",
" 'pet_type': fields[3],\n",
" 'breed': fields[4],\n",
" 'weight': float(fields[5])\n",
" }\n",
"\n",
"pets_table_schema = {\n",
" 'fields': [\n",
" {'name': 'id', 'type': 'INTEGER', 'mode': 'REQUIRED'},\n",
" {'name': 'owner_id', 'type': 'INTEGER', 'mode': 'REQUIRED'},\n",
" {'name': 'pet_name', 'type': 'STRING', 'mode': 'NULLABLE'},\n",
" {'name': 'pet_type', 'type': 'STRING', 'mode': 'NULLABLE'},\n",
" {'name': 'breed', 'type': 'STRING', 'mode': 'NULLABLE'},\n",
" {'name': 'weight', 'type': 'FLOAT', 'mode': 'NULLABLE'},\n",
" ]\n",
"}\n",
"\n",
"def run(argv=None):\n",
" parser = argparse.ArgumentParser()\n",
"\n",
" parser.add_argument(\n",
" '--project_id',\n",
" required=True,\n",
" help='Google Cloud project ID'\n",
" )\n",
" parser.add_argument(\n",
" '--bucket_name',\n",
" required=True,\n",
" help='Google Cloud Storage bucket name'\n",
" )\n",
" parser.add_argument(\n",
" '--filename',\n",
" required=True,\n",
" help='GCS path to the CSV file'\n",
" )\n",
" parser.add_argument(\n",
" '--dataset_id',\n",
" required=True,\n",
" help='BigQuery dataset ID'\n",
" )\n",
" parser.add_argument(\n",
" '--table_id',\n",
" required=True,\n",
" help='BigQuery table ID'\n",
" )\n",
" parser.add_argument(\n",
" '--region',\n",
" required=True,\n",
" help='Google Cloud region'\n",
" )\n",
"\n",
" args, beam_args = parser.parse_known_args(argv)\n",
"\n",
" # Get the current timestamp and format it\n",
" timestamp = datetime.now().strftime(\"%Y%m%d%H%M%S\")\n",
" # Create the unique job name by appending the timestamp\n",
" job_name = f\"storage-to-bq-{timestamp}\"\n",
"\n",
" # Define the pipeline options\n",
" options = PipelineOptions(beam_args)\n",
" google_cloud_options = options.view_as(GoogleCloudOptions)\n",
" google_cloud_options.project = args.project_id\n",
" google_cloud_options.job_name = job_name\n",
" google_cloud_options.staging_location = f'gs://{args.bucket_name}/staging'\n",
" google_cloud_options.temp_location = f'gs://{args.bucket_name}/temp'\n",
" google_cloud_options.region = args.region\n",
" options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner'\n",
"\n",
" # Create and run the pipeline\n",
" with beam.Pipeline(options=options) as p:\n",
" pets = (\n",
" p\n",
" | 'Read Pets CSV' >> beam.io.ReadFromText(args.filename, skip_header_lines=1)\n",
" | 'Parse Pets CSV' >> beam.Map(parse_pets_csv)\n",
" )\n",
" \n",
" pets | 'Write Pets to BigQuery' >> WriteToBigQuery(\n",
" f'{args.project_id}:{args.dataset_id}.{args.table_id}',\n",
" schema=pets_table_schema,\n",
" write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,\n",
" create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED\n",
" )\n",
"\n",
"if __name__ == '__main__':\n",
" run()\n",
"\"\"\"\n",
"\n",
"# Write the code to a file named main.py\n",
"with open('main.py', 'w') as f:\n",
" f.write(code)\n"
]
},
{
"cell_type": "markdown",
"id": "24e87d17-3ca2-45fb-84e8-9405f1f45ea0",
"metadata": {},
"source": [
"# Run main.py passing in the appropriate arguments\n",
"\n",
"The example below will run main.py with the required arguments. The arguments would need to be changed for your project, bucket, BigQuery names, and region. \n",
"\n",
"In the Google Cloud Console, go to the Dataflow service to monitor your job. "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0fd707d0-50f7-41dc-8807-189f5936d543",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"%%bash\n",
"\n",
"python main.py \\\n",
" --project_id dsl-dar \\\n",
" --bucket_name dataflow-temp-bucket-dar \\\n",
" --filename gs://dataflow-temp-bucket-dar/input/pets.csv \\\n",
" --dataset_id beam_dataset \\\n",
" --table_id pets_output_table3 \\\n",
" --region us-central1"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "35fd9890-d800-4e8a-9229-a228fd23e214",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"environment": {
"kernel": "apache-beam-2.56.0",
"name": ".m116",
"type": "gcloud",
"uri": "gcr.io/deeplearning-platform-release/:m116"
},
"kernelspec": {
"display_name": "Apache Beam 2.56.0 (Local)",
"language": "python",
"name": "apache-beam-2.56.0"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.14"
}
},
"nbformat": 4,
"nbformat_minor": 5
}