# classify-split-extract-workflow/classify-extract.yaml
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# [START workflows_cloud_run_jobs_payload]
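# Workflow: triggered by a Cloud Storage event; runs a Cloud Run classification
# job, waits for its callback, then (optionally) extracts each classified group
# of documents into BigQuery with ML.PROCESS_DOCUMENT.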
main:
  params: [event]
  steps:
    - init:
        assign:
          - results: {}  # result from each iteration, keyed by table name
          - exec_id: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
          - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          - event_bucket: ${event.bucket}
          - target_bucket: ${sys.get_env("CLASSIFY_INPUT_BUCKET")}
          - output_bucket: ${sys.get_env("CLASSIFY_OUTPUT_BUCKET")}
          - input_file: ${event.data.name}
          - job_name: ${sys.get_env("CLASSIFY_JOB_NAME")}
          - auto_extract_str: ${sys.get_env("AUTO_EXTRACT", "true")}
          - auto_extract: ${auto_extract_str == "true"}
          - data_synch_str: ${sys.get_env("DATA_SYNCH")}
          - data_synch: ${data_synch_str == "true"}
          - job_location: ${sys.get_env("REGION")}
          - database_root: ${"projects/" + project_id + "/databases/(default)/documents/classify/"}
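          # Last 14 characters of the object name ("START_PIPELINE" is
          # 14 characters), compared against the trigger file in check_input_file.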
          - file_name: ${text.substring(input_file, -14, len(input_file))}
          - splitter_output_dir: ${sys.get_env("SPLITTER_OUTPUT_DIR", "splitter_output")}
          - regex_pattern: ${"(.*/)" + splitter_output_dir + "(/|$)"}
          - is_splitter_output: ${text.match_regex(input_file, regex_pattern)}
    - log_event:
        call: sys.log
        args:
          data: ${"event_bucket=" + event_bucket + ", input_file=" + input_file + ", target_bucket=" + target_bucket + ", data_synch=" + string(data_synch) + ", file_name=" + file_name + ", is_splitter_output=" + string(is_splitter_output)}
    - check_input_file:
        switch:
          - condition: ${event_bucket == target_bucket and (file_name == "START_PIPELINE" or (data_synch and not is_splitter_output))}
            next: create_callback
          - condition: true
            steps:
              - log_no_execution:
                  call: sys.log
                  args:
                    text: Pipeline not triggered
                    severity: INFO
              - return_result_no_run:
                  assign:
                    - results["no_run"]: ${"No pipeline execution because the following condition is not met [(1) and ((2) or (3))]: (1) event_bucket == target_bucket is [" + string(event_bucket == target_bucket) + "], (2) file_name == START_PIPELINE is [" + string(file_name == "START_PIPELINE") + "], (3) data_synch is [" + string(data_synch) + "] and not is_splitter_output is [" + string(not is_splitter_output) + "]"}
              - complete_no_job_triggered:
                  next: return_results
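    # Create an HTTP callback endpoint so the Cloud Run job can report back when
    # classification finishes.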
    - create_callback:
        call: events.create_callback_endpoint
        args:
          http_callback_method: "POST"
        result: callback_details
    - log_callback_details:
        call: sys.log
        args:
          data: ${callback_details}
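    # Launch the Cloud Run classification job, passing the buckets, the input
    # file, and the callback URL as environment-variable overrides.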
    - run_classify_job:
        call: googleapis.run.v1.namespaces.jobs.run
        args:
          name: ${"namespaces/" + project_id + "/jobs/" + job_name}
          location: ${job_location}
          body:
            overrides:
              containerOverrides:
                env:
                  - name: CLASSIFY_INPUT_BUCKET
                    value: ${target_bucket}
                  - name: INPUT_FILE
                    value: ${input_file}
                  - name: CLASSIFY_OUTPUT_BUCKET
                    value: ${output_bucket}
                  - name: CALL_BACK_URL
                    value: ${callback_details.url}
        result: job_execution
    - print_callback_details:
        call: sys.log
        args:
          severity: "INFO"
          text: ${"Listening for callbacks on " + callback_details.url}
    - await_callback:
        call: events.await_callback
        args:
          callback: ${callback_details}
          timeout: 3600
        result: callback_request
    - log_callback_received:
        call: sys.log
        args:
          severity: "INFO"
          text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    - assign_extract:
        assign:
          - success: ${callback_request.http_request.body.success}
          - bucket: ${callback_request.http_request.body.bucket}
          - object: ${callback_request.http_request.body.object}
    - check_callback_result:
        switch:
          - condition: ${not success}
            raise: "Failed Classification Job Execution"
          - condition: ${auto_extract}
            next: get_classify_output
          - condition: true
            steps:
              - return_result_no_success:
                  assign:
                    - results["no_extraction"]: ${"No extraction because auto_extract = [" + string(auto_extract) + "]"}
              - complete_no_success_triggered:
                  next: return_results
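    # Download the classifier output object; alt: media fetches the object
    # content (the list of groups) rather than its metadata.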
    - get_classify_output:
        call: googleapis.storage.v1.objects.get
        args:
          bucket: ${bucket}
          object: ${object}
          alt: media
        result: groups
    - log_execution:
        call: sys.log
        args:
          data: ${groups}
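    # For each classified group, run ML.PROCESS_DOCUMENT over its BigQuery
    # object table and land the extracted rows in the group's output table.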
    - process_classification_results:
        for:
          value: group
          in: ${groups}
          steps:
            - logTable:
                call: sys.log
                args:
                  text: ${"Running query for object_table " + group.object_table_name + " and model " + group.model_name + " and out_table " + group.out_table_name}
                  severity: INFO
            - extract_table_components:
                assign:
                  - tableComponents: ${text.split(group.out_table_name, ".")}  # split the string using "." as the delimiter
                  - bq_projectId: ${tableComponents[0]}
                  - datasetId: ${tableComponents[1]}
                  - tableId: ${tableComponents[2]}
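            # Probe the output table: if it already exists, INSERT into it; a 404
            # from tables.get means it must be created first (see except below).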
            - check_table_exists:
                try:
                  steps:
                    - get_table_info:
                        call: googleapis.bigquery.v2.tables.get
                        args:
                          projectId: ${bq_projectId}
                          datasetId: ${datasetId}
                          tableId: ${tableId}
                        result: table_info
                    - assign_query:
                        switch:
                          - condition: ${table_info != null}
                            assign:
                              - query_string: ${"INSERT INTO `" + group.out_table_name + "` SELECT * FROM ML.PROCESS_DOCUMENT(MODEL `" + group.model_name + "`, TABLE `" + group.object_table_name + "`); SELECT TO_JSON_STRING(t) AS json_row FROM `" + group.out_table_name + "` t;"}
                          - condition: true
                            steps:
                              - return_result_empty_insert:
                                  assign:
                                    - results[group.out_table_name]: "No query execution: unknown error"
                              - complete_no_job_insert:
                                  next: end_job
                except:
                  as: e
                  steps:
                    - log_check_table_exists_error:
                        call: sys.log
                        args:
                          text: ${"Received " + json.encode_to_string(e)}
                          severity: INFO
                    - create_table:
                        switch:
                          - condition: ${e.code == 404}
                            assign:
                              - query_string: ${"CREATE TABLE `" + group.out_table_name + "` AS SELECT * FROM ML.PROCESS_DOCUMENT(MODEL `" + group.model_name + "`, TABLE `" + group.object_table_name + "`); SELECT TO_JSON_STRING(t) AS json_row FROM `" + group.out_table_name + "` t;"}
                          - condition: true
                            steps:
                              - return_result_empty:
                                  assign:
                                    - results[group.out_table_name]: "No query execution"
                              - complete_no_job_create:
                                  next: end_job
            - log_query:
                call: sys.log
                args:
                  text: ${"Running query " + query_string}
                  severity: INFO
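            # Execute the assembled multi-statement query; the trailing SELECT
            # returns the output table rows as JSON strings.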
            - run_query:
                try:
                  call: googleapis.bigquery.v2.jobs.query
                  args:
                    projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    body:
                      useLegacySql: false
                      useQueryCache: false
                      timeoutMs: 30000
                      query: ${query_string}
                  result: queryResult
                except:
                  as: e
                  steps:
                    - log_error_on_create:
                        call: sys.log
                        args:
                          severity: "ERROR"
                          text: ${"Received error " + e.message}
                    - raise_error:
                        raise: ${"Failed Query Execution with message " + e.message}
            - log_create_query_result:
                call: sys.log
                args:
                  text: ${"Result of query " + json.encode_to_string(queryResult)}
                  severity: INFO
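            # Record the first returned JSON row for this table in the workflow
            # results.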
            - return_insert_result:
                assign:
                  - results[group.out_table_name]: {}
                  - results[group.out_table_name].data: ${queryResult.rows[0].f[0].v}
            - end_job:
                call: sys.log
                args:
                  text: ${"Job completed for object_table " + group.object_table_name + " and model " + group.model_name + " and out_table " + group.out_table_name}
                  severity: INFO
    - return_results:
        return: ${results}
# [END workflows_cloud_run_jobs_payload]