# decisionai_plugin/common/util/model.py
import time
from time import gmtime, strftime
import traceback
from werkzeug.utils import secure_filename
import re
import os
from os import environ
from os.path import basename
import sys
import logging
import shutil
import zipfile
import json
import uuid
from .azureblob import AzureBlob
from .azuretable import AzureTable
from .timeutil import get_time_offset, str_to_dt, dt_to_str
from .constant import TIMESTAMP, VALUE
from .constant import STATUS_SUCCESS, STATUS_FAIL
from .constant import AZURE_STORAGE_ACCOUNT, AZURE_STORAGE_ACCOUNT_KEY, AZURE_STORAGE_ACCOUNT_DOMAIN
# Zip the trained model directory and upload it to Azure Blob storage.
# The temporary zip directory is derived from config with subscription/model_id.
# Parameters:
#   config: an object exposing model_temp_dir and tsana_app_name
#   subscription: the name of the user
#   model_id: UUID for the model
# Return:
#   result: STATUS_SUCCESS / STATUS_FAIL
#   message: description of the result
def upload_model(config, subscription, model_id, model_dir):
    """Zip the contents of ``model_dir`` and upload the archive to Azure Blob.

    The archive is staged in a unique scratch directory under
    ``config.model_temp_dir`` and uploaded to the container named
    ``config.tsana_app_name`` as blob ``<subscription>_<model_id>``.

    Parameters:
        config: object exposing ``model_temp_dir`` (scratch dir) and
            ``tsana_app_name`` (blob container name)
        subscription: the name of the user
        model_id: UUID for the model
        model_dir: local directory containing the trained model files

    Returns:
        (STATUS_SUCCESS, '') on success, (STATUS_FAIL, message) on failure.
    """
    # Defined before the try so the finally clause can never hit a NameError
    # (e.g. if config.model_temp_dir itself raises).
    zip_dir = None
    try:
        zip_dir = os.path.join(config.model_temp_dir, subscription + '_' + model_id + '_' + str(time.time()))
        os.makedirs(zip_dir, exist_ok=True)
        zip_file = os.path.join(zip_dir, "model.zip")
        with zipfile.ZipFile(zip_file, "w") as zf:
            # Walk the whole tree once; arcnames are taken relative to
            # model_dir so the archive mirrors the directory layout.
            # os.path.relpath is robust to a trailing separator on
            # model_dir, unlike manual len()-based slicing.
            for root, _dirs, files in os.walk(model_dir):
                for fn in files:
                    abs_fn = os.path.join(root, fn)
                    zf.write(abs_fn, os.path.relpath(abs_fn, model_dir))
        container_name = config.tsana_app_name
        azure_blob = AzureBlob(AZURE_STORAGE_ACCOUNT, AZURE_STORAGE_ACCOUNT_KEY, AZURE_STORAGE_ACCOUNT_DOMAIN)
        azure_blob.create_container(container_name)
        with open(zip_file, "rb") as data:
            azure_blob.upload_blob(container_name, subscription + '_' + model_id, data)
        return STATUS_SUCCESS, ''
    except Exception as e:
        return STATUS_FAIL, str(e)
    finally:
        if zip_dir:
            shutil.rmtree(zip_dir, ignore_errors=True)
# Download the model archive from Azure Blob storage and extract it into the
# local model directory, replacing any previous contents.
# Parameters:
#   config: an object exposing model_temp_dir and tsana_app_name
#   subscription: the name of the user
#   model_id: UUID for the model
# Return:
#   result: STATUS_SUCCESS / STATUS_FAIL
#   message: description of the result
def download_model(config, subscription, model_id, model_dir):
    """Download the model zip from Azure Blob and extract it into ``model_dir``.

    The existing contents of ``model_dir`` are removed before extraction, so a
    successful call leaves exactly the blob's contents in place.

    Parameters:
        config: object exposing ``model_temp_dir`` (scratch dir) and
            ``tsana_app_name`` (blob container name)
        subscription: the name of the user
        model_id: UUID for the model
        model_dir: local directory to extract the model into

    Returns:
        (STATUS_SUCCESS, '') on success, (STATUS_FAIL, message) on failure.
    """
    # Defined before the try so the finally clause can never hit a NameError
    # (e.g. if config.model_temp_dir itself raises).
    zip_dir = None
    try:
        zip_dir = os.path.join(config.model_temp_dir, subscription + '_' + model_id + '_' + str(time.time()))
        os.makedirs(zip_dir, exist_ok=True)
        # Download the archive from blob storage into the scratch directory.
        container_name = config.tsana_app_name
        azure_blob = AzureBlob(AZURE_STORAGE_ACCOUNT, AZURE_STORAGE_ACCOUNT_KEY, AZURE_STORAGE_ACCOUNT_DOMAIN)
        azure_blob.create_container(container_name)
        model_name = subscription + '_' + model_id
        zip_file = os.path.join(zip_dir, "model.zip")
        azure_blob.download_blob(container_name, model_name, zip_file)
        with zipfile.ZipFile(zip_file) as zf:
            # Only wipe the target after the zip opened successfully, so a
            # failed download does not destroy the existing local model.
            shutil.rmtree(model_dir, ignore_errors=True)
            os.makedirs(model_dir, exist_ok=True)
            # NOTE(review): extractall trusts archive member paths (zip-slip);
            # acceptable here assuming blobs are written only by upload_model.
            zf.extractall(path=model_dir)
        return STATUS_SUCCESS, ''
    except Exception as e:
        return STATUS_FAIL, str(e)
    finally:
        if zip_dir:
            shutil.rmtree(zip_dir, ignore_errors=True)