classify-split-extract-workflow/classify-extract.yaml

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START workflows_cloud_run_jobs_payload]
main:
  params: [event]
  steps:
    - init:
        assign:
          - results: {} # result from each iteration, keyed by table name
          - exec_id: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
          - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          - event_bucket: ${event.bucket}
          - target_bucket: ${sys.get_env("CLASSIFY_INPUT_BUCKET")}
          - output_bucket: ${sys.get_env("CLASSIFY_OUTPUT_BUCKET")}
          - input_file: ${event.data.name}
          - job_name: ${sys.get_env("CLASSIFY_JOB_NAME")}
          - auto_extract_str: ${sys.get_env("AUTO_EXTRACT", "true")}
          - auto_extract: ${auto_extract_str == "true"}
          - data_synch_str: ${sys.get_env("DATA_SYNCH")}
          - data_synch: ${data_synch_str == "true"}
          - job_location: ${sys.get_env("REGION")}
          - database_root: ${"projects/" + project_id + "/databases/(default)/documents/classify/"}
          - file_name: ${text.substring(input_file, -14, len(input_file))}
          - splitter_output_dir: ${sys.get_env("SPLITTER_OUTPUT_DIR", "splitter_output")}
          - regex_pattern: ${"(.*/)" + splitter_output_dir + "(/|$)"}
          - is_splitter_output: ${text.match_regex(input_file, regex_pattern)}
    - log_event:
        call: sys.log
        args:
          data: ${"event_bucket=" + event_bucket + ", input_file=" + input_file + ", target_bucket=" + target_bucket + ", data_synch=" + string(data_synch) + ", file_name=" + file_name + ", is_splitter_output=" + string(is_splitter_output)}
    - check_input_file:
        switch:
          - condition: ${event_bucket == target_bucket and (file_name == "START_PIPELINE" or (data_synch and not is_splitter_output))}
            next: create_callback
          - condition: true
            steps:
              - log_no_execution:
                  call: sys.log
                  args:
                    text: Pipeline not triggered
                    severity: INFO
              - return_result_no_run:
                  assign:
                    - results["no_run"]: ${"No pipeline execution because the following condition is not met [(1) and ((2) or (3))]: (1) event_bucket == target_bucket is [" + string(event_bucket == target_bucket) + "], (2) file_name == START_PIPELINE is [" + string(file_name == "START_PIPELINE") + "], (3) data_synch is [" + string(data_synch) + "] and not is_splitter_output is [" + string(not is_splitter_output) + "]"}
              - complete_no_job_triggered:
                  next: return_results
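    # The next steps use the Workflows callback pattern: create an HTTP callback
    # endpoint, hand its URL to the Cloud Run job through the CALL_BACK_URL
    # environment variable, then block on await_callback until the job POSTs its
    # result back (or the 3600 s timeout expires). Judging by the fields read in
    # assign_extract below, the job is expected to send a JSON body shaped
    # roughly like this (values are illustrative, not from the source):
    #
    #   {
    #     "success": true,
    #     "bucket": "my-classify-output-bucket",
    #     "object": "classify_output/groups.json"
    #   }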
    - create_callback:
        call: events.create_callback_endpoint
        args:
          http_callback_method: "POST"
        result: callback_details
    - log_callback_details:
        call: sys.log
        args:
          text: ${callback_details}
    - run_classify_job:
        call: googleapis.run.v1.namespaces.jobs.run
        args:
          name: ${"namespaces/" + project_id + "/jobs/" + job_name}
          location: ${job_location}
          body:
            overrides:
              containerOverrides:
                env:
                  - name: CLASSIFY_INPUT_BUCKET
                    value: ${target_bucket}
                  - name: INPUT_FILE
                    value: ${input_file}
                  - name: CLASSIFY_OUTPUT_BUCKET
                    value: ${output_bucket}
                  - name: CALL_BACK_URL
                    value: ${callback_details.url}
        result: job_execution
    - print_callback_details:
        call: sys.log
        args:
          severity: "INFO"
          text: ${"Listening for callbacks on " + callback_details.url}
    - await_callback:
        call: events.await_callback
        args:
          callback: ${callback_details}
          timeout: 3600
        result: callback_request
    - log_callback_received:
        call: sys.log
        args:
          severity: "INFO"
          text: ${"Received " + json.encode_to_string(callback_request.http_request)}
    - assign_extract:
        assign:
          - success: ${callback_request.http_request.body.success}
          - bucket: ${callback_request.http_request.body.bucket}
          - object: ${callback_request.http_request.body.object}
    - check_callback_result:
        switch:
          - condition: ${not success}
            raise: "Failed Classification Job Execution"
          - condition: ${auto_extract}
            next: get_classify_output
          - condition: true
            steps:
              - return_result_no_success:
                  assign:
                    - results["no_extraction"]: ${"No extraction because auto_extract = [" + string(auto_extract) + "]"}
              - complete_no_success_triggered:
                  next: return_results
    - get_classify_output:
        call: googleapis.storage.v1.objects.get
        args:
          bucket: ${bucket}
          object: ${object}
          alt: media
        result: groups
    - log_execution:
        call: sys.log
        args:
          data: ${groups}
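    # `groups` holds the JSON manifest written by the classify job and read back
    # in get_classify_output above. Based on the fields referenced in the loop
    # below, it should be a list of entries, each naming a BigQuery object
    # table, a remote model, and an output table. An illustrative (not
    # source-verified) example:
    #
    #   [
    #     {
    #       "object_table_name": "my-project.docs_dataset.invoices_object_table",
    #       "model_name": "my-project.docs_dataset.invoice_extractor",
    #       "out_table_name": "my-project.docs_dataset.invoices_extracted"
    #     }
    #   ]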
    - process_classification_results:
        for:
          value: group
          in: ${groups}
          steps:
            - logTable:
                call: sys.log
                args:
                  text: ${"Running query for object_table " + group.object_table_name + " and model " + group.model_name + " and out_table " + group.out_table_name}
                  severity: INFO
            - extract_table_components:
                assign:
                  # Split the fully qualified table name on "." into project, dataset, and table.
                  - tableComponents: ${text.split(group.out_table_name, ".")}
                  - bq_projectId: ${tableComponents[0]}
                  - datasetId: ${tableComponents[1]}
                  - tableId: ${tableComponents[2]}
            - check_table_exists:
                try:
                  steps:
                    - get_table_info:
                        call: googleapis.bigquery.v2.tables.get
                        args:
                          projectId: ${bq_projectId}
                          datasetId: ${datasetId}
                          tableId: ${tableId}
                        result: table_info
                    - assign_query:
                        switch:
                          - condition: ${table_info != null}
                            assign:
                              - query_string: ${"INSERT INTO `" + group.out_table_name + "` SELECT * FROM ML.PROCESS_DOCUMENT(MODEL `" + group.model_name + "`, TABLE `" + group.object_table_name + "`); SELECT TO_JSON_STRING(t) AS json_row FROM `" + group.out_table_name + "` t;"}
                          - condition: true
                            steps:
                              - return_result_empty_insert:
                                  assign:
                                    - results[group.out_table_name]: No query execution (unknown error)
                              - complete_no_job_insert:
                                  next: end_job
                except:
                  as: e
                  steps:
                    - log_check_table_exists_error:
                        call: sys.log
                        args:
                          text: ${"Received " + json.encode_to_string(e)}
                          severity: INFO
                    - create_table:
                        switch:
                          - condition: ${e.code == 404}
                            assign:
                              - query_string: ${"CREATE TABLE `" + group.out_table_name + "` AS SELECT * FROM ML.PROCESS_DOCUMENT(MODEL `" + group.model_name + "`, TABLE `" + group.object_table_name + "`); SELECT TO_JSON_STRING(t) AS json_row FROM `" + group.out_table_name + "` t;"}
                          - condition: true
                            steps:
                              - return_result_empty:
                                  assign:
                                    - results[group.out_table_name]: No query execution
                              - complete_no_job_create:
                                  next: end_job
            - log_query:
                call: sys.log
                args:
                  text: ${"Running query " + query_string}
                  severity: INFO
            - run_query:
                try:
                  call: googleapis.bigquery.v2.jobs.query
                  args:
                    projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    body:
                      useLegacySql: false
                      useQueryCache: false
                      timeoutMs: 30000
                      query: ${query_string}
                  result: queryResult
                except:
                  as: e
                  steps:
                    - log_error_on_create:
                        call: sys.log
                        args:
                          severity: "ERROR"
                          text: ${"Received error " + e.message}
                    - raise_error:
                        raise: ${"Failed Query Execution with message " + e.message}
            - log_create_query_result:
                call: sys.log
                args:
                  text: ${"Result of query " + json.encode_to_string(queryResult)}
                  severity: INFO
            - return_insert_result:
                assign:
                  - results[group.out_table_name]: {}
                  - results[group.out_table_name].data: ${queryResult.rows[0].f[0].v}
            - end_job:
                call: sys.log
                args:
                  text: ${"Job completed for object_table " + group.object_table_name + " and model " + group.model_name + " and out_table " + group.out_table_name}
                  severity: INFO
    - return_results:
        return: ${results}
# [END workflows_cloud_run_jobs_payload]
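# For reference, using the illustrative table names from the comment above, a
# first run per group (output table missing, so tables.get raises a 404)
# assembles and executes SQL like:
#
#   CREATE TABLE `my-project.docs_dataset.invoices_extracted` AS
#   SELECT * FROM ML.PROCESS_DOCUMENT(
#     MODEL `my-project.docs_dataset.invoice_extractor`,
#     TABLE `my-project.docs_dataset.invoices_object_table`);
#   SELECT TO_JSON_STRING(t) AS json_row
#   FROM `my-project.docs_dataset.invoices_extracted` t;
#
# Subsequent runs swap CREATE TABLE ... AS for INSERT INTO ... SELECT, and
# return_insert_result stores the first returned json_row in `results`.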