in GenAIFlow.py [0:0]
def load_secret(self, secret_id: str, version: str = "latest") -> str:
    """
    Utility function to load a secret from GCP Secret Manager.

    Args:
        secret_id: Identifier of the secret, looked up inside the project
            returned by ``self.get_project_id()``.
        version: Secret version to access. Defaults to ``"latest"``, which
            keeps the previous behavior; pass an explicit version to pin one.

    Returns:
        The secret payload decoded as UTF-8 text.

    Raises:
        Exception: If the CRC32C checksum computed over the received payload
            does not match the checksum reported in the response. Errors
            raised by the Secret Manager client itself are not caught here.
    """
    client = secretmanager.SecretManagerServiceClient()
    # Resolve the project once; it is needed for both the path and the error message.
    project_id = self.get_project_id()
    secret_path = client.secret_version_path(project_id, secret_id, version)
    response = client.access_secret_version(request={"name": secret_path})
    payload = response.payload

    # Recompute the checksum locally and compare it with the one sent alongside
    # the payload, so a corrupted secret is rejected instead of being returned.
    crc32c = google_crc32c.Checksum()
    crc32c.update(payload.data)
    if payload.data_crc32c != int(crc32c.hexdigest(), 16):
        raise Exception(f"Secret CRC Corrupted in project {project_id} and path {secret_path}")
    return payload.data.decode("UTF-8")