cloud-composer/dags/sample-dataflow-stop-streaming-job.py

####################################################################################
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
####################################################################################

# Author: Adam Paternostro
# Summary: Stops the Dataflow job started by the sample-dataflow-streaming-bigquery DAG

# [START dag]
from datetime import datetime, timedelta
from airflow.operators import bash_operator
from airflow.utils import trigger_rule
from airflow.operators.python_operator import PythonOperator
import requests
import sys
import os
import logging
import json
import airflow
import google.auth
import google.auth.transport.requests

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': None,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
    'dagrun_timeout': timedelta(minutes=60),
}

project_id = os.environ['ENV_PROJECT_ID']
region = os.environ['ENV_DATAFLOW_REGION']


# Opens the JSON written out when the job was started
# Checks for 4 hours
# Stops the job and deletes the JSON file
# NOTE: This assumes only 1 job has been started and is not designed for many jobs
def stop_dataflow_job():
    print("stop_dataflow_job")
    filePath = "/home/airflow/gcs/data/write_dataflow_job_id.json"
    stopJob = False

    if os.path.exists(filePath):
        with open(filePath) as f:
            data = json.load(f)

        print("run_datetime: ", data['run_datetime'])
        print("dataflow_job_id: ", data['dataflow_job_id'])

        run_datetime = datetime.strptime(data['run_datetime'], "%m/%d/%Y %H:%M:%S")
        dataflow_job_id = data['dataflow_job_id']

        difference = run_datetime - datetime.now()
        print("difference.total_seconds(): ", abs(difference.total_seconds()))

        # Test for 4 hours
        # if difference.total_seconds() > (4 * 60 * 60):
        if abs(difference.total_seconds()) > (4 * 60 * 60):
            print("Stopping job > 4 hours")
            stopJob = True
    else:
        print("JSON file does not exist (no Dataflow job deployed)")

    if stopJob:
        # Get auth (default service account running the Composer worker node)
        creds, project = google.auth.default()
        auth_req = google.auth.transport.requests.Request()  # required to access the access token
        creds.refresh(auth_req)
        access_token = creds.token
        auth_header = {
            'Authorization': "Bearer " + access_token,
            'Accept': 'application/json',
            'Content-Type': 'application/json'
        }
        # print("auth_header: ", auth_header)

        # Call the REST API with the bearer token
        # PUT https://dataflow.googleapis.com/v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}
        # https://dataflow.googleapis.com/v1b3/projects/data-analytics-demo-ja3y7o1hnz/locations/REPLACE-REGION/jobs/2022-05-17_09_05_35-7404530975856425200
        uri = "https://dataflow.googleapis.com/v1b3/projects/" + project_id + "/locations/" + region + "/jobs/" + dataflow_job_id
        print("uri: ", uri)

        request_body = {"requestedState": "JOB_STATE_CANCELLED"}

        """
        # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.jobs/update
        curl --request PUT \
          'https://dataflow.googleapis.com/v1b3/projects/data-analytics-demo-ja3y7o1hnz/locations/REPLACE-REGION/jobs/2022-05-17_09_05_35-7404530975856425200' \
          --header 'Authorization: Bearer [YOUR_ACCESS_TOKEN]' \
          --header 'Accept: application/json' \
          --header 'Content-Type: application/json' \
          --data '{"requestedState":"JOB_STATE_CANCELLED"}' \
          --compressed
        """

        try:
            response = requests.put(uri, headers=auth_header, data=json.dumps(request_body))
            response.raise_for_status()
            print("Cancelled Dataflow Job")
            os.remove(filePath)
        except requests.exceptions.RequestException as err:
            print("Error: ", err)
            raise err


with airflow.DAG('sample-dataflow-stop-streaming-job',
                 default_args=default_args,
                 start_date=datetime(2022, 1, 1),
                 catchup=False,
                 # Add the Composer "Data" directory which will hold the SQL scripts for deployment
                 template_searchpath=['/home/airflow/gcs/data'],
                 # Run every 15 minutes
                 schedule_interval=timedelta(minutes=15)) as dag:

    # Show stopping the Dataflow job via the REST API
    stop_dataflow_job = PythonOperator(
        task_id='stop_dataflow_job',
        python_callable=stop_dataflow_job,
        execution_timeout=timedelta(minutes=1),
        dag=dag,
    )

    # DAG Graph
    stop_dataflow_job
# [END dag]
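

# --------------------------------------------------------------------------------
# Minimal sketch (not part of the original sample) of how the companion
# sample-dataflow-streaming-bigquery DAG is assumed to write the handoff file that
# stop_dataflow_job() reads. The file path, the "run_datetime" / "dataflow_job_id"
# keys and the "%m/%d/%Y %H:%M:%S" timestamp format are inferred from the code
# above; the helper name write_dataflow_job_id is hypothetical. It relies on the
# json and datetime imports already present in this file.
def write_dataflow_job_id(dataflow_job_id,
                          file_path="/home/airflow/gcs/data/write_dataflow_job_id.json"):
    # Record the job id and start time so stop_dataflow_job() can apply its 4 hour check
    data = {
        "run_datetime": datetime.now().strftime("%m/%d/%Y %H:%M:%S"),
        "dataflow_job_id": dataflow_job_id,
    }
    with open(file_path, "w") as f:
        json.dump(data, f)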