in source/lambda/iot-dr-r53-health-check/iot-dr-r53-health-checker.py [0:0]
def lambda_handler(event, context):
logger.info('r53-health-check: start')
logger.info('event: {}'.format(event))
try:
if COUNT < 1:
raise Exception('COUNT must be greate or equal 1: defined: {}'.format(COUNT))
uuid = '{}'.format(uuid4())
client_id = '{}-{}'.format(CLIENT_ID, uuid)
topic = 'dr/r53/check/{}/{}'.format(CLIENT_ID, uuid)
logger.info('client_id: {} topic: {}'.format(client_id, topic))
if not 'queryStringParameters' in event:
logger.error('queryStringParameters missing')
return {
'statusCode': 503,
'body': json.dumps({ 'message': 'internal server error'})
}
if not 'hashme' in event['queryStringParameters']:
logger.error('hashme missing')
return {
'statusCode': 503,
'body': json.dumps({ 'message': 'internal server error'})
}
if event['queryStringParameters']['hashme'] != QUERY_STRING:
logger.error('query string missmatch: rawQueryString: {}'.format(event['queryStringParameters']['hashme']))
return {
'statusCode': 503,
'body': json.dumps({ 'message': 'internal server error'})
}
# Spin up resources
event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)
client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
mqtt_connection = mqtt_connection_builder.mtls_from_path(
endpoint=ENDPOINT,
cert_filepath=CERT,
pri_key_filepath=KEY,
client_bootstrap=client_bootstrap,
ca_filepath=CA,
on_connection_interrupted=on_connection_interrupted,
on_connection_resumed=on_connection_resumed,
client_id=client_id,
clean_session=False,
keep_alive_secs=6)
logger.info("connecting: endpoint: {} client_id: {}".format(
ENDPOINT, client_id))
connect_future = mqtt_connection.connect()
# Future.result() waits until a result is available
connect_future.result()
logger.info("connected to endpoint: {}".format(ENDPOINT))
# Subscribe
logger.info("subscribing: topic: {}".format(topic))
subscribe_future, packet_id = mqtt_connection.subscribe(
topic=topic,
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=on_message_received)
subscribe_result = subscribe_future.result()
logger.info("subscribed: qos: {}".format(str(subscribe_result['qos'])))
logger.info("sending {} message(s)".format(COUNT))
publish_count = 1
while (publish_count <= COUNT):
message = {
"message": "R53 health check",
"count": "{}".format(publish_count),
"datetime": "{}".format(datetime.now().isoformat())
}
if 'requestContext' in event and 'http' in event['requestContext'] and 'sourceIp' in event['requestContext']['http']:
message['source_ip'] = {'source': 'http', 'ip': event['requestContext']['http']['sourceIp']}
if 'requestContext' in event and 'identity' in event['requestContext'] and 'sourceIp' in event['requestContext']['identity']:
message['source_ip'] = {'source': 'identity', 'ip': event['requestContext']['identity']['sourceIp']}
logger.info("publishing: topic {}: message: {}".format(topic, message))
mqtt_connection.publish(
topic=topic,
payload=json.dumps(message),
qos=mqtt.QoS.AT_LEAST_ONCE)
#time.sleep(1)
publish_count += 1
# Wait for all messages to be received.
# This waits forever if count was set to 0.
if not RECEIVED_ALL_EVENT.is_set():
logger.info("waiting for all message(s) to be received: {}/{}".format(RECEIVED_COUNT, COUNT))
if not RECEIVED_ALL_EVENT.wait(RECEIVE_TIMEOUT):
raise Exception('not all message received after timeout: received: {} expected: {} timeout: {}'.format(
RECEIVED_COUNT, COUNT, RECEIVE_TIMEOUT))
logger.info("message(s) received: {}/{}".format(RECEIVED_COUNT, COUNT))
# Disconnect
logger.info("initiating disconnect")
disconnect_future = mqtt_connection.disconnect()
disconnect_future.result()
logger.info("disconnected")
logger.info('r53-health-check: finished: messages: published/received: {}/{}'.format(
publish_count-1, RECEIVED_COUNT))
return {
'statusCode': 200,
'body': json.dumps({ 'mqtt_status': 'healthy' })
}
except Exception as e:
logger.error('r53-health-check: finished: with errror: {}'.format(e))
return {
'statusCode': 503,
'body': json.dumps({ 'mqtt_status': 'unhealthy', 'error': '{}'.format(e)})
}