in clouddq-migration/dataplex.py [0:0]
def _split_gcs_uri(file_uri):
    """Split a ``gs://<bucket>/<object path>`` URI into (bucket, object name)."""
    scheme = "gs://"
    if not file_uri.startswith(scheme):
        raise ValueError(
            f"Expected a Cloud Storage URI starting with 'gs://', got '{file_uri}'."
        )
    # Only the first path segment is the bucket; everything after it is the
    # object name, which may itself contain '/' (objects in "folders").
    bucket_name, _, object_name = file_uri[len(scheme):].partition('/')
    if not bucket_name or not object_name:
        raise ValueError(
            f"Cloud Storage URI '{file_uri}' must contain both a bucket and an object name."
        )
    return bucket_name, object_name


def get_yaml_data(source_project, source_region, lake_id, task_id):
    """
    Fetch the data quality YAML spec referenced by a Dataplex task.

    The task is looked up by its full resource name, and the last entry of
    its Spark ``file_uris`` is treated as the location of the spec. A URI
    ending in ``.zip`` is read through ``unzip_and_read_yaml``; anything else
    is read through ``read_yaml_file``.

    Args:
        source_project: Project ID that owns the task.
        source_region: Location (region) of the lake.
        lake_id: ID of the lake containing the task.
        task_id: ID of the task to inspect.

    Returns:
        A tuple ``(yaml_data, trigger_spec)``: the value returned by the YAML
        reader, and the ``trigger_spec`` field of the fetched task.

    Raises:
        ValueError: If the task has no Spark file URIs, or the selected URI
            is not of the form ``gs://<bucket>/<object>``.
        PermissionError: If ``check_bucket_permission`` reports no access to
            the bucket.
    """
    # Create a client
    client = dataplex_v1.DataplexServiceClient()
    # Initialize request argument(s)
    task_name = (
        f"projects/{source_project}/locations/{source_region}"
        f"/lakes/{lake_id}/tasks/{task_id}"
    )
    request = dataplex_v1.GetTaskRequest(name=task_name)
    # Make the request
    response = client.get_task(request=request)

    file_uris = response.spark.file_uris
    if not file_uris:
        raise ValueError(f"Task '{task_name}' does not reference any Spark file URIs.")
    file_uri = file_uris[-1]

    # Parse the URI properly rather than taking the last two '/'-separated
    # parts, which picks the wrong bucket whenever the object is nested.
    # NOTE(review): the readers below presumably take the full object name
    # within the bucket as their second argument; confirm against them.
    bucket_name, file_name = _split_gcs_uri(file_uri)

    if not check_bucket_permission(bucket_name):
        raise PermissionError(f"Permission is denied on the bucket '{bucket_name}'.")

    if file_uri.endswith('.zip'):
        yaml_data = unzip_and_read_yaml(bucket_name, file_name)
    else:
        yaml_data = read_yaml_file(bucket_name, file_name)

    trigger_spec = response.trigger_spec
    return yaml_data, trigger_spec