in 04_EdgeApplication/turbine/ota.py [0:0]
def __init__(self, device_name, iot_params, mqtt_host, mqtt_port, update_callback, model_path):
'''
This class is responsible for listening to IoT topics and receiving
a Json document with the metadata of a new model. This module also
downloads the SageMaker Edge Manager deployment package, unpacks it to
a local dir and also controls versioning.
'''
if model_path is None or update_callback is None:
raise Exception("You need to inform a model_path and an update_callback methods")
self.device_name = device_name
self.model_path = model_path
self.update_callback = update_callback
self.iot_params = iot_params
## initialize an mqtt client
self.mqttc = mqtt.Client()
self.mqttc.tls_set(
iot_params['sagemaker_edge_provider_aws_ca_cert_file'],
certfile=iot_params['sagemaker_edge_provider_aws_cert_file'],
keyfile=iot_params['sagemaker_edge_provider_aws_cert_pk_file'],
cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None
)
self.mqttc.enable_logger(logger=logging)
self.mqttc.on_message = self.__on_message__
self.mqttc.on_connect = self.__on_connect__
self.mqttc.on_disconnect = self.__on_disconnect__
self.connected = False
self.model_meta = {'model_name': None}
for f in glob.glob(os.path.join(model_path, '*', '*', 'compiled.*')):
tokens = f.split(os.path.sep)
assert(len(tokens) > 3)
name = tokens[-3]
version = float(tokens[-2])
if self.model_meta['model_name'] != name or self.model_meta['model_version'] < version:
self.model_meta['model_name'] = name
self.model_meta['model_version'] = version
logging.info("Model meta", self.model_meta)
if self.model_meta['model_name'] is not None:
self.update_callback(self.model_meta['model_name'], self.model_meta['model_version'])
self.processing_lock = threading.Lock()
self.processed_jobs = []
# start the mqtt client
self.mqttc.connect(mqtt_host, mqtt_port, 45)
self.mqttc.loop_start()