workflows-samples/export-to-bigquery/export-to-bigquery-delete-batch-jobs.yaml

main:
  params: [args]
  steps:
    # Read runtime arguments, falling back to environment values and sample defaults.
    - parse_args:
        assign:
          - project: ${default(map.get(args, "project"), sys.get_env("GOOGLE_CLOUD_PROJECT_ID"))}
          - location: ${default(map.get(args, "location"), sys.get_env("GOOGLE_CLOUD_LOCATION"))}
          - job_filter: ${default(map.get(args, "job_filter"), "(status.state:SUCCEEDED OR status.state:FAILED OR status.state:CANCELLED) AND create_time<=\"2023-05-01T00:00:00Z\"")}
          - page_size: ${default(map.get(args, "page_size"), 100)}  # default page size is 100
          - dataset_id: ${default(map.get(args, "dataset_id"), "default_dataset_id")}  # default dataset ID is "default_dataset_id"
          - table_id: ${default(map.get(args, "table_id"), "default_table_id")}  # default table ID is "default_table_id"
    - init:
        assign:
          - next_page_token: ""
          - number_of_jobs: 0
    # Create the destination BigQuery dataset; a 409 means it already exists.
    - try_create_bq_dataset:
        try:
          call: googleapis.bigquery.v2.datasets.insert
          args:
            projectId: ${project}
            body:
              datasetReference:
                datasetId: ${dataset_id}
                projectId: ${project}
        except:
          as: e
          steps:
            - insert_dataset_errors:
                switch:
                  - condition: ${e.code == 409}  # 409 indicates the dataset already exists
                    next: try_create_bq_table
            - insert_dataset_unhandled_exception:
                raise: ${e}
    # Create the destination BigQuery table; a 409 means it already exists.
    - try_create_bq_table:
        try:
          call: googleapis.bigquery.v2.tables.insert
          args:
            projectId: ${project}
            datasetId: ${dataset_id}
            body:
              tableReference:
                projectId: ${project}
                datasetId: ${dataset_id}
                tableId: ${table_id}
              schema:
                fields:
                  - name: name
                    type: STRING
                  - name: uid
                    type: STRING
                  - name: job
                    type: STRING
        except:
          as: e
          steps:
            - insert_table_errors:
                switch:
                  - condition: ${e.code == 409}  # 409 indicates the table already exists
                    next: list_batch_jobs
            - insert_table_unhandled_exception:
                raise: ${e}
    # List one page of Batch jobs matching the filter.
    - list_batch_jobs:
        call: googleapis.batch.v1.projects.locations.jobs.list
        args:
          parent: ${"projects/" + project + "/locations/" + location}
          filter: ${job_filter}
          pageSize: ${page_size}
          pageToken: ${next_page_token}
        result: response
    - update_token:
        assign:
          - next_page_token: ${default(map.get(response, "nextPageToken"), "")}
    # For each job on this page: export it to BigQuery, then delete it.
    - export_then_delete_jobs:
        for:
          value: j
          in: ${default(map.get(response, "jobs"), [])}
          steps:
            - log_export_step:
                call: sys.log
                args:
                  text: ${"Exporting " + j.name + " into the BigQuery table " + table_id}
                  severity: NOTICE
            - insert_job:
                call: googleapis.bigquery.v2.tabledata.insertAll
                args:
                  datasetId: ${dataset_id}
                  projectId: ${project}
                  tableId: ${table_id}
                  body:
                    rows:
                      - json:
                          "name": ${j.name}
                          "uid": ${j.uid}
                          "job": ${json.encode_to_string(j)}
            - log_delete_step:
                call: sys.log
                args:
                  text: ${"Deleting Batch job " + j.name}
                  severity: NOTICE
            - delete_job:
                call: googleapis.batch.v1.projects.locations.jobs.delete
                args:
                  name: ${j.name}
            - increment_count_of_jobs:
                assign:
                  - number_of_jobs: ${number_of_jobs + 1}
    # Keep paging until there is no next page token.
    - ifNext:
        switch:
          - condition: ${next_page_token != ""}
            next: list_batch_jobs
        next: done
    - done:
        return: ${string(number_of_jobs) + " Batch jobs were exported to BigQuery table " + table_id + " and deleted"}
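
This workflow pages through finished Batch jobs matching job_filter, copies each job record into a BigQuery table (creating the dataset and table if they do not exist), deletes the job, and returns a count of processed jobs. A minimal sketch of deploying and running it with gcloud follows; the workflow name export-batch-jobs-bq, the region us-central1, and the --data values are illustrative placeholders, not part of the sample:

# Deploy the workflow from this file (name and region are placeholders).
gcloud workflows deploy export-batch-jobs-bq \
    --source=export-to-bigquery-delete-batch-jobs.yaml \
    --location=us-central1

# Execute it and wait for the result, overriding the defaults handled in parse_args.
gcloud workflows run export-batch-jobs-bq \
    --location=us-central1 \
    --data='{"dataset_id": "batch_jobs", "table_id": "exported_jobs", "page_size": 50}'

Any argument omitted from --data falls back to the defaults assigned in the parse_args step, including the project and location taken from the workflow's environment variables.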