# workflows-samples/export-to-bigquery/export-to-bigquery-delete-batch-jobs.yaml
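#
# Lists Google Cloud Batch jobs matching a filter, writes each job as a row in a
# BigQuery table (name, uid, and the full job resource as a JSON string), and then
# deletes the job from Batch.
#
# Illustrative deployment and execution commands (the workflow name, location, and
# argument values below are placeholders, not part of this sample):
#
#   gcloud workflows deploy export-to-bigquery-delete-batch-jobs \
#     --source=export-to-bigquery-delete-batch-jobs.yaml \
#     --location=us-central1
#
#   gcloud workflows run export-to-bigquery-delete-batch-jobs \
#     --location=us-central1 \
#     --data='{"dataset_id": "batch_jobs", "table_id": "archived_jobs"}'
#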
main:
  params: [args]
  steps:
    - parse_args:
        assign:
          - project: ${default(map.get(args, "project"), sys.get_env("GOOGLE_CLOUD_PROJECT_ID"))}
          - location: ${default(map.get(args, "location"), sys.get_env("GOOGLE_CLOUD_LOCATION"))}
          - job_filter: ${default(map.get(args, "job_filter"), "(status.state:SUCCEEDED OR status.state:FAILED OR status.state:CANCELLED) AND create_time<=\"2023-05-01T00:00:00Z\"")}
          - page_size: ${default(map.get(args, "page_size"), 100)} # default page size is 100
          - dataset_id: ${default(map.get(args, "dataset_id"), "default_dataset_id")} # defaults to "default_dataset_id"
          - table_id: ${default(map.get(args, "table_id"), "default_table_id")} # defaults to "default_table_id"
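          # job_filter uses the Batch API list-filter syntax, so a run could pass, for
          # example, {"job_filter": "status.state:SUCCEEDED AND create_time<=\"2024-01-01T00:00:00Z\""}
          # as runtime arguments to archive only succeeded jobs older than a given date
          # (the date and filter here are illustrative, not part of the original sample).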
    - init:
        assign:
          - next_page_token: ""
          - number_of_jobs: 0
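    # next_page_token and number_of_jobs are the loop state: the workflow keeps looping
    # from ifNext back to list_batch_jobs until the Batch API returns no further page
    # token, counting every job it exports and deletes along the way.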
    - try_create_bq_dataset:
        try:
          call: googleapis.bigquery.v2.datasets.insert
          args:
            projectId: ${project}
            body:
              datasetReference:
                datasetId: ${dataset_id}
                projectId: ${project}
        except:
          as: e
          steps:
            - insert_dataset_errors:
                switch:
                  - condition: ${e.code == 409} # a 409 error indicates the dataset already exists
                    next: try_create_bq_table
            - insert_dataset_unhandled_exception:
                raise: ${e}
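    # Dataset creation is effectively idempotent here: a 409 conflict is treated as
    # "already exists" and skipped, while any other error aborts the workflow.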
    - try_create_bq_table:
        try:
          call: googleapis.bigquery.v2.tables.insert
          args:
            projectId: ${project}
            datasetId: ${dataset_id}
            body:
              tableReference:
                projectId: ${project}
                datasetId: ${dataset_id}
                tableId: ${table_id}
              schema:
                fields:
                  - name: name
                    type: STRING
                  - name: uid
                    type: STRING
                  - name: job
                    type: STRING
        except:
          as: e
          steps:
            - insert_table_errors:
                switch:
                  - condition: ${e.code == 409} # a 409 error indicates the table already exists
                    next: list_batch_jobs
            - insert_table_unhandled_exception:
                raise: ${e}
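    # The table keeps the job name and uid as plain columns and the entire job resource
    # as a JSON string in the "job" column. An illustrative query over the exported rows
    # (the dataset/table names are just the defaults above):
    #
    #   SELECT name, uid, JSON_VALUE(job, '$.status.state') AS state
    #   FROM `default_dataset_id.default_table_id`
    #   ORDER BY name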
    - list_batch_jobs:
        call: googleapis.batch.v1.projects.locations.jobs.list
        args:
          parent: ${"projects/" + project + "/locations/" + location}
          filter: ${job_filter}
          pageSize: ${page_size}
          pageToken: ${next_page_token}
        result: response
    - update_token:
        assign:
          - next_page_token: ${default(map.get(response, "nextPageToken"), "")}
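    # nextPageToken is absent on the last page of results, so it defaults to "" here
    # and the ifNext step below falls through to done.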
    - export_then_delete_jobs:
        for:
          value: j
          in: ${default(map.get(response, "jobs"), [])}
          steps:
            - log_export_step:
                call: sys.log
                args:
                  text: ${"Exporting " + j.name + " into the BigQuery table " + table_id}
                  severity: NOTICE
            - insert_job:
                call: googleapis.bigquery.v2.tabledata.insertAll
                args:
                  datasetId: ${dataset_id}
                  projectId: ${project}
                  tableId: ${table_id}
                  body:
                    rows:
                      - json:
                          "name": ${j.name}
                          "uid": ${j.uid}
                          "job": ${json.encode_to_string(j)}
            - log_delete_step:
                call: sys.log
                args:
                  text: ${"Deleting Batch job " + j.name}
                  severity: NOTICE
            - delete_job:
                call: googleapis.batch.v1.projects.locations.jobs.delete
                args:
                  name: ${j.name}
            - increment_count_of_jobs:
                assign:
                  - number_of_jobs: ${number_of_jobs + 1}
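    # Within export_then_delete_jobs each job is first written to BigQuery and only then
    # deleted from Batch. Note that tabledata.insertAll is a streaming insert and this
    # workflow does not inspect per-row insertErrors in its response.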
    - ifNext:
        switch:
          - condition: ${next_page_token != ""}
            next: list_batch_jobs
        next: done
    - done:
        return: ${string(number_of_jobs) + " Batch jobs were exported to BigQuery table " + table_id + " and deleted"}