in src/influxDBTelemetryPublisher.py [0:0]
def retrieve_influxdb_params(publish_topic, subscribe_topic) -> dict:
    """
    Subscribe to a token response topic and send a request to the token request topic
    in order to retrieve InfluxDB parameters.

    The request is re-published (up to 10 times, waiting 10 seconds after each
    attempt) until the subscription handler has received parameters whose
    'InfluxDBTokenAccessType' is "RW". Parameters with any other access type,
    or with no access type at all, are discarded and the request is retried.

    Parameters
    ----------
        publish_topic(str): the topic to publish the request on
        subscribe_topic(str): the topic to subscribe on to retrieve the response

    Returns
    -------
        influxdb_parameters(dict): the retrieved parameters needed to connect to InfluxDB

    Raises
    ------
        concurrent.futures.TimeoutError: the subscription was not confirmed within TIMEOUT
        UnauthorizedError: the subscription was rejected as unauthorized
        Exception: any other failure while subscribing (re-raised unchanged)
        SystemExit: exit code 1 when no RW parameters could be retrieved
    """
    # Local import: `sys` is only needed for the failure exit path below.
    import sys

    max_publish_attempts = 10
    wait_seconds = 10
    access_type_key = 'InfluxDBTokenAccessType'

    # First, set up a subscription to the InfluxDB token response topic
    ipc_subscriber_client = awsiot.greengrasscoreipc.connect()
    request = SubscribeToTopicRequest()
    request.topic = subscribe_topic
    handler = streamHandlers.InfluxDBDataStreamHandler()
    subscriber_operation = ipc_subscriber_client.new_subscribe_to_topic(handler)
    future = subscriber_operation.activate(request)

    try:
        future.result(TIMEOUT)
        logging.info('Successfully subscribed to topic: {}'.format(subscribe_topic))
    except concurrent.futures.TimeoutError:
        logging.error('Timeout occurred while subscribing to topic: {}'.format(subscribe_topic), exc_info=True)
        # Close the operation on every failure path so it is not leaked.
        subscriber_operation.close()
        raise
    except UnauthorizedError:
        logging.error('Unauthorized error while subscribing to topic: {}'.format(subscribe_topic), exc_info=True)
        subscriber_operation.close()
        raise
    except Exception:
        logging.error('Exception while subscribing to topic: {}'.format(subscribe_topic), exc_info=True)
        subscriber_operation.close()
        raise

    # Next, send a publish request to the InfluxDB token request topic
    ipc_publisher_client = awsiot.greengrasscoreipc.connect()
    retries = 0
    try:
        # Retrieve the InfluxDB parameters to connect
        # Retry 10 times or until we retrieve parameters with RW access
        while not handler.influxdb_parameters and retries < max_publish_attempts:
            logging.info("Publish attempt {}".format(retries))
            publish_token_request(ipc_publisher_client, publish_topic)
            logging.info('Successfully published token request to topic: {}'.format(publish_topic))
            retries += 1
            # Give the response time to arrive on the subscription before checking.
            logging.info('Waiting for 10 seconds...')
            time.sleep(wait_seconds)
            if handler.influxdb_parameters:
                # This component should only accept tokens with RW access, and will reject others in case of conflict.
                # `.get` is used so that a response lacking the access type is
                # discarded and retried instead of raising KeyError, which would
                # otherwise be swallowed below and let the unvalidated
                # parameters be returned as a success.
                access_type = handler.influxdb_parameters.get(access_type_key)
                if access_type != "RW":
                    logging.warning("Discarding retrieved token with incorrect access level {}"
                                    .format(access_type))
                    handler.influxdb_parameters = {}
    except Exception:
        # Best effort: the failure is logged and reported via the empty-parameters check below.
        logging.error("Received error while sending token publish request!", exc_info=True)
    finally:
        # Close the operations for the clients
        subscriber_operation.close()
        logging.info("Closed InfluxDB parameter response subscriber client")

    if not handler.influxdb_parameters:
        logging.error("Failed to retrieve InfluxDB parameters over IPC!")
        # sys.exit rather than the site-provided `exit` helper, which is meant
        # for interactive use and is not guaranteed to exist.
        sys.exit(1)

    logging.info("Successfully retrieved InfluxDB metadata and token!")
    return handler.influxdb_parameters