in tensorflow_serving/model_servers/server.cc [191:449]
Status Server::BuildAndStart(const Options& server_options) {
if (server_options.grpc_port == 0 &&
server_options.grpc_socket_path.empty()) {
return errors::InvalidArgument(
"At least one of server_options.grpc_port or "
"server_options.grpc_socket_path must be set.");
}
if (server_options.use_alts_credentials &&
!server_options.ssl_config_file.empty()) {
return errors::InvalidArgument(
"Either use_alts_credentials must be false or "
"ssl_config_file must be empty.");
}
if (server_options.model_base_path.empty() &&
server_options.model_config_file.empty()) {
return errors::InvalidArgument(
"Both server_options.model_base_path and "
"server_options.model_config_file are empty!");
}
SetSignatureMethodNameCheckFeature(
server_options.enable_signature_method_name_check);
// For ServerCore Options, we leave servable_state_monitor_creator unspecified
// so the default servable_state_monitor_creator will be used.
ServerCore::Options options;
  // Model server config.
if (server_options.model_config_file.empty()) {
options.model_server_config = BuildSingleModelConfig(
server_options.model_name, server_options.model_base_path);
} else {
TF_RETURN_IF_ERROR(ParseProtoTextFile<ModelServerConfig>(
server_options.model_config_file, &options.model_server_config));
}
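  // For reference, a minimal sketch of what the model_config_file parsed
  // above might contain (a ModelServerConfig textproto); the model name and
  // base path are illustrative assumptions:
  //
  //   model_config_list {
  //     config {
  //       name: "my_model"
  //       base_path: "/models/my_model"
  //       model_platform: "tensorflow"
  //     }
  //   }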
if (server_options.platform_config_file.empty()) {
SessionBundleConfig session_bundle_config;
// Batching config
if (server_options.enable_batching) {
BatchingParameters* batching_parameters =
session_bundle_config.mutable_batching_parameters();
if (server_options.batching_parameters_file.empty()) {
batching_parameters->mutable_thread_pool_name()->set_value(
"model_server_batch_threads");
} else {
TF_RETURN_IF_ERROR(ParseProtoTextFile<BatchingParameters>(
server_options.batching_parameters_file, batching_parameters));
}
} else if (!server_options.batching_parameters_file.empty()) {
return errors::InvalidArgument(
"server_options.batching_parameters_file is set without setting "
"server_options.enable_batching to true.");
}
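    // For reference, a hedged sketch of a batching_parameters_file in
    // textproto form; the field names come from BatchingParameters, and the
    // values are illustrative, not tuning advice:
    //
    //   max_batch_size { value: 128 }
    //   batch_timeout_micros { value: 1000 }
    //   max_enqueued_batches { value: 100 }
    //   num_batch_threads { value: 8 }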
if (!server_options.tensorflow_session_config_file.empty()) {
TF_RETURN_IF_ERROR(
ParseProtoTextFile(server_options.tensorflow_session_config_file,
session_bundle_config.mutable_session_config()));
}
session_bundle_config.mutable_session_config()
->mutable_gpu_options()
->set_per_process_gpu_memory_fraction(
server_options.per_process_gpu_memory_fraction);
    if (server_options.tensorflow_intra_op_parallelism > 0 &&
        server_options.tensorflow_inter_op_parallelism > 0 &&
        server_options.tensorflow_session_parallelism > 0) {
      return errors::InvalidArgument(
          "Configure either server_options.tensorflow_session_parallelism "
          "or the pair (server_options.tensorflow_intra_op_parallelism, "
          "server_options.tensorflow_inter_op_parallelism); configuring all "
          "three is ambiguous.");
    } else if (server_options.tensorflow_intra_op_parallelism > 0 ||
               server_options.tensorflow_inter_op_parallelism > 0) {
session_bundle_config.mutable_session_config()
->set_intra_op_parallelism_threads(
server_options.tensorflow_intra_op_parallelism);
session_bundle_config.mutable_session_config()
->set_inter_op_parallelism_threads(
server_options.tensorflow_inter_op_parallelism);
} else {
session_bundle_config.mutable_session_config()
->set_intra_op_parallelism_threads(
server_options.tensorflow_session_parallelism);
session_bundle_config.mutable_session_config()
->set_inter_op_parallelism_threads(
server_options.tensorflow_session_parallelism);
}
const std::vector<string> tags =
tensorflow::str_util::Split(server_options.saved_model_tags, ",");
for (const string& tag : tags) {
*session_bundle_config.add_saved_model_tags() = tag;
}
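    // The conventional tag for serving-ready SavedModel graphs is "serve", so
    // a typical saved_model_tags value is just "serve" (or e.g. "serve,gpu"
    // for a multi-tag MetaGraph; the second tag here is illustrative).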
session_bundle_config.set_enable_model_warmup(
server_options.enable_model_warmup);
if (server_options.num_request_iterations_for_warmup > 0) {
session_bundle_config.mutable_model_warmup_options()
->mutable_num_request_iterations()
->set_value(server_options.num_request_iterations_for_warmup);
}
session_bundle_config.set_remove_unused_fields_from_bundle_metagraph(
server_options.remove_unused_fields_from_bundle_metagraph);
session_bundle_config.set_prefer_tflite_model(
server_options.prefer_tflite_model);
session_bundle_config.set_num_tflite_interpreters_per_pool(
server_options.num_tflite_interpreters_per_pool);
session_bundle_config.set_num_tflite_pools(server_options.num_tflite_pools);
options.platform_config_map =
CreateTensorFlowPlatformConfigMap(session_bundle_config);
} else {
TF_RETURN_IF_ERROR(ParseProtoTextFile<PlatformConfigMap>(
server_options.platform_config_file, &options.platform_config_map));
}
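  // For reference, a hedged sketch of a platform_config_file (a
  // PlatformConfigMap textproto), assuming the stock SavedModel source
  // adapter for the "tensorflow" platform:
  //
  //   platform_configs {
  //     key: "tensorflow"
  //     value {
  //       source_adapter_config {
  //         [type.googleapis.com/tensorflow.serving.SavedModelBundleSourceAdapterConfig] {
  //           legacy_config {
  //             enable_model_warmup: true
  //           }
  //         }
  //       }
  //     }
  //   }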
options.custom_model_config_loader = &LoadCustomModelConfig;
  options.aspired_version_policy =
      absl::make_unique<AvailabilityPreservingPolicy>();
options.num_load_threads = server_options.num_load_threads;
options.num_unload_threads = server_options.num_unload_threads;
options.max_num_load_retries = server_options.max_num_load_retries;
options.load_retry_interval_micros =
server_options.load_retry_interval_micros;
options.file_system_poll_wait_seconds =
server_options.file_system_poll_wait_seconds;
options.flush_filesystem_caches = server_options.flush_filesystem_caches;
options.allow_version_labels_for_unavailable_models =
server_options.allow_version_labels_for_unavailable_models;
options.force_allow_any_version_labels_for_unavailable_models =
server_options.force_allow_any_version_labels_for_unavailable_models;
options.enable_cors_support = server_options.enable_cors_support;
TF_RETURN_IF_ERROR(ServerCore::Create(std::move(options), &server_core_));
// Model config polling thread must be started after the call to
// ServerCore::Create() to prevent config reload being done concurrently from
// Create() and the poll thread.
if (server_options.fs_model_config_poll_wait_seconds > 0 &&
!server_options.model_config_file.empty()) {
PeriodicFunction::Options pf_options;
pf_options.thread_name_prefix = "Server_fs_model_config_poll_thread";
const string model_config_file = server_options.model_config_file;
fs_config_polling_thread_.reset(new PeriodicFunction(
[this, model_config_file] {
this->PollFilesystemAndReloadConfig(model_config_file);
},
server_options.fs_model_config_poll_wait_seconds *
tensorflow::EnvTime::kSecondsToMicros,
pf_options));
}
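  // (In the stock model server binary, this poll period typically comes from
  // the --model_config_file_poll_wait_seconds flag; the flag name is an
  // assumption about the caller, since this function only sees the resolved
  // server_options value.)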
// 0.0.0.0" is the way to listen on localhost in gRPC.
const string server_address =
"0.0.0.0:" + std::to_string(server_options.grpc_port);
model_service_ = absl::make_unique<ModelServiceImpl>(server_core_.get());
PredictionServiceImpl::Options predict_server_options;
predict_server_options.server_core = server_core_.get();
predict_server_options.enforce_session_run_timeout =
server_options.enforce_session_run_timeout;
if (!server_options.thread_pool_factory_config_file.empty()) {
ThreadPoolFactoryConfig thread_pool_factory_config;
TF_RETURN_IF_ERROR(ParseProtoTextFile<ThreadPoolFactoryConfig>(
server_options.thread_pool_factory_config_file,
&thread_pool_factory_config));
TF_RETURN_IF_ERROR(ThreadPoolFactoryRegistry::CreateFromAny(
thread_pool_factory_config.thread_pool_factory_config(),
&thread_pool_factory_));
}
predict_server_options.thread_pool_factory = thread_pool_factory_.get();
prediction_service_ =
absl::make_unique<PredictionServiceImpl>(predict_server_options);
::grpc::ServerBuilder builder;
  // If configured, listen on a TCP port for gRPC/HTTP.
if (server_options.grpc_port != 0) {
builder.AddListeningPort(
server_address,
BuildServerCredentials(server_options.use_alts_credentials,
server_options.ssl_config_file));
}
  // If configured, listen on a UNIX domain socket for gRPC.
if (!server_options.grpc_socket_path.empty()) {
const string grpc_socket_uri = "unix:" + server_options.grpc_socket_path;
builder.AddListeningPort(
grpc_socket_uri,
BuildServerCredentials(server_options.use_alts_credentials,
server_options.ssl_config_file));
}
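  // For reference, a hedged sketch of an ssl_config_file (an SSLConfig
  // textproto) as consumed by BuildServerCredentials; the PEM material is
  // inlined as string values, abbreviated here:
  //
  //   server_key: "-----BEGIN PRIVATE KEY-----\n<key>\n-----END PRIVATE KEY-----"
  //   server_cert: "-----BEGIN CERTIFICATE-----\n<cert>\n-----END CERTIFICATE-----"
  //   custom_ca: ""
  //   client_verify: false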
builder.RegisterService(model_service_.get());
builder.RegisterService(prediction_service_.get());
if (server_options.enable_profiler) {
profiler_service_ = tensorflow::profiler::CreateProfilerService();
builder.RegisterService(profiler_service_.get());
LOG(INFO) << "Profiler service is enabled";
}
builder.SetMaxMessageSize(tensorflow::kint32max);
const std::vector<GrpcChannelArgument> channel_arguments =
parseGrpcChannelArgs(server_options.grpc_channel_arguments);
for (const GrpcChannelArgument& channel_argument : channel_arguments) {
    // gRPC accepts channel arguments of two types, int and string. We attempt
    // to parse each argument as an int and pass it on as such if successful;
    // otherwise we pass it on as a string. gRPC logs arguments that were not
    // accepted.
tensorflow::int32 value;
if (tensorflow::strings::safe_strto32(channel_argument.value, &value)) {
builder.AddChannelArgument(channel_argument.key, value);
} else {
builder.AddChannelArgument(channel_argument.key, channel_argument.value);
}
}
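  // For example, a grpc_channel_arguments string of
  // "grpc.max_concurrent_streams=100,grpc.primary_user_agent=my_server"
  // (both standard gRPC channel-arg keys; the values are illustrative) yields
  // one int argument and one string argument via the loop above.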
::grpc::ResourceQuota res_quota;
res_quota.SetMaxThreads(server_options.grpc_max_threads);
builder.SetResourceQuota(res_quota);
grpc_server_ = builder.BuildAndStart();
if (grpc_server_ == nullptr) {
return errors::InvalidArgument("Failed to BuildAndStart gRPC server");
}
if (server_options.grpc_port != 0) {
LOG(INFO) << "Running gRPC ModelServer at " << server_address << " ...";
}
if (!server_options.grpc_socket_path.empty()) {
LOG(INFO) << "Running gRPC ModelServer at UNIX socket "
<< server_options.grpc_socket_path << " ...";
}
if (server_options.http_port != 0) {
if (server_options.http_port != server_options.grpc_port) {
const string server_address =
"localhost:" + std::to_string(server_options.http_port);
MonitoringConfig monitoring_config;
if (!server_options.monitoring_config_file.empty()) {
TF_RETURN_IF_ERROR(ParseProtoTextFile<MonitoringConfig>(
server_options.monitoring_config_file, &monitoring_config));
}
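      // For reference, a hedged sketch of a monitoring_config_file (a
      // MonitoringConfig textproto); the path shown is the one used in the
      // TF Serving docs, not a requirement:
      //
      //   prometheus_config {
      //     enable: true
      //     path: "/monitoring/prometheus/metrics"
      //   }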
http_server_ = CreateAndStartHttpServer(
server_options.http_port, server_options.http_num_threads,
server_options.http_timeout_in_ms, monitoring_config,
server_core_.get());
if (http_server_ != nullptr) {
LOG(INFO) << "Exporting HTTP/REST API at:" << server_address << " ...";
} else {
LOG(ERROR) << "Failed to start HTTP Server at " << server_address;
}
} else {
LOG(ERROR) << "server_options.http_port cannot be same as grpc_port. "
<< "Please use a different port for HTTP/REST API. "
<< "Skipped exporting HTTP/REST API.";
}
}
return Status::OK();
}