in src/retrieveInfluxDBSecrets.py [0:0]
def get_secret_over_ipc(secret_arn) -> str:
    """
    Retrieve a secret string from the Greengrass secret manager over IPC.

    Connects to the Greengrass Core IPC service, issues a GetSecretValue
    request for ``secret_arn`` and waits up to ``TIMEOUT`` (a module-level
    value defined outside this function; presumably seconds - confirm) for
    the response.

    Any failure (connection error, timeout, authorization error, or any
    other exception) is logged and then terminates the process with exit
    status 1; no exception other than ``SystemExit`` reaches the caller.

    Parameters
    ----------
    secret_arn(str): The ARN of the secret to retrieve from Secret Manager.

    Returns
    -------
    secret_string(str): Retrieved IPC secret.

    Raises
    ------
    SystemExit: With code 1 when the secret could not be retrieved.
    """
    try:
        ipc_client = awsiot.greengrasscoreipc.connect()
        request = GetSecretValueRequest()
        request.secret_id = secret_arn
        operation = ipc_client.new_get_secret_value()
        operation.activate(request)
        future_response = operation.get_response()

        try:
            response = future_response.result(TIMEOUT)
            return response.secret_value.secret_string
        # The handlers below only add a cause-specific log line; the
        # re-raised exception is then caught by the outer handler, which
        # logs the generic message and exits.
        except concurrent.futures.TimeoutError:
            logging.error("Timeout occurred while getting secret: %s", secret_arn, exc_info=True)
            raise
        except UnauthorizedError:
            logging.error("Unauthorized error while getting secret: %s", secret_arn, exc_info=True)
            raise
        except Exception:
            logging.error("Exception while getting secret: %s", secret_arn, exc_info=True)
            raise
    except Exception:
        logging.error("Exception occurred when using IPC.", exc_info=True)
        # Raise SystemExit directly instead of calling the `exit` helper,
        # which is injected by the `site` module and is not guaranteed to
        # exist (e.g. `python -S`, frozen or embedded interpreters).
        raise SystemExit(1)