# cloud-composer/dags/sample-dataflow-start-streaming-job.py
####################################################################################
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
####################################################################################
# Author: Adam Paternostro
# Summary: Starts the Dataflow Pipeline that reads the public Pub/Sub NYC Taxi streaming data
####
# NOTE: The Dataflow job will be STOPPED by the Airflow DAG (sample-dataflow-stop-streaming-job) after 4 hours of streaming.
# This will keep you from running up a large GCP bill.
####
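####
# Illustrative usage (not part of the DAG itself): this DAG has no schedule, so it
# must be triggered manually from the Airflow UI or with the gcloud CLI. The
# environment name and location below are placeholders for your Composer deployment:
#
#   gcloud composer environments run YOUR_COMPOSER_ENV \
#       --location us-central1 \
#       dags trigger -- sample-dataflow-start-streaming-job
####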
# [START dag]
from datetime import datetime, timedelta
import os
import json
import airflow
from airflow.operators.python import PythonOperator
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': None,
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=5),
}
project_id = os.environ['ENV_PROJECT_ID']
region = os.environ['ENV_DATAFLOW_REGION']
raw_bucket_name = os.environ['ENV_RAW_BUCKET']
taxi_dataset_id = os.environ['ENV_TAXI_DATASET_ID']
dataproc_bucket = os.environ['ENV_DATAPROC_BUCKET']
dataflow_subnet = os.environ['ENV_DATAFLOW_SUBNET']
serviceAccount = os.environ['ENV_DATAFLOW_SERVICE_ACCOUNT']
output_table = project_id + ":" + taxi_dataset_id + ".taxi_trips_streaming"
dataflow_py_file = "gs://" + raw_bucket_name + "/dataflow/streaming-taxi-data.py"
tempLocation = "gs://" + dataproc_bucket + "/dataflow-temp/"
print("output_table: " + output_table)
print("dataflow_py_file: " + dataflow_py_file)
print("tempLocation: " + tempLocation)
print("serviceAccount: " + serviceAccount)
print("dataflow_subnet: " + dataflow_subnet)
# Persist the Dataflow job id to the Composer "data" folder (GCS-backed) so the
# stop DAG (sample-dataflow-stop-streaming-job) can locate and drain the job later.
def write_dataflow_job_id(dataflow_job_id):
run_datetime = datetime.now()
print("dataflow_job_id: ", dataflow_job_id)
data = {
"run_datetime" : run_datetime.strftime("%m/%d/%Y %H:%M:%S"),
"dataflow_job_id" : dataflow_job_id
}
with open('/home/airflow/gcs/data/write_dataflow_job_id.json', 'w') as f:
json.dump(data, f)
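# Illustrative sketch (an assumption, not invoked by this DAG): the companion stop
# DAG (sample-dataflow-stop-streaming-job) can read the job id back from the same
# Composer "data" folder before draining the pipeline, along these lines:
def read_dataflow_job_id():
    # Hypothetical helper; mirrors the path written by write_dataflow_job_id above.
    with open('/home/airflow/gcs/data/write_dataflow_job_id.json', 'r') as f:
        data = json.load(f)
    return data["dataflow_job_id"]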
with airflow.DAG('sample-dataflow-start-streaming-job',
                 default_args=default_args,
                 start_date=datetime(2021, 1, 1),
                 # dagrun_timeout is a DAG-level setting, so it belongs here rather than in default_args
                 dagrun_timeout=timedelta(minutes=60),
                 # Add the Composer "data" directory (GCS-backed) to the template search path
                 template_searchpath=['/home/airflow/gcs/data'],
                 # Not scheduled; trigger only
                 schedule_interval=None) as dag:
    # Start the Dataflow Python streaming job.
    # The Google provider's Dataflow Python operator is deprecated:
    #   https://airflow.apache.org/docs/apache-airflow-providers-google/stable/operators/cloud/dataflow.html
    # Use BeamRunPythonPipelineOperator from the Apache Beam provider instead:
    #   https://airflow.apache.org/docs/apache-airflow-providers-apache-beam/stable/_api/airflow/providers/apache/beam/operators/beam/index.html
    # Example DAG: https://github.com/apache/airflow/blob/main/airflow/providers/apache/beam/example_dags/example_beam.py
    # Signature: BeamRunPythonPipelineOperator(*, py_file, runner='DirectRunner',
    #   default_pipeline_options=None, pipeline_options=None, py_interpreter='python3',
    #   py_options=None, py_requirements=None, py_system_site_packages=False,
    #   gcp_conn_id='google_cloud_default', delegate_to=None, dataflow_config=None, **kwargs)
start_dataflow = BeamRunPythonPipelineOperator(
task_id="start_dataflow",
py_file=dataflow_py_file,
runner="DataflowRunner",
        default_pipeline_options={
            'tempLocation': tempLocation,
            'output': output_table,
            'streaming': True,
            'serviceAccount': serviceAccount,
            'network': 'vpc-main',
            'subnetwork': dataflow_subnet,
            'no_use_public_ips': True,
            'maxNumWorkers': 5
        },
py_interpreter='python3',
py_requirements=['google-cloud-pubsub==2.19.4','google-cloud-bigquery-storage==2.24.0','apache-beam[gcp]==2.53.0'],
py_system_site_packages=False,
        dataflow_config=DataflowConfiguration(
            job_name="taxi-dataflow-streaming-bigquery",
            project_id=project_id,
            location=region,
            drain_pipeline=True,
            wait_until_finished=False
        ),
)
    # Write the Dataflow job id (pulled from the start_dataflow task's XCom) to a
    # file so the stop DAG can later find and drain this streaming job.
    write_dataflow_job_id_task = PythonOperator(
        task_id='write_dataflow_job_id',
        python_callable=write_dataflow_job_id,
        op_kwargs={"dataflow_job_id": "{{ task_instance.xcom_pull('start_dataflow')['dataflow_job_id'] }}"},
        execution_timeout=timedelta(minutes=1),
        dag=dag,
    )
    # DAG graph
    start_dataflow >> write_dataflow_job_id_task
# [END dag]
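####
# Illustrative follow-up (the region and JOB_ID below are placeholders, not values
# read from this DAG): once the job is running, it can be inspected or drained
# manually with the gcloud CLI:
#
#   gcloud dataflow jobs list --region us-central1 --status active
#   gcloud dataflow jobs drain JOB_ID --region us-central1
####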