in functions/data-processing-engines/dataflow-flextemplate-job-executor/main.py [0:0]
def extract_params(bucket_name, job_name, function_name, encoding='utf-8'):
"""Extracts parameters from a JSON file.
Args:
bucket_name: Bucket containing the JSON parameters file .
Returns:
A dictionary containing the extracted parameters.
"""
json_file_path = f'gs://{bucket_name}/{function_name}/{job_name}.json'
parts = json_file_path.replace("gs://", "").split("/")
bucket_name = parts[0]
object_name = "/".join(parts[1:])
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(object_name)
try:
json_data = blob.download_as_bytes()
params = json.loads(json_data.decode(encoding))
return params
except (google.cloud.exceptions.NotFound, json.JSONDecodeError, UnicodeDecodeError) as e:
print(f"Error reading JSON file: {e}")
return None