in workflows-generator/composer-templates/workflow.py [0:0]
def extract_job_params(job_name, function_name, encoding='utf-8'):
    """Load the JSON parameters file of a job from Cloud Storage.

    The object is addressed as
    ``gs://<jobs_bucket>/<function_name>/<job_name>.json``, where
    ``jobs_bucket`` and ``storage_client`` are names resolved from the
    enclosing module scope (they are not defined in this function).

    Args:
        job_name: Base name of the JSON file, without the ``.json`` suffix.
        function_name: Path segment placed between the bucket and the file
            name.
        encoding: Text encoding used to decode the downloaded bytes before
            JSON parsing. Defaults to ``'utf-8'``.

    Returns:
        The value produced by ``json.loads`` for the file content
        (presumably a dictionary of parameters -- confirm against the
        job files actually stored in the bucket).

    Raises:
        UnicodeDecodeError: If the content cannot be decoded with
            ``encoding``.
        json.JSONDecodeError: If the decoded content is not valid JSON.
        Errors raised by the storage client while downloading are not
        caught here and propagate to the caller.
    """
    gcs_uri = f'gs://{jobs_bucket}/{function_name}/{job_name}.json'

    # Drop the scheme, then cut at the first "/": everything before it is
    # the bucket, everything after it is the object path.
    # NOTE: str.replace removes every "gs://" occurrence in the string,
    # not only the leading one.
    location = gcs_uri.replace("gs://", "")
    bucket_id, _, blob_path = location.partition("/")

    raw_bytes = (
        storage_client
        .bucket(bucket_id)
        .blob(blob_path)
        .download_as_bytes()
    )
    return json.loads(raw_bytes.decode(encoding))