in lab/app/ota.py [0:0]
def __process_job__(self, job_id, msg):
    """Handle one OTA job message, deploying a new model package if it is valid.

    The whole call runs while holding ``self.processing_lock``. A ``job_id``
    already present in ``self.processed_jobs`` is ignored; otherwise it is
    recorded there before any work is done.

    For a message whose ``type`` is ``'new_model'`` the method:

    1. Rejects the job (status ``FAILED``) if a model is already recorded in
       ``self.model_meta`` and either its name differs from the incoming
       one or its version is not lower than the incoming one.
    2. Downloads the package from S3 using ``model_package_bucket`` and
       ``model_package_key`` from the message.
    3. Extracts it under
       ``<model_path>/<device_id>/<model_name>/<model_version>``.
    4. Updates ``self.model_meta``, reports ``SUCCEEDED`` and invokes
       ``self.update_callback(device_id, model_name, model_version)``.

    Any other message type is only logged.

    Args:
        job_id: Identifier of the job; used for de-duplication and passed to
            ``self.__update_job_status__``.
        msg: Mapping-like job document. Keys read here: ``type``,
            ``model_name``, ``model_version``, ``model_package_bucket``,
            ``model_package_key``.

    Returns:
        None.

    Raises:
        Exceptions raised while processing are caught, reported as a
        ``FAILED`` job status and logged. Only an exception raised by that
        failure report itself propagates to the caller (the lock is still
        released in that case).
    """
    # A context manager guarantees the lock is released on every exit path,
    # including an exception escaping from the failure handler below.
    with self.processing_lock:
        if job_id in self.processed_jobs:
            return
        self.processed_jobs.append(job_id)

        try:
            if msg.get('type') != 'new_model':
                # Nothing to deploy: report what is currently recorded.
                # .get() keeps this informational path from raising when no
                # model has been deployed yet.
                logging.info(
                    "Model '%s' version '%s' is the current one or it is obsolete",
                    self.model_meta.get('model_name'),
                    self.model_meta.get('model_version'),
                )
                return

            model_version = float(msg['model_version'])
            model_name = msg['model_name']

            # Only validate against the current model when one is recorded.
            if self.model_meta.get('model_name') is not None:
                if self.model_meta['model_name'] != model_name:
                    reason = 'New model name doesnt match the current name: %s' % model_name
                    logging.info(reason)
                    self.__update_job_status__(job_id, 'FAILED', reason)
                    return
                if self.model_meta['model_version'] >= model_version:
                    reason = "New model version is not newer than the current one. Curr: %f; New: %f;" % (self.model_meta['model_version'], model_version)
                    logging.info(reason)
                    self.__update_job_status__(job_id, 'FAILED', reason)
                    return

            logging.info("Downloading new model package")
            s3_client = self.get_client('s3')
            response = s3_client.get_object(
                Bucket=msg['model_package_bucket'],
                Key=msg['model_package_key'],
            )
            package = io.BytesIO(response['Body'].read())

            logging.info("Unpacking model package")
            # str() on the version: the message may carry it as a number,
            # which os.path.join would reject. A string value is unchanged.
            target_dir = os.path.join(
                self.model_path,
                str(self.device_id),
                msg['model_name'],
                str(msg['model_version']),
            )
            # NOTE(review): extractall() trusts the member paths of a
            # downloaded archive. Consider tarfile extraction filters
            # (e.g. filter='data') where the runtime supports them.
            with tarfile.open(fileobj=package) as p:
                p.extractall(target_dir)

            self.model_meta['model_name'] = msg['model_name']
            self.model_meta['model_version'] = model_version
            self.__update_job_status__(job_id, 'SUCCEEDED', 'Model deployed')
            self.update_callback(self.device_id, self.model_meta['model_name'], self.model_meta['model_version'])
        except Exception as e:
            # Boundary handler: any failure is surfaced as a FAILED job.
            self.__update_job_status__(job_id, 'FAILED', str(e))
            logging.error(e)