decisionai_plugin/common/util/meta.py (101 lines of code) (raw):

import base64
import json
import time
import zlib
from os import environ

from telemetry import log

from .azureblob import AzureBlob
from .azuretable import AzureTable
from .constant import STATUS_SUCCESS, STATUS_FAIL
from .constant import ModelState
from .constant import AZURE_STORAGE_ACCOUNT, AZURE_STORAGE_TABLE_KEY, AZURE_STORAGE_ACCOUNT_DOMAIN
from .monitor import thumbprint


def get_azure_table():
    """Return an AzureTable client built from the module-level storage account constants."""
    return AzureTable(AZURE_STORAGE_ACCOUNT, account_key=AZURE_STORAGE_TABLE_KEY,
                      account_domain=AZURE_STORAGE_ACCOUNT_DOMAIN)


def _encode_series_sets(series_sets):
    """Serialize ``series_sets`` for storage in the ``series_set`` column.

    The value is JSON-encoded as UTF-8, zlib-compressed, then base64-encoded
    to an ASCII ``str``.
    """
    raw = json.dumps(series_sets).encode('utf-8')
    return base64.b64encode(zlib.compress(raw)).decode('ascii')


def _column_or_empty(entity, key):
    """Return ``entity[key]`` when the column is present, otherwise ``''``."""
    return entity[key] if key in entity else ''


def insert_or_update_meta(config, subscription, model_id, meta):
    """Create or overwrite the meta-table entity for a model and reset its state to Pending.

    Parameters:
        config: object exposing ``az_tsana_meta_table`` (name of the meta table).
        subscription: partition key; a name that differentiates a user.
        model_id: row key; the UUID of the model.
        meta: dict with ``groupId``, ``seriesSets`` and an ``instance`` dict that holds
            ``appId``, ``appName``, ``instanceName``, ``instanceId`` and ``params``.

    The meta table is created if missing. When an entity already exists, its
    ``context`` and ``ctime`` are carried over. ``last_error`` is always cleared,
    and ``owner`` is set to ``thumbprint`` (imported from ``.monitor``).
    """
    azure_table = get_azure_table()
    if not azure_table.exists_table(config.az_tsana_meta_table):
        azure_table.create_table_if_not_exists(config.az_tsana_meta_table)

    origin_meta = get_meta(config, subscription, model_id)
    instance = meta['instance']
    # One timestamp, so a newly created entity gets ctime == mtime.
    now = time.time()

    azure_table.insert_or_replace_entity(
        config.az_tsana_meta_table, subscription, model_id,
        group_id=meta['groupId'],
        app_id=instance['appId'],
        app_name=instance['appName'],
        series_set=_encode_series_sets(meta['seriesSets']),
        inst_name=instance['instanceName'],
        inst_id=instance['instanceId'],
        para=json.dumps(instance['params']),
        state=ModelState.Pending.name,
        context='' if origin_meta is None else origin_meta['context'],
        last_error='',
        ctime=now if origin_meta is None else origin_meta['ctime'],
        mtime=now,
        owner=thumbprint)


def get_meta(config, subscription, model_id):
    """Fetch a model entity from the meta table.

    Parameters:
        config: object exposing ``az_tsana_meta_table`` (name of the meta table).
        subscription: partition key; a name that differentiates a user.
        model_id: row key; the UUID of the model.

    Returns:
        The entity as returned by ``AzureTable.get_entity`` (all columns of the
        model). Returns ``None`` when the table does not exist or the lookup
        raises; the failure is logged rather than propagated.
    """
    try:
        azure_table = get_azure_table()
        if not azure_table.exists_table(config.az_tsana_meta_table):
            raise Exception('Meta table not exists')
        entity = azure_table.get_entity(config.az_tsana_meta_table, subscription, model_id)
        return entity
    except Exception as e:
        # Deliberate best-effort: callers treat None as "model not found".
        log.error("Get entity error from %s with model_id %s and subscription %s, exception: %s."
                  % (config.az_tsana_meta_table, model_id, subscription, str(e)))
        return None


def update_state(config, subscription, model_id, state: ModelState = None, context: str = None,
                 last_error: str = None):
    """Update the state/context/last_error columns of an existing model entity.

    Parameters:
        config: object exposing ``az_tsana_meta_table`` (name of the meta table).
        subscription: partition key; a name that differentiates a user.
        model_id: row key; the UUID of the model.
        state: new model state; left unchanged when ``None``.
        context: new context; left unchanged when ``None`` or empty.
        last_error: new error message. When ``None`` or empty, the stored error
            is cleared to ``''``.

    Returns:
        ``(STATUS_SUCCESS, '')`` on success, or ``(STATUS_FAIL, 'Model is not found!')``
        when the entity is missing or already marked Deleted.
    """
    azure_table = get_azure_table()
    meta = get_meta(config, subscription, model_id)
    if meta is None or meta['state'] == ModelState.Deleted.name:
        return STATUS_FAIL, 'Model is not found!'

    # `is not None` rather than truthiness: an int-valued enum member with
    # value 0 would be falsy and the update would be silently skipped.
    if state is not None:
        meta['state'] = state.name
    if context:
        meta['context'] = context
    # Any update without an explicit error clears the previous one.
    meta['last_error'] = last_error if last_error else ''
    meta['mtime'] = time.time()

    etag = azure_table.insert_or_replace_entity2(config.az_tsana_meta_table, meta)
    log.info("Insert or replace %s to table %s, state: %s, context: %s, last_error: %s, result: %s."
             % (model_id, config.az_tsana_meta_table, state.name if state is not None else '',
                context if context else '', last_error if last_error else '', etag))
    return STATUS_SUCCESS, ''


def get_model_list(config, subscription):
    """List the models stored under ``subscription`` in the meta table.

    Parameters:
        config: object exposing ``az_tsana_meta_table`` (name of the meta table).
        subscription: partition key; a name that differentiates a user.

    Returns:
        A list of dicts with keys modelId, groupId, appId, appName, instanceName,
        instanceId, state, ctime, mtime and owner. Missing optional columns
        (state/ctime/mtime/owner) are reported as ``''``. Returns an empty list
        when the table does not exist.
    """
    models = []
    azure_table = get_azure_table()
    if not azure_table.exists_table(config.az_tsana_meta_table):
        return models

    entities = azure_table.get_entities(config.az_tsana_meta_table, subscription)
    for entity in entities.items:
        # Skip rows without a usable row key (model id).
        if not ('RowKey' in entity and entity['RowKey']):
            continue
        # NOTE: states are reported as stored; stale Training/Pending entries are
        # not repaired via clear_state_when_necessary here.
        models.append(dict(modelId=entity['RowKey'],
                           groupId=entity['group_id'],
                           appId=entity['app_id'],
                           appName=entity['app_name'],
                           instanceName=entity['inst_name'],
                           instanceId=entity['inst_id'],
                           state=_column_or_empty(entity, 'state'),
                           ctime=_column_or_empty(entity, 'ctime'),
                           mtime=_column_or_empty(entity, 'mtime'),
                           owner=_column_or_empty(entity, 'owner')))
    return models


def clear_state_when_necessary(config, subscription, model_id, entity):
    """Mark a Training/Pending model as Failed when its owning process looks dead.

    The owner is looked up in the monitor table (partition ``config.tsana_app_name``,
    row ``entity['owner']``). It is considered dead when there is no owner, the
    lookup fails, or its ``ping`` timestamp is older than
    ``config.training_owner_life`` seconds.

    Parameters:
        config: object exposing ``az_tsana_meta_table``, ``az_tsana_moniter_table``,
            ``tsana_app_name`` and ``training_owner_life``.
        subscription: partition key; a name that differentiates a user.
        model_id: row key; the UUID of the model.
        entity: the model entity; mutated in place when the state is reset.

    Returns:
        ``entity``, with ``state``/``last_error`` updated if the owner was dead.
        If the monitor table does not exist, the entity is returned unchanged.
    """
    if 'state' not in entity or entity['state'] not in (ModelState.Training.name, ModelState.Pending.name):
        return entity

    azure_table = get_azure_table()
    if not azure_table.exists_table(config.az_tsana_moniter_table):
        return entity

    monitor_entity = None
    if 'owner' in entity and entity['owner']:
        try:
            monitor_entity = azure_table.get_entity(config.az_tsana_moniter_table,
                                                    config.tsana_app_name, entity['owner'])
        except Exception:
            # A missing or unreadable heartbeat row is treated as a dead owner.
            monitor_entity = None

    # NOTE: compares local time with the owner's ping; assumes host clocks are in sync.
    now = time.time()
    if monitor_entity is None or now - float(monitor_entity['ping']) > config.training_owner_life:
        # The owner is dead: persist the Failed state and mirror it on the entity.
        state = ModelState.Failed
        last_error = 'Training job dead.'
        entity['state'] = state.name
        entity['last_error'] = last_error
        update_state(config, subscription, model_id, state, None, last_error)

    return entity