tools/cloud/iot-topic-subscribe.py (77 lines of code) (raw):

#!/usr/bin/env python3
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
"""Subscribe to an AWS IoT Core MQTT topic and print or record received JSON messages.

Without ``--output-file`` every message is pretty-printed as it arrives.
With ``--output-file`` messages are collected in memory and written to the
file as a single JSON array when the script shuts down.
"""

import argparse
import json
import time
from concurrent.futures import Future

import boto3
from awscrt import auth, mqtt5
from awsiot import mqtt5_client_builder
from botocore.config import Config

# ``None`` means "print each message"; ``main`` replaces it with a list when
# ``--output-file`` is given, and messages are then appended instead of printed.
received_data = None


def on_publish_received(message):
    """Handle one incoming publish: decode its JSON payload, then print or store it.

    Args:
        message: Callback data passed by the MQTT5 client; its
            ``publish_packet`` attribute supplies ``topic`` and ``payload``.
    """
    global received_data
    packet = message.publish_packet
    print(f"Received message on topic: {packet.topic}")
    try:
        # Parse the message payload and extract the relevant data
        data = json.loads(packet.payload)
        if received_data is None:
            print(f"Received data: {json.dumps(data, indent=2)}")
        else:
            received_data.append(data)
    except Exception as e:
        # Deliberately broad: one malformed message is reported and skipped
        # rather than being raised out of the client's callback.
        print(f"Error parsing message: {e}")


def main():
    """Parse the command line, subscribe to the topic and run until done.

    The script runs for ``--run-time`` seconds (zero means until interrupted
    with Ctrl-C), then stops the MQTT client and, if requested, writes the
    collected messages to ``--output-file``.
    """
    global received_data

    parser = argparse.ArgumentParser(description="Receives data from the IoT topic")
    parser.add_argument("--endpoint-url", metavar="URL", help="IoT Core endpoint URL", default=None)
    parser.add_argument("--region", metavar="REGION", help="IoT Core region", default="us-east-1")
    parser.add_argument("--client-id", metavar="ID", help="MQTT client ID", default="CUSTOMER_APP")
    # type=int lets argparse reject a non-numeric value before any connection is made.
    parser.add_argument(
        "--run-time", metavar="SEC", help="Run time, zero is infinite", type=int, default=0
    )
    parser.add_argument("--output-file", metavar="FILE", help="Output JSON file", default=None)
    parser.add_argument("--iot-topic", metavar="IOT_TOPIC", help="MQTT topic", required=True)
    # Required for command-line compatibility; this script never reads the value.
    parser.add_argument("--vehicle-name", metavar="NAME", help="Vehicle name", required=True)
    args = parser.parse_args()

    if args.output_file:
        # Switch the publish callback from printing to collecting.
        received_data = []

    session = boto3.Session()
    iot_client = session.client(
        "iot", endpoint_url=args.endpoint_url, config=Config(region_name=args.region)
    )
    iot_endpoint = iot_client.describe_endpoint(endpointType="iot:Data-ATS")["endpointAddress"]
    credentials_provider = auth.AwsCredentialsProvider.new_default_chain()

    # Completed by the lifecycle callback so shutdown can wait for the client to stop.
    stop_future = Future()

    def on_lifecycle_stopped(data: mqtt5.LifecycleStoppedData):
        print(f"MQTT client stopped {data=}")
        stop_future.set_result(None)

    mqtt_connection = mqtt5_client_builder.websockets_with_default_aws_signing(
        endpoint=iot_endpoint,
        region=args.region,
        on_publish_received=on_publish_received,
        on_lifecycle_stopped=on_lifecycle_stopped,
        credentials_provider=credentials_provider,
        client_id=args.client_id,
        clean_session=False,
        keep_alive_secs=30,
    )
    mqtt_connection.start()

    try:
        mqtt_topic = args.iot_topic
        subscribe_future = mqtt_connection.subscribe(
            subscribe_packet=mqtt5.SubscribePacket(
                subscriptions=[
                    mqtt5.Subscription(topic_filter=mqtt_topic, qos=mqtt5.QoS.AT_LEAST_ONCE)
                ]
            )
        )
        subscribe_res = subscribe_future.result(1000)
        print(f"Established mqtt subscription to {mqtt_topic} with {subscribe_res.reason_codes}")

        elapsed = 0
        try:
            while args.run_time == 0 or elapsed < args.run_time:
                time.sleep(1)
                elapsed += 1
        except KeyboardInterrupt:
            # Ctrl-C is the normal way to end an unlimited run.
            pass
    finally:
        # Always stop the client, even if subscribing failed.
        try:
            mqtt_connection.stop()
            stop_future.result(timeout=10)
        finally:
            # Save whatever was collected even if the stop wait timed out.
            if args.output_file:
                with open(args.output_file, "w", encoding="utf-8") as fp:
                    json.dump(received_data, fp, indent=2)


if __name__ == "__main__":
    main()