def stop_dataflow_job()

in cloud-composer/dags/sample-dataflow-stop-streaming-job.py [0:0]
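
Stops a streaming Dataflow job once it has been running for more than 4 hours:
the job id and start time are read from a JSON state file written by the DAG
that deployed the job, the job is cancelled via the Dataflow REST API, and the
state file is removed on success.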


import os
import json
from datetime import datetime

import google.auth
import google.auth.transport.requests
import requests

# Assumed module-level configuration: in the full DAG file, project_id and
# region are expected to be defined elsewhere; environment-variable lookups
# with placeholders are shown here so the excerpt is self-contained.
project_id = os.environ.get("PROJECT_ID", "REPLACE-PROJECT")
region = os.environ.get("REGION", "REPLACE-REGION")


def stop_dataflow_job():
    print("stop_dataflow_job")
    # State file written by the DAG that deployed the streaming Dataflow job
    filePath = "/home/airflow/gcs/data/write_dataflow_job_id.json"
    stopJob = False
    if os.path.exists(filePath):
        with open(filePath) as f:
            data = json.load(f)
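        # Expected shape of the state file (keys read below), e.g.:
        # {
        #     "run_datetime": "05/17/2022 09:05:35",
        #     "dataflow_job_id": "2022-05-17_09_05_35-7404530975856425200"
        # }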
        
        print("run_datetime: ", data['run_datetime'])
        print("dataflow_job_id: ", data['dataflow_job_id'])
        run_datetime = datetime.strptime(data['run_datetime'], "%m/%d/%Y %H:%M:%S")
        dataflow_job_id = data['dataflow_job_id']
        difference = run_datetime - datetime.now()
        print("difference.total_seconds(): ", abs(difference.total_seconds()))
        # Stop the job once it has been running for more than 4 hours.
        # run_datetime is in the past, so the raw difference is negative;
        # abs() yields the elapsed seconds.
        if abs(difference.total_seconds()) > (4 * 60 * 60):
            print("Stopping job > 4 hours")
            stopJob = True
    else:
        print("Json files does not exist (no Dataflow job deployed)")

    if stopJob:
        # Get credentials for the default service account running the Composer worker node
        creds, project = google.auth.default()
        auth_req = google.auth.transport.requests.Request()  # required to obtain an access token
        creds.refresh(auth_req)
        access_token = creds.token
        auth_header = {
            'Authorization': "Bearer " + access_token,
            'Accept': 'application/json',
            'Content-Type': 'application/json'}
        # print("auth_header: ", auth_header)  # debug only: would log a bearer token

        # call rest api with bearer token
        # PUT https://dataflow.googleapis.com/v1b3/projects/{projectId}/locations/{location}/jobs/{jobId}
        #     https://dataflow.googleapis.com/v1b3/projects/data-analytics-demo-ja3y7o1hnz/locations/REPLACE-REGION/jobs/2022-05-17_09_05_35-7404530975856425200
        uri="https://dataflow.googleapis.com/v1b3/projects/" + project_id + "/locations/" + region + "/jobs/" + dataflow_job_id
        print("uri: ", uri)

        request_body = { "requestedState" : "JOB_STATE_CANCELLED" }
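
        # JOB_STATE_CANCELLED stops the job immediately, discarding any
        # in-flight data; the same update API can also drain a streaming job
        # so buffered data is processed first (see the reference below).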

        """
        # https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.jobs/update
        curl --request PUT \
            'https://dataflow.googleapis.com/v1b3/projects/data-analytics-demo-ja3y7o1hnz/locations/REPLACE-REGION/jobs/2022-05-17_09_05_35-7404530975856425200' \
            --header 'Authorization: Bearer [YOUR_ACCESS_TOKEN]' \
            --header 'Accept: application/json' \
            --header 'Content-Type: application/json' \
            --data '{"requestedState":"JOB_STATE_CANCELLED"}' \
            --compressed
        """

        try:
            response = requests.put(uri, headers=auth_header, data=json.dumps(request_body))
            response.raise_for_status()
            print("Cancelled Dataflow Job")
            # Remove the state file so later runs do not try to cancel the same job again
            os.remove(filePath)
        except requests.exceptions.RequestException as err:
            print("Error: ", err)
            raise
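
For context, below is a minimal sketch of how stop_dataflow_job might be wired
into the Composer DAG. The dag_id, schedule, and default_args are illustrative
assumptions, not taken from the original file.

# Hypothetical DAG wiring (assumed dag_id and schedule)
from datetime import timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',                  # assumed
    'start_date': days_ago(1),           # assumed
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG('sample-dataflow-stop-streaming-job',      # assumed dag_id (from the filename)
         default_args=default_args,
         schedule_interval='*/30 * * * *') as dag:  # assumed: check every 30 minutes
    stop_task = PythonOperator(
        task_id='stop_dataflow_job',
        python_callable=stop_dataflow_job)

Because PythonOperator runs the callable on a Composer worker, the credentials
picked up by google.auth.default() are those of the worker's default service
account, which must have permission to update Dataflow jobs.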