managed-connectivity/cloud-workflows/byo-connector/terraform/byo-connector.tfvars
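# Values in UPPER_CASE are placeholders; replace them before running `terraform apply`.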
project_id      = "PROJECT_ID"
region          = "LOCATION_ID"
service_account = "SERVICE_ACCOUNT_ID"
cron_schedule   = "CRON_SCHEDULE_EXPRESSION"

workflow_args = {
  "TARGET_PROJECT_ID": "PROJECT_ID",
  "CLOUD_REGION": "LOCATION_ID",
  "TARGET_ENTRY_GROUP_ID": "ENTRY_GROUP_ID",
  "CREATE_TARGET_ENTRY_GROUP": CREATE_ENTRY_GROUP_BOOLEAN,
  "CLOUD_STORAGE_BUCKET_ID": "BUCKET_ID",
  "SERVICE_ACCOUNT": "SERVICE_ACCOUNT_ID",
  "ADDITIONAL_CONNECTOR_ARGS": [ADDITIONAL_CONNECTOR_ARGUMENTS],
  "CUSTOM_CONTAINER_IMAGE": "CONTAINER_IMAGE",
  "IMPORT_JOB_SCOPE_ENTRY_TYPES": [ENTRY_TYPES],
  "IMPORT_JOB_SCOPE_ASPECT_TYPES": [ASPECT_TYPES],
  "IMPORT_JOB_LOG_LEVEL": "INFO",
  "NETWORK_TAGS": [],
  "NETWORK_URI": "",
  "SUBNETWORK_URI": ""
}

workflow_source = <<EOF
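# Cloud Workflows definition: runs the connector container as a Dataproc Serverless (PySpark)
# batch to extract metadata into Cloud Storage, then imports the result with a Dataplex metadata job.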
main:
  params: [args]
  steps:
    - init:
        assign:
          - WORKFLOW_ID: $${"metadataworkflow-" + sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
          - NETWORK_URI: $${default(map.get(args, "NETWORK_URI"), "")}
          - SUBNETWORK_URI: $${default(map.get(args, "SUBNETWORK_URI"), "")}
          - NETWORK_TAGS: $${default(map.get(args, "NETWORK_TAGS"), [])}
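    # Resolve Dataproc networking: use an explicit network or subnetwork if provided,
    # otherwise fall back to the project's default VPC network.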
    - check_networking:
        switch:
          - condition: $${NETWORK_URI != "" and SUBNETWORK_URI != ""}
            raise: "Error: cannot set both network_uri and subnetwork_uri. Please select one."
          - condition: $${NETWORK_URI != ""}
            steps:
              - submit_extract_job_with_network_uri:
                  assign:
                    - NETWORKING: $${NETWORK_URI}
                    - NETWORK_TYPE: "networkUri"
                  # Skip the default-network step so the explicit network is not overwritten.
                  next: check_create_target_entry_group
          - condition: $${SUBNETWORK_URI != ""}
            steps:
              - submit_extract_job_with_subnetwork_uri:
                  assign:
                    - NETWORKING: $${SUBNETWORK_URI}
                    - NETWORK_TYPE: "subnetworkUri"
                  next: check_create_target_entry_group
        next: set_default_networking
    - set_default_networking:
        assign:
          - NETWORK_TYPE: "networkUri"
          - NETWORKING: $${"projects/" + args.TARGET_PROJECT_ID + "/global/networks/default"}
        next: check_create_target_entry_group
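    # Optionally create the target Dataplex entry group before importing metadata into it.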
    - check_create_target_entry_group:
        switch:
          - condition: $${args.CREATE_TARGET_ENTRY_GROUP == true}
            next: create_target_entry_group
          - condition: $${args.CREATE_TARGET_ENTRY_GROUP == false}
            next: generate_extract_job_link
    - create_target_entry_group:
        call: http.post
        args:
          url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups?entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
          auth:
            type: OAuth2
            scopes: "https://www.googleapis.com/auth/cloud-platform"
        next: generate_extract_job_link
    - generate_extract_job_link:
        call: sys.log
        args:
          data: $${"https://console.cloud.google.com/dataproc/batches/" + args.CLOUD_REGION + "/" + WORKFLOW_ID + "/monitoring?project=" + args.TARGET_PROJECT_ID}
          severity: "INFO"
        next: submit_pyspark_extract_job
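    # Submit the connector as a Dataproc Serverless PySpark batch; the connector writes its
    # metadata import files under gs://CLOUD_STORAGE_BUCKET_ID/WORKFLOW_ID/.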
    - submit_pyspark_extract_job:
        call: http.post
        args:
          url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches"}
          auth:
            type: OAuth2
            scopes: "https://www.googleapis.com/auth/cloud-platform"
          headers:
            Content-Type: "application/json"
          query:
            batchId: $${WORKFLOW_ID}
          body:
            pysparkBatch:
              mainPythonFileUri: file:///main.py
              args:
                - $${"--target_project_id=" + args.TARGET_PROJECT_ID}
                - $${"--target_location_id=" + args.CLOUD_REGION}
                - $${"--target_entry_group_id=" + args.TARGET_ENTRY_GROUP_ID}
                - $${"--output_bucket=" + args.CLOUD_STORAGE_BUCKET_ID}
                - $${"--output_folder=" + WORKFLOW_ID}
                - $${args.ADDITIONAL_CONNECTOR_ARGS}
            runtimeConfig:
              containerImage: $${args.CUSTOM_CONTAINER_IMAGE}
            environmentConfig:
              executionConfig:
                serviceAccount: $${args.SERVICE_ACCOUNT}
                stagingBucket: $${args.CLOUD_STORAGE_BUCKET_ID}
                $${NETWORK_TYPE}: $${NETWORKING}
                networkTags: $${NETWORK_TAGS}
        result: RESPONSE_MESSAGE
        next: check_pyspark_extract_job
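    # Poll the batch every 30 seconds until it succeeds; a cancelled or failed batch stops the workflow.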
    - check_pyspark_extract_job:
        call: http.get
        args:
          url: $${"https://dataproc.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/batches/" + WORKFLOW_ID}
          auth:
            type: OAuth2
            scopes: "https://www.googleapis.com/auth/cloud-platform"
        result: PYSPARK_EXTRACT_JOB_STATUS
        next: check_pyspark_extract_job_done
    - check_pyspark_extract_job_done:
        switch:
          - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "SUCCEEDED"}
            next: generate_import_logs_link
          - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "CANCELLED"}
            raise: $${PYSPARK_EXTRACT_JOB_STATUS}
          - condition: $${PYSPARK_EXTRACT_JOB_STATUS.body.state == "FAILED"}
            raise: $${PYSPARK_EXTRACT_JOB_STATUS}
        next: pyspark_extract_job_wait
    - pyspark_extract_job_wait:
        call: sys.sleep
        args:
          seconds: 30
        next: check_pyspark_extract_job
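    # Import the extracted metadata into Dataplex by creating a metadata job (type IMPORT)
    # scoped to the target entry group, entry types, and aspect types.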
    - generate_import_logs_link:
        call: sys.log
        args:
          data: $${"https://console.cloud.google.com/logs/query?project=" + args.TARGET_PROJECT_ID + "&query=resource.type%3D%22dataplex.googleapis.com%2FMetadataJob%22+AND+resource.labels.location%3D%22" + args.CLOUD_REGION + "%22+AND+resource.labels.metadata_job_id%3D%22" + WORKFLOW_ID + "%22"}
          severity: "INFO"
        next: submit_import_job
    - submit_import_job:
        call: http.post
        args:
          url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs?metadata_job_id=" + WORKFLOW_ID}
          auth:
            type: OAuth2
            scopes: "https://www.googleapis.com/auth/cloud-platform"
          body:
            type: IMPORT
            import_spec:
              source_storage_uri: $${"gs://" + args.CLOUD_STORAGE_BUCKET_ID + "/" + WORKFLOW_ID + "/"}
              entry_sync_mode: FULL
              aspect_sync_mode: INCREMENTAL
              log_level: $${default(map.get(args, "IMPORT_JOB_LOG_LEVEL"), "INFO")}
              scope:
                entry_groups:
                  - $${"projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/entryGroups/" + args.TARGET_ENTRY_GROUP_ID}
                entry_types: $${args.IMPORT_JOB_SCOPE_ENTRY_TYPES}
                aspect_types: $${args.IMPORT_JOB_SCOPE_ASPECT_TYPES}
        result: IMPORT_JOB_RESPONSE
        next: get_job_start_time
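    # Wait for the metadata job to report a status, re-checking every 30 seconds;
    # cancel the job if no status appears within 5 minutes.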
    - get_job_start_time:
        assign:
          - importJobStartTime: $${sys.now()}
        next: import_job_startup_wait
    - import_job_startup_wait:
        call: sys.sleep
        args:
          seconds: 30
        next: initial_get_import_job
    - initial_get_import_job:
        call: http.get
        args:
          url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
          auth:
            type: OAuth2
            scopes: "https://www.googleapis.com/auth/cloud-platform"
        result: IMPORT_JOB_STATUS
        next: check_import_job_status_available
    - check_import_job_status_available:
        switch:
          - condition: $${sys.now() - importJobStartTime > 300} # 5 minutes = 300 seconds
            next: kill_import_job
          - condition: $${"status" in IMPORT_JOB_STATUS.body}
            next: check_import_job_done
        next: import_job_status_wait
    - import_job_status_wait:
        call: sys.sleep
        args:
          seconds: 30
        next: initial_get_import_job # re-fetch the job status before checking again
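    # Terminal-state handling: finish on SUCCEEDED, raise on CANCELLED, SUCCEEDED_WITH_ERRORS,
    # or FAILED, and cancel the job if it is still running after 12 hours.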
    - check_import_job_done:
        switch:
          - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED"}
            next: the_end
          - condition: $${IMPORT_JOB_STATUS.body.status.state == "CANCELLED"}
            raise: $${IMPORT_JOB_STATUS}
          - condition: $${IMPORT_JOB_STATUS.body.status.state == "SUCCEEDED_WITH_ERRORS"}
            raise: $${IMPORT_JOB_STATUS}
          - condition: $${IMPORT_JOB_STATUS.body.status.state == "FAILED"}
            raise: $${IMPORT_JOB_STATUS}
          - condition: $${sys.now() - importJobStartTime > 43200} # 12 hours = 43200 seconds
            next: kill_import_job
        next: import_job_wait
    - get_import_job:
        call: http.get
        args:
          url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
          auth:
            type: OAuth2
            scopes: "https://www.googleapis.com/auth/cloud-platform"
        result: IMPORT_JOB_STATUS
        next: check_import_job_done
    - import_job_wait:
        call: sys.sleep
        args:
          seconds: 30
        next: get_import_job
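    # Cancellation path: cancel the metadata job, fetch its final state, and surface it as an error.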
    - kill_import_job:
        call: http.post
        args:
          url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID + ":cancel"}
          auth:
            type: OAuth2
            scopes: "https://www.googleapis.com/auth/cloud-platform"
        next: get_killed_import_job
    - get_killed_import_job:
        call: http.get
        args:
          url: $${"https://dataplex.googleapis.com/v1/projects/" + args.TARGET_PROJECT_ID + "/locations/" + args.CLOUD_REGION + "/metadataJobs/" + WORKFLOW_ID}
          auth:
            type: OAuth2
            scopes: "https://www.googleapis.com/auth/cloud-platform"
        result: KILLED_IMPORT_JOB_STATUS
        next: killed
    - killed:
        raise: $${KILLED_IMPORT_JOB_STATUS}
    - the_end:
        return: $${IMPORT_JOB_STATUS}
EOF