tutorials/tensorflow/mlflow_gcp/trainer/model_deployment.py (106 lines of code) (raw):
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Deploy a model in AI Platform."""
import logging
import json
import time
import subprocess
from googleapiclient import discovery
from googleapiclient import errors
_WAIT_FOR_COMPLETION_SLEEP_SECONDS = 10
_PYTHON_VERSION = '3.5'
_RUN_TIME_VERSION = '1.15'
def _create_service():
"""Gets service instance to start API searches.
:return:
"""
return discovery.build('ml', 'v1')
def copy_artifacts(source_path, destination_path):
"""
:param source_path:
:param destination_path:
:return:
"""
logging.info(
'Moving model directory from {} to {}'.format(source_path,
destination_path))
subprocess.call(
"gsutil -m cp -r {} {}".format(source_path, destination_path),
shell=True)
class AIPlatformModel(object):
def __init__(self, project_id):
self._project_id = project_id
self._service = _create_service()
def model_exists(self, model_name):
"""
:param model_name:
:return:
"""
models = self._service.projects().models()
try:
response = models.list(
parent='projects/{}'.format(self._project_id)).execute()
if response:
for model in response['models']:
if model['name'].rsplit('/', 1)[1] == model_name:
return True
else:
return False
except errors.HttpError as err:
logging.error('%s', json.loads(err.content)['error']['message'])
def _list_model_versions(self, model_name):
"""Lists existing model versions in the project.
Args:
model_name: Model name to list versions for.
Returns:
Dictionary of model versions.
"""
versions = self._service.projects().models().versions()
try:
return versions.list(
parent='projects/{}/models/{}'.format(self._project_id,
model_name)).execute()
except errors.HttpError as err:
logging.error('%s', json.loads(err.content)['error']['message'])
def create_model(self, model_name, model_region='us-central1'):
"""
:param model_name:
:param model_region:
:return:
"""
if not self.model_exists(model_name):
body = {
'name': model_name,
'regions': model_region,
'description': 'MLflow model'
}
parent = 'projects/{}'.format(self._project_id)
try:
self._service.projects().models().create(
parent=parent, body=body).execute()
logging.info('Model "%s" has been created.', model_name)
except errors.HttpError as err:
logging.error('"%s". Skipping model creation.',
json.loads(err.content)['error']['message'])
else:
logging.warning('Model "%s" already exists.', model_name)
def deploy_model(self, bucket_name, model_name, model_version,
runtime_version=_RUN_TIME_VERSION):
"""Deploys model on AI Platform.
Args:
bucket_name: Cloud Storage Bucket name that stores saved model.
model_name: Model name to deploy.
model_version: Model version.
runtime_version: Runtime version.
Raises:
RuntimeError if deployment completes with errors.
"""
# For details on request body, refer to:
# https://cloud.google.com/ml-engine/reference/rest/v1/projects
# .models.versions/create
model_version_exists = False
model_versions_list = self._list_model_versions(model_name)
# Field: version.name Error: A name should start with a letter and
# contain only letters, numbers and underscores
model_version = 'mlflow_{}'.format(model_version)
if model_versions_list:
for version in model_versions_list['versions']:
if version['name'].rsplit('/', 1)[1] == model_version:
model_version_exists = True
if not model_version_exists:
request_body = {
'name': model_version,
'deploymentUri': '{}'.format(bucket_name),
'framework': 'TENSORFLOW',
'runtimeVersion': runtime_version,
'pythonVersion': _PYTHON_VERSION
}
parent = 'projects/{}/models/{}'.format(self._project_id,
model_name)
response = self._service.projects().models().versions().create(
parent=parent, body=request_body).execute()
op_name = response['name']
while True:
deploy_status = (
self._service.projects().operations().get(
name=op_name).execute())
if deploy_status.get('done'):
logging.info('Model "%s" with version "%s" deployed.',
model_name,
model_version)
break
if deploy_status.get('error'):
logging.error(deploy_status['error'])
raise RuntimeError(
'Failed to deploy model for serving: {}'.format(
deploy_status['error']))
logging.info(
'Waiting for %d seconds for "%s" with "%s" version to be '
'deployed.',
_WAIT_FOR_COMPLETION_SLEEP_SECONDS, model_name,
model_version)
time.sleep(_WAIT_FOR_COMPLETION_SLEEP_SECONDS)
else:
logging.info('Model "%s" with version "%s" already exists.',
model_name,
model_version)