managed-connectivity/cloud-workflows/byo-connector/templates/byo-connector.yaml

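# The workflow reads its inputs from the execution argument map ("args"). The key names
# below are the ones referenced in this template; every value is a hypothetical
# placeholder shown only as a sketch of the expected shape -- substitute your own.
#
#   {
#     "TARGET_PROJECT_ID": "example-project",
#     "CLOUD_REGION": "us-central1",
#     "TARGET_ENTRY_GROUP_ID": "example-entry-group",
#     "CREATE_TARGET_ENTRY_GROUP": false,
#     "CLOUD_STORAGE_BUCKET_ID": "example-bucket",
#     "SERVICE_ACCOUNT": "connector-sa@example-project.iam.gserviceaccount.com",
#     "CUSTOM_CONTAINER_IMAGE": "us-central1-docker.pkg.dev/example-project/repo/connector:latest",
#     "ADDITIONAL_CONNECTOR_ARGS": "--example_connector_flag=value",   <-- shape depends on your connector image
#     "IMPORT_JOB_SCOPE_ENTRY_TYPES": ["projects/example-project/locations/us-central1/entryTypes/example-entry-type"],
#     "IMPORT_JOB_SCOPE_ASPECT_TYPES": ["projects/example-project/locations/us-central1/aspectTypes/example-aspect-type"],
#     "IMPORT_JOB_LOG_LEVEL": "INFO",
#     "NETWORK_URI": "projects/example-project/global/networks/example-network",
#     "NETWORK_TAGS": ["example-tag"]
#   }
#
# NETWORK_URI, SUBNETWORK_URI, NETWORK_TAGS and IMPORT_JOB_LOG_LEVEL are optional
# (see the init and check_networking steps below); NETWORK_URI and SUBNETWORK_URI
# are mutually exclusive.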
main:
  params: [args]
  steps:
    - init:
        assign:
          - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
          - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")}
          - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")}
          - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])}

    - check_networking:
        switch:
          - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""}
            raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
          - condition: ${NETWORK_URI == "" and SUBNETWORK_URI == ""}
            steps:
              - submit_extract_job_with_default_network_uri:
                  assign:
                    - NETWORK_TYPE: "networkUri"
                    - NETWORKING: ${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}
          - condition: ${NETWORK_URI != ""}
            steps:
              - submit_extract_job_with_network_uri:
                  assign:
                    - NETWORKING: ${NETWORK_URI}
                    - NETWORK_TYPE: "networkUri"
          - condition: ${SUBNETWORK_URI != ""}
            steps:
              - submit_extract_job_with_subnetwork_uri:
                  assign:
                    - NETWORKING: ${SUBNETWORK_URI}
                    - NETWORK_TYPE: "subnetworkUri"
        next: check_create_target_entry_group

    - check_create_target_entry_group:
        switch:
          - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true}
            next: create_target_entry_group
          - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false}
            next: generate_extract_job_link

    - create_target_entry_group:
        call: http.post
        args:
          url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
          auth:
            type: OAuth2
            scopes: "https://www.googleapis.com/auth/cloud-platform"
        next: generate_extract_job_link

    - generate_extract_job_link:
        call: sys.log
        args:
          data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
          severity: "INFO"
        next: submit_pyspark_extract_job

    - submit_pyspark_extract_job:
        call: http.post
        args:
          url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
          auth:
            type: OAuth2
            scopes: "https://www.googleapis.com/auth/cloud-platform"
          headers:
            Content-Type: "application/json"
          query:
            batchId: ${WORKFLOW_ID}
          body:
            pysparkBatch:
              mainPythonFileUri: file:///main.py
              args:
                - ${"--target_project_id=" + args.TARGET_PROJECT_ID}
                - ${"--target_location_id=" + args.CLOUD_REGION}
                - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                - ${"--output_folder=" + WORKFLOW_ID}
                - ${args.ADDITIONAL_CONNECTOR_ARGS}
            runtimeConfig:
              containerImage: ${args.CUSTOM_CONTAINER_IMAGE}
            environmentConfig:
              executionConfig:
                serviceAccount: ${args.SERVICE_ACCOUNT}
                stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID}
                ${NETWORK_TYPE}: ${NETWORKING}
                networkTags: ${NETWORK_TAGS}
        result: RESPONSE_MESSAGE
        next: check_pyspark_extract_job

    - check_pyspark_extract_job:
        call: http.get
        args:
          url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
          auth:
            type: OAuth2
            scopes: "https://www.googleapis.com/auth/cloud-platform"
        result: PYSPARK_EXTRACT_JOB_STATUS
        next: check_pyspark_extract_job_done

    - check_pyspark_extract_job_done:
        switch:
          - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
            next: generate_import_logs_link
          - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
            raise: ${PYSPARK_EXTRACT_JOB_STATUS}
          - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
            raise: ${PYSPARK_EXTRACT_JOB_STATUS}
        next: pyspark_extract_job_wait

    - pyspark_extract_job_wait:
        call: sys.sleep
        args:
          seconds: 30
        next: check_pyspark_extract_job

    - generate_import_logs_link:
        call: sys.log
        args:
          data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
          severity: "INFO"
        next: submit_import_job

    - submit_import_job:
        call: http.post
        args:
          url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
          auth:
            type: OAuth2
            scopes: "https://www.googleapis.com/auth/cloud-platform"
          body:
            type: IMPORT
            import_spec:
              source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
              entry_sync_mode: FULL
              aspect_sync_mode: INCREMENTAL
              log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
              scope:
                entry_groups:
                  - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
        result: IMPORT_JOB_RESPONSE
        next: get_job_start_time

    - get_job_start_time:
        assign:
          - importJobStartTime: ${sys.now()}
        next: import_job_startup_wait

    - import_job_startup_wait:
        call: sys.sleep
        args:
          seconds: 30
        next: initial_get_import_job

    - initial_get_import_job:
        call: http.get
        args:
          url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
          auth:
            type: OAuth2
            scopes: "https://www.googleapis.com/auth/cloud-platform"
        result: IMPORT_JOB_STATUS
        next: check_import_job_status_available

    - check_import_job_status_available:
        switch:
          - condition: ${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
            next: kill_import_job
          - condition: ${"status" in IMPORT_JOB_STATUS.body}
            next: check_import_job_done
        next: import_job_status_wait

    - import_job_status_wait:
        call: sys.sleep
        args:
          seconds: 30
        next: check_import_job_status_available

    - check_import_job_done:
        switch:
          - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
            next: the_end
          - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
            raise: ${IMPORT_JOB_STATUS}
          - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
            raise: ${IMPORT_JOB_STATUS}
          - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
            raise: ${IMPORT_JOB_STATUS}
          - condition: ${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
            next: kill_import_job
        next: import_job_wait

    - get_import_job:
        call: http.get
        args:
          url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
          auth:
            type: OAuth2
            scopes: "https://www.googleapis.com/auth/cloud-platform"
        result: IMPORT_JOB_STATUS
        next: check_import_job_done

    - import_job_wait:
        call: sys.sleep
        args:
          seconds: 30
        next: get_import_job

    - kill_import_job:
        call: http.post
        args:
          url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
          auth:
            type: OAuth2
            scopes: "https://www.googleapis.com/auth/cloud-platform"
        next: get_killed_import_job

    - get_killed_import_job:
        call: http.get
        args:
          url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
          auth:
            type: OAuth2
            scopes: "https://www.googleapis.com/auth/cloud-platform"
        result: KILLED_IMPORT_JOB_STATUS
        next: killed

    - killed:
        raise: ${KILLED_IMPORT_JOB_STATUS}

    - the_end:
        return: ${IMPORT_JOB_STATUS}
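
# Example of deploying and executing this workflow with gcloud (a sketch; the workflow
# name, project, region and args file below are hypothetical placeholders):
#
#   gcloud workflows deploy byo-connector \
#       --project=example-project \
#       --location=us-central1 \
#       --source=byo-connector.yaml
#
#   gcloud workflows run byo-connector \
#       --project=example-project \
#       --location=us-central1 \
#       --data="$(cat execution-args.json)"   # JSON shaped like the sketch at the top of this file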