def get_yaml_data()

in clouddq-migration/dataplex.py [0:0]


def _split_gcs_uri(file_uri):
    '''
        Split a Cloud Storage URI into a (bucket name, object name) pair.
    '''
    gcs_scheme = 'gs://'
    if file_uri.startswith(gcs_scheme):
        # The bucket is everything up to the first '/' after the scheme; the
        # remainder is the object name, which may itself contain '/'.
        bucket_name, _, object_name = file_uri[len(gcs_scheme):].partition('/')
        return bucket_name, object_name

    # Not a gs:// URI: keep the historical "last two path segments" split so
    # inputs that worked before keep producing the same result.
    segments = file_uri.split('/')
    return segments[-2], segments[-1]


def get_yaml_data(source_project, source_region, lake_id, task_id):
    '''
        Method to get the data quality yaml spec

        Fetches the Dataplex task identified by the arguments, takes the last
        entry of its Spark file URIs as the spec location, and reads that file
        through the zip or plain-yaml reader depending on its extension.

        Args:
            source_project: project id used to build the task resource name.
            source_region: location used to build the task resource name.
            lake_id: lake id used to build the task resource name.
            task_id: task id used to build the task resource name.

        Returns:
            A tuple (yaml_data, trigger_spec): the value returned by the
            reader helper and the task's trigger_spec.

        Raises:
            IndexError: if the task has no Spark file URIs.
            PermissionError: if check_bucket_permission reports no access to
                the bucket that holds the spec file.

        NOTE(review): for objects stored below the bucket root the readers now
        receive the full object path (e.g. 'dir/spec.yaml') rather than only
        the last path segment; confirm unzip_and_read_yaml / read_yaml_file
        treat the name as a blob path.
    '''
    # Create a client
    client = dataplex_v1.DataplexServiceClient()

    # Initialize request argument(s)
    request = dataplex_v1.GetTaskRequest(
        name=f"projects/{source_project}/locations/{source_region}/lakes/{lake_id}/tasks/{task_id}",
    )

    # Make the request
    response = client.get_task(request=request)

    file_uris = response.spark.file_uris
    if not file_uris:
        # Same exception type an empty sequence raised before, but with a
        # message that says which task is missing its spec.
        raise IndexError(f"Task '{task_id}' has no Spark file URIs to read the yaml spec from.")

    # The last file URI is the one treated as the yaml spec (or zip of specs).
    file_uri = file_uris[-1]
    bucket_name, file_name = _split_gcs_uri(file_uri)

    if not check_bucket_permission(bucket_name):
        raise PermissionError(f"Permission is denied on the bucket '{bucket_name}'.")

    if file_uri.endswith('.zip'):
        yaml_data = unzip_and_read_yaml(bucket_name, file_name)
    else:
        yaml_data = read_yaml_file(bucket_name, file_name)

    trigger_spec = response.trigger_spec
    return yaml_data, trigger_spec