tools/AduCmdlets-py/scripts/aduupdate.py (119 lines of code) (raw):
#!/usr/bin/env python3
#
# Device Update for IoT Hub
# Copyright (c) Microsoft Corporation.
#
# Python helpers for creating import manifest for Device Update for IoT Hub.
#
import base64
import datetime
import hashlib
import json
import os
import re
class update_id():
"""! Forward reference for update_id class. """
pass
class compatibility_info():
"""! Forward reference for compatibility_info class. """
pass
class installation_step():
"""! Forward reference for installation_step class. """
pass
def get_filehashes(file_path: str) -> dict:
"""! Gets file hashes for a file.
@param file_path Full path of file.
@return dict containing hashes. Currently only a single (sha256) hash is returned.
"""
if not os.path.isfile(file_path) and not os.path.exists('file_path'):
raise FileNotFoundError('File {file_path} not found')
with open(os.path.abspath(file_path), 'rb') as f:
bytes = f.read()
digest = hashlib.sha256(bytes).digest()
return {'sha256': base64.b64encode(digest).decode('ascii')}
def get_file_metadata(file_path: str) -> dict:
"""! Retrieves file metadata for a file.
@param file_path Full path of file
@return dict containing file metadata.
"""
return {
'filename': os.path.basename(file_path),
'sizeInBytes': os.path.getsize(file_path),
'hashes': get_filehashes(file_path)
}
class import_manifest:
"""! Class that represents an update import manifest """
def __init__(self, update_id: update_id, compatibility_infos: list[compatibility_info], installation_steps: list[installation_step], is_deployable: bool = True, description: str = None) -> None:
if description is not None and len(description) > 512:
raise ValueError(f'Description {description} is not valid')
if len(installation_steps) < 1 or len(installation_steps) > 10:
raise ValueError(
f'Invalid number of installation steps (must be 1-10)')
file_metadatas = []
for step in installation_steps:
if type(step) is inline_installation_step:
if len(step.files) < 1 or len(step.files) > 10:
raise ValueError(
'Invalid number of files specified for step (must be 1-10)')
for file in step.files:
# Note: step.files are full pathnames, not just filenames
meta = get_file_metadata(file_path=file)
basename = os.path.basename(file)
# In case multiple steps are sharing a payload file.
if not any(fm['filename'] == basename for fm in file_metadatas):
file_metadatas.append(meta)
if len(file_metadatas) > 10:
raise ValueError('Invalid number of payload files (must be <=10)')
self.import_manifest = {
'updateId': update_id.__dict__,
'isDeployable': is_deployable,
'compatibility': compatibility_infos,
'createdDateTime': datetime.datetime.utcnow().isoformat(),
'manifestVersion': '4.0'
}
self.import_manifest['instructions'] = {
"steps": [step.as_dict() for step in installation_steps]
}
if description is not None:
self.import_manifest['description'] = description
if len(file_metadatas) > 0:
self.import_manifest['files'] = file_metadatas
def __str__(self) -> str:
"""! Returns object stringified as JSON """
return json.dumps(self.import_manifest, indent=4)
class update_id:
"""!
Class that represents an update id.
An update id is a represented by { provider, name, version }, all strings.
"""
def __init__(self, provider: str, name: str, version: str) -> None:
# Provider
pattern = re.compile(r'[a-zA-Z0-9.-]{1,64}')
if not re.fullmatch(pattern, provider):
raise ValueError(f'provider {provider} is not valid')
self.provider = provider
# Name
pattern = re.compile(r'[a-zA-Z0-9.-]{1,64}')
if not re.fullmatch(pattern, name):
raise ValueError(f'name {name} is not valid')
self.name = name
# Version (major.minor[.build[.revision]])
pattern = re.compile(r'\d{1,5}\.\d{1,5}(?:\.\d{1,5}(?:\.\d{1,5})?)?')
if not re.fullmatch(pattern, version):
raise ValueError(f'version {version} is not valid')
self.version = version
def __str__(self) -> str:
"""! Returns object stringified in format provider.name.version. """
return f'{self.provider}.{self.name}.{self.version}'
class compatibility_info(dict):
"""! Class that represents an update compatibility. """
def __init__(self, *arg, **kw):
super(compatibility_info, self).__init__(*arg, **kw)
if len(self) > 5:
raise ValueError(
'Invalid number of compatibility info properties (must be 1-5)')
class installation_step:
"""! Base class for installation step. Should not be instantiated. """
class inline_installation_step(installation_step):
"""! Class that represents an inline installation step. """
def __init__(self, handler: str, files: list, handler_properties: dict = None, description: str = None) -> None:
self.handler = handler
self.files = files
self.handler_properties = handler_properties
self.description = description
def as_dict(self):
"""! Return object as dict. """
json = {
'type': 'inline',
'handler': self.handler
}
# Inline step requires only payload file name, convert full path to filename.
json['files'] = [os.path.basename(file) for file in self.files]
if self.handler_properties is not None:
json['handlerProperties'] = self.handler_properties
if self.description is not None:
json['description'] = self.description
return json
def __str__(self):
""" !Return object as str. """
return f'[{self.handler}; {self.files}; {self.handler_properties}; {self.description}]'
class reference_installation_step(installation_step):
"""! Class that represents a reference installation step. """
def __init__(self, update_id: update_id, description: str = None):
if description is not None and len(description) > 512:
raise ValueError(f'description {description} is not valid')
self.update_id = update_id
self.description = description
def as_dict(self):
"""! Return object as dict. """
json = {
'type': 'reference',
'updateId': self.update_id.__dict__
}
if self.description is not None:
json['description'] = self.description
return json
def __str__(self):
"""! Return object as str. """
return f'[{self.update_id}; {self.description}]'