in tensorflow/inference/docker/build_artifacts/sagemaker_neuron/python_service.py
def _handle_load_model_post(self, res, data): # noqa: C901
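    """Load a model into its own TensorFlow Serving (TFS) process.

    `data` is the parsed JSON body of the load request and must contain
    "model_name" and "url" (the model's base path on disk); `res` is the
    falcon response object this handler populates with a status and body.
    """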
    model_name = data["model_name"]
    base_path = data["url"]

    # model is already loaded
    if model_name in self._model_tfs_pid:
        res.status = falcon.HTTP_409
        res.body = json.dumps({"error": "Model {} is already loaded.".format(model_name)})
        return

    # check if there are available ports
    if not self._ports_available():
        res.status = falcon.HTTP_507
        res.body = json.dumps(
            {"error": "Memory exhausted: no available ports to load the model."}
        )
        return
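
    # reserve a dedicated REST/gRPC port pair for this model's TFS process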
    with lock():
        self._model_tfs_rest_port[model_name] = self._tfs_ports["rest_port"].pop()
        self._model_tfs_grpc_port[model_name] = self._tfs_ports["grpc_port"].pop()

    # validate model files are in the specified base_path
    if self.validate_model_dir(base_path):
        try:
            self._import_custom_modules(model_name)

            tfs_config = tfs_utils.create_tfs_config_individual_model(model_name, base_path)
            tfs_config_file = "/sagemaker/tfs-config/{}/model-config.cfg".format(model_name)
            log.info("tensorflow serving model config: \n%s\n", tfs_config)
            os.makedirs(os.path.dirname(tfs_config_file))
            with open(tfs_config_file, "w", encoding="utf8") as f:
                f.write(tfs_config)
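
            # optionally generate a per-model batching config for TFS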
            batching_config_file = "/sagemaker/batching/{}/batching-config.cfg".format(
                model_name
            )
            if self._tfs_enable_batching:
                tfs_utils.create_batching_config(batching_config_file)
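
            # build and launch the tensorflow_model_server command for this model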
            cmd = tfs_utils.tfs_command(
                self._model_tfs_grpc_port[model_name],
                self._model_tfs_rest_port[model_name],
                tfs_config_file,
                self._tfs_enable_batching,
                batching_config_file,
                tfs_intra_op_parallelism=self._tfs_intra_op_parallelism,
                tfs_inter_op_parallelism=self._tfs_inter_op_parallelism,
            )
            log.info("MME starts tensorflow serving with command: {}".format(cmd))
            p = subprocess.Popen(cmd.split())
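
            # block until TFS reports the model as available (up to self._tfs_wait_time_seconds)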
            tfs_utils.wait_for_model(
                self._model_tfs_rest_port[model_name], model_name, self._tfs_wait_time_seconds
            )

            log.info("started tensorflow serving (pid: %d)", p.pid)
            # update model name <-> tfs pid map
            self._model_tfs_pid[model_name] = p

            res.status = falcon.HTTP_200
            res.body = json.dumps(
                {
                    "success": "Successfully loaded model {}, "
                    "listening on rest port {} "
                    "and grpc port {}.".format(
                        model_name,
                        self._model_tfs_rest_port[model_name],
                        self._model_tfs_grpc_port[model_name],
                    )
                }
            )
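
        # map startup failures to HTTP errors, cleaning up generated config files where needed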
        except MultiModelException as multi_model_exception:
            self._cleanup_config_file(tfs_config_file)
            self._cleanup_config_file(batching_config_file)
            if multi_model_exception.code == 409:
                res.status = falcon.HTTP_409
                res.body = multi_model_exception.msg
            elif multi_model_exception.code == 408:
                res.status = falcon.HTTP_408
                res.body = multi_model_exception.msg
            else:
                raise MultiModelException(falcon.HTTP_500, multi_model_exception.msg)
        except FileExistsError as e:
            res.status = falcon.HTTP_409
            res.body = json.dumps(
                {"error": "Model {} is already loaded. {}".format(model_name, str(e))}
            )
        except OSError as os_error:
            self._cleanup_config_file(tfs_config_file)
            self._cleanup_config_file(batching_config_file)
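            # errno 12 is ENOMEM: the host could not allocate memory to start TFS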
            if os_error.errno == 12:
                raise MultiModelException(
                    falcon.HTTP_507,
                    "Memory exhausted: not enough memory to start TFS instance",
                )
            else:
                raise MultiModelException(falcon.HTTP_500, os_error.strerror)
    else:
        res.status = falcon.HTTP_404
        res.body = json.dumps(
            {
                "error": "Could not find valid base path {} for servable {}".format(
                    base_path, model_name
                )
            }
        )
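

# --- Illustrative usage (not part of python_service.py) ----------------------
# A minimal sketch of the load request this handler serves. The payload keys
# ("model_name", "url") come straight from the handler above; the /models
# route, port 8080, and the use of the `requests` library are assumptions for
# illustration, not taken from this file.
import json

import requests

payload = {"model_name": "resnet", "url": "/opt/ml/models/resnet/model"}
response = requests.post("http://localhost:8080/models", data=json.dumps(payload))
# 200 on success; 409, 404, or 507 correspond to the error branches above
print(response.status_code, response.text)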