def __process_job__()

in lab/03-Package-Deploy/iot-jobs/app/ota.py [0:0]


    def __process_job__(self, job_id, msg):
        self.processing_lock.acquire()
        if job_id in self.processed_jobs:
            self.processing_lock.release()
            return
        self.processed_jobs.append(job_id)
        try:
            if msg.get('type') == 'new_model':                
                model_version = float(msg['model_version'])
                model_name = msg['model_name']

                if self.model_meta.get('model_name') is not None:
                    if self.model_meta['model_name'] != model_name:
                        msg = 'New model name doesnt match the current name: %s' % model_name
                        logging.info(msg)
                        self.__update_job_status__(job_id, 'FAILED', msg)
                        self.processing_lock.release()
                        return
                        
                    if self.model_meta['model_version'] >= model_version:
                        msg = "New model version is not newer than the current one. Curr: %f; New: %f;" % (self.model_meta['model_version'], model_version)
                        logging.info(msg)
                        self.__update_job_status__(job_id, 'FAILED', msg)
                        self.processing_lock.release()
                        return

                logging.info("Downloading new model package")
                s3_client = self.get_client('s3')
                
                package = io.BytesIO(s3_client.get_object(
                    Bucket=msg['model_package_bucket'], 
                    Key=msg['model_package_key'])['Body'].read()
                )
                logging.info("Unpacking model package")
                with tarfile.open(fileobj=package) as p:
                    p.extractall(os.path.join(self.model_path, str(self.device_id), msg['model_name'], msg['model_version']))
                    self.model_meta['model_name'] = msg['model_name']
                    self.model_meta['model_version'] = model_version
                                
                self.__update_job_status__(job_id, 'SUCCEEDED', 'Model deployed')
                self.update_callback(self.device_id, self.model_meta['model_name'], self.model_meta['model_version'])                
            else:
                logging.info("Model '%s' version '%f' is the current one or it is obsolete" % (self.model_metadata['model_name'], self.model_metadata['model_version']))
        except Exception as e:
            self.__update_job_status__(job_id, 'FAILED', str(e))            
            logging.error(e)

        self.processing_lock.release()