in source/tools/iot-dr-pubsub.py [0:0]
def connection_start(iot_endpoint):
    """Create the global MQTT connection, connect it, and subscribe to ``args.topic``.

    The connection is built either over websockets with default AWS signing
    (when ``args.use_websocket`` is set) or with mutual TLS from the
    certificate/key paths in ``args``. The resulting connection object is
    stored in the module-level ``MQTT_CONNECTION``.

    This function blocks: it waits on the connect future and then on the
    subscribe future. No exceptions are caught here, so anything raised while
    building the connection or waiting on those futures propagates to the
    caller.

    Args:
        iot_endpoint: Endpoint passed to the connection builder as
            ``endpoint`` and echoed in the log output.
    """
    global MQTT_CONNECTION

    # Spin up resources
    event_loop_group = io.EventLoopGroup(1)
    host_resolver = io.DefaultHostResolver(event_loop_group)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)

    # Keyword arguments that both builders receive unchanged; kept in one
    # place so the two transport paths cannot drift apart.
    common_options = {
        "endpoint": iot_endpoint,
        "client_bootstrap": client_bootstrap,
        "ca_filepath": args.root_ca,
        "on_connection_interrupted": on_connection_interrupted,
        "on_connection_resumed": on_connection_resumed,
        "client_id": args.client_id,
        "keep_alive_secs": 6,
    }

    # NOTE(review): the websocket path uses clean_session=False while the
    # mTLS path uses clean_session=True -- confirm this asymmetry is intended.
    # NOTE(review): the truthiness test below assumes args.use_websocket is a
    # bool (or None) -- confirm against the argument parser.
    if args.use_websocket:
        # A proxy is only configured when a proxy host was supplied.
        proxy_options = None
        if args.proxy_host:
            proxy_options = http.HttpProxyOptions(
                host_name=args.proxy_host, port=args.proxy_port)

        credentials_provider = auth.AwsCredentialsProvider.new_default_chain(
            client_bootstrap)
        MQTT_CONNECTION = mqtt_connection_builder.websockets_with_default_aws_signing(
            region=args.signing_region,
            credentials_provider=credentials_provider,
            websocket_proxy_options=proxy_options,
            clean_session=False,
            **common_options)
    else:
        MQTT_CONNECTION = mqtt_connection_builder.mtls_from_path(
            cert_filepath=args.cert,
            pri_key_filepath=args.key,
            clean_session=True,
            **common_options)

    logger.info("Connecting to %s with client ID '%s'...",
                iot_endpoint, args.client_id)
    connect_future = MQTT_CONNECTION.connect()
    # Future.result() waits until a result is available
    connect_future.result()
    logger.info("Connected!")

    # Subscribe
    logger.info("Subscribing to topic '%s'", args.topic)
    # subscribe() returns (future, packet_id); the packet id is not needed here.
    subscribe_future, _ = MQTT_CONNECTION.subscribe(
        topic=args.topic,
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_message_received)
    subscribe_result = subscribe_future.result()
    logger.info("Subscribed with %s", subscribe_result['qos'])