# managed-connectivity/cloud-workflows/byo-connector/templates/byo-connector.yaml
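# Cloud Workflows definition for a bring-your-own (BYO) metadata connector.
# The workflow runs a containerized PySpark extraction job on Dataproc Serverless,
# then imports the extracted metadata into Dataplex through a metadata import job,
# polling both jobs until they finish or time out.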
main:
  params: [args]
  steps:
    - init:
        assign:
          - WORKFLOW_ID: ${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
          - NETWORK_URI: ${default(map.get(args, "NETWORK_URI"), "")}
          - SUBNETWORK_URI: ${default(map.get(args, "SUBNETWORK_URI"), "")}
          - NETWORK_TAGS: ${default(map.get(args, "NETWORK_TAGS"), [])}
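    # Validate the networking arguments: at most one of NETWORK_URI or SUBNETWORK_URI
    # may be set; if neither is given, fall back to the target project's default network.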
    - check_networking:
        switch:
          - condition: ${NETWORK_URI != "" and SUBNETWORK_URI != ""}
            raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
          - condition: ${NETWORK_URI == "" and SUBNETWORK_URI == ""}
            steps:
              - submit_extract_job_with_default_network_uri:
                  assign:
                    - NETWORK_TYPE: "networkUri"
                    - NETWORKING: ${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}
          - condition: ${NETWORK_URI != ""}
            steps:
              - submit_extract_job_with_network_uri:
                  assign:
                    - NETWORKING: ${NETWORK_URI}
                    - NETWORK_TYPE: "networkUri"
          - condition: ${SUBNETWORK_URI != ""}
            steps:
              - submit_extract_job_with_subnetwork_uri:
                  assign:
                    - NETWORKING: ${SUBNETWORK_URI}
                    - NETWORK_TYPE: "subnetworkUri"
        next: check_create_target_entry_group
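    # Optionally create the target entry group before running the extraction job.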
    - check_create_target_entry_group:
        switch:
          - condition: ${args.CREATE_TARGET_ENTRY_GROUP == true}
            next: create_target_entry_group
          - condition: ${args.CREATE_TARGET_ENTRY_GROUP == false}
            next: generate_extract_job_link
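    # Create the Dataplex entry group that the imported metadata will belong to.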
    - create_target_entry_group:
        call: http.post
        args:
          url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
          auth:
            type: OAuth2
            scopes: "https://www.googleapis.com/auth/cloud-platform"
        next: generate_extract_job_link
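    # Log a Cloud Console link for monitoring the Dataproc Serverless batch.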
    - generate_extract_job_link:
        call: sys.log
        args:
          data: ${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
          severity: "INFO"
        next: submit_pyspark_extract_job
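    # Submit the connector container as a Dataproc Serverless (batches) PySpark job.
    # The connector writes its metadata import files under the workflow ID folder in
    # the configured Cloud Storage bucket.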
    - submit_pyspark_extract_job:
        call: http.post
        args:
          url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
          auth:
            type: OAuth2
            scopes: "https://www.googleapis.com/auth/cloud-platform"
          headers:
            Content-Type: "application/json"
          query:
            batchId: ${WORKFLOW_ID}
          body:
            pysparkBatch:
              mainPythonFileUri: file:///main.py
              args:
                - ${"--target_project_id=" + args.TARGET_PROJECT_ID}
                - ${"--target_location_id=" + args.CLOUD_REGION}
                - ${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                - ${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                - ${"--output_folder=" + WORKFLOW_ID}
                - ${args.ADDITIONAL_CONNECTOR_ARGS}
            runtimeConfig:
              containerImage: ${args.CUSTOM_CONTAINER_IMAGE}
            environmentConfig:
              executionConfig:
                serviceAccount: ${args.SERVICE_ACCOUNT}
                stagingBucket: ${args.CLOUD_STORAGE_BUCKET_ID}
                ${NETWORK_TYPE}: ${NETWORKING}
                networkTags: ${NETWORK_TAGS}
        result: RESPONSE_MESSAGE
        next: check_pyspark_extract_job
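    # Poll the Dataproc batch every 30 seconds until it succeeds, is cancelled, or fails.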
    - check_pyspark_extract_job:
        call: http.get
        args:
          url: ${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
          auth:
            type: OAuth2
            scopes: "https://www.googleapis.com/auth/cloud-platform"
        result: PYSPARK_EXTRACT_JOB_STATUS
        next: check_pyspark_extract_job_done
    - check_pyspark_extract_job_done:
        switch:
          - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
            next: generate_import_logs_link
          - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
            raise: ${PYSPARK_EXTRACT_JOB_STATUS}
          - condition: ${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
            raise: ${PYSPARK_EXTRACT_JOB_STATUS}
        next: pyspark_extract_job_wait
    - pyspark_extract_job_wait:
        call: sys.sleep
        args:
          seconds: 30
        next: check_pyspark_extract_job
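    # Log a Cloud Logging query link for the Dataplex metadata import job.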
    - generate_import_logs_link:
        call: sys.log
        args:
          data: ${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
          severity: "INFO"
        next: submit_import_job
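    # Import the metadata files produced by the extraction job into Dataplex,
    # scoped to the target entry group and the configured entry and aspect types.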
    - submit_import_job:
        call: http.post
        args:
          url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
          auth:
            type: OAuth2
            scopes: "https://www.googleapis.com/auth/cloud-platform"
          body:
            type: IMPORT
            import_spec:
              source_storage_uri: ${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
              entry_sync_mode: FULL
              aspect_sync_mode: INCREMENTAL
              log_level: ${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
              scope:
                entry_groups:
                  - ${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                entry_types: ${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                aspect_types: ${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
        result: IMPORT_JOB_RESPONSE
        next: get_job_start_time
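    # Record the import job start time, then wait briefly before the first status check.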
    - get_job_start_time:
        assign:
          - importJobStartTime: ${sys.now()}
        next: import_job_startup_wait
    - import_job_startup_wait:
        call: sys.sleep
        args:
          seconds: 30
        next: initial_get_import_job
    - initial_get_import_job:
        call: http.get
        args:
          url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
          auth:
            type: OAuth2
            scopes: "https://www.googleapis.com/auth/cloud-platform"
        result: IMPORT_JOB_STATUS
        next: check_import_job_status_available
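    # Wait for the metadata job to report a status, cancelling it if none appears
    # within 5 minutes.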
    - check_import_job_status_available:
        switch:
          - condition: ${sys.now() - importJobStartTime > 300}  # 5 minutes = 300 seconds
            next: kill_import_job
          - condition: ${"status" in IMPORT_JOB_STATUS.body}
            next: check_import_job_done
        next: import_job_status_wait
    - import_job_status_wait:
        call: sys.sleep
        args:
          seconds: 30
        # Re-fetch the job before re-checking, so the loop sees a fresh status instead
        # of the stale IMPORT_JOB_STATUS response from the initial request.
        next: initial_get_import_job
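    # Poll the import job every 30 seconds until it reaches a terminal state,
    # cancelling it if it is still running after 12 hours.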
    - check_import_job_done:
        switch:
          - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
            next: the_end
          - condition: ${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
            raise: ${IMPORT_JOB_STATUS}
          - condition: ${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
            raise: ${IMPORT_JOB_STATUS}
          - condition: ${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
            raise: ${IMPORT_JOB_STATUS}
          - condition: ${sys.now() - importJobStartTime > 43200}  # 12 hours = 43200 seconds
            next: kill_import_job
        next: import_job_wait
    - get_import_job:
        call: http.get
        args:
          url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
          auth:
            type: OAuth2
            scopes: "https://www.googleapis.com/auth/cloud-platform"
        result: IMPORT_JOB_STATUS
        next: check_import_job_done
    - import_job_wait:
        call: sys.sleep
        args:
          seconds: 30
        next: get_import_job
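    # Cancellation path: cancel the import job, fetch its final state, and surface it
    # as the workflow error.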
    - kill_import_job:
        call: http.post
        args:
          url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
          auth:
            type: OAuth2
            scopes: "https://www.googleapis.com/auth/cloud-platform"
        next: get_killed_import_job
    - get_killed_import_job:
        call: http.get
        args:
          url: ${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
          auth:
            type: OAuth2
            scopes: "https://www.googleapis.com/auth/cloud-platform"
        result: KILLED_IMPORT_JOB_STATUS
        next: killed
    - killed:
        raise: ${KILLED_IMPORT_JOB_STATUS}
    - the_end:
        return: ${IMPORT_JOB_STATUS}