Status Server::BuildAndStart()

in tensorflow_serving/model_servers/server.cc [191:449]


Status Server::BuildAndStart(const Options& server_options) {
  if (server_options.grpc_port == 0 &&
      server_options.grpc_socket_path.empty()) {
    return errors::InvalidArgument(
      "At least one of server_options.grpc_port or "
      "server_options.grpc_socket_path must be set.");
  }

  if (server_options.use_alts_credentials &&
      !server_options.ssl_config_file.empty()) {
    return errors::InvalidArgument(
        "Either use_alts_credentials must be false or "
        "ssl_config_file must be empty.");
  }

  if (server_options.model_base_path.empty() &&
      server_options.model_config_file.empty()) {
    return errors::InvalidArgument(
        "Both server_options.model_base_path and "
        "server_options.model_config_file are empty!");
  }

  SetSignatureMethodNameCheckFeature(
      server_options.enable_signature_method_name_check);

  // For ServerCore Options, we leave servable_state_monitor_creator unspecified
  // so the default servable_state_monitor_creator will be used.
  ServerCore::Options options;

  // model server config
  if (server_options.model_config_file.empty()) {
    options.model_server_config = BuildSingleModelConfig(
        server_options.model_name, server_options.model_base_path);
  } else {
    TF_RETURN_IF_ERROR(ParseProtoTextFile<ModelServerConfig>(
        server_options.model_config_file, &options.model_server_config));
  }
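  // Illustrative example (not part of the original function): a minimal
  // ModelServerConfig text proto that server_options.model_config_file could
  // point at, using a hypothetical model name and base path:
  //
  //   model_config_list {
  //     config {
  //       name: "my_model"
  //       base_path: "/models/my_model"
  //       model_platform: "tensorflow"
  //     }
  //   }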

  if (server_options.platform_config_file.empty()) {
    SessionBundleConfig session_bundle_config;
    // Batching config
    if (server_options.enable_batching) {
      BatchingParameters* batching_parameters =
          session_bundle_config.mutable_batching_parameters();
      if (server_options.batching_parameters_file.empty()) {
        batching_parameters->mutable_thread_pool_name()->set_value(
            "model_server_batch_threads");
      } else {
        TF_RETURN_IF_ERROR(ParseProtoTextFile<BatchingParameters>(
            server_options.batching_parameters_file, batching_parameters));
      }
    } else if (!server_options.batching_parameters_file.empty()) {
      return errors::InvalidArgument(
          "server_options.batching_parameters_file is set without setting "
          "server_options.enable_batching to true.");
    }
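    // Illustrative example (not part of the original function): a
    // BatchingParameters text proto that
    // server_options.batching_parameters_file could point at, with
    // hypothetical values:
    //
    //   max_batch_size { value: 128 }
    //   batch_timeout_micros { value: 1000 }
    //   num_batch_threads { value: 8 }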

    if (!server_options.tensorflow_session_config_file.empty()) {
      TF_RETURN_IF_ERROR(
          ParseProtoTextFile(server_options.tensorflow_session_config_file,
                             session_bundle_config.mutable_session_config()));
    }

    session_bundle_config.mutable_session_config()
        ->mutable_gpu_options()
        ->set_per_process_gpu_memory_fraction(
            server_options.per_process_gpu_memory_fraction);

    if (server_options.tensorflow_intra_op_parallelism > 0 &&
        server_options.tensorflow_inter_op_parallelism > 0 &&
        server_options.tensorflow_session_parallelism > 0) {
      return errors::InvalidArgument(
          "Configure either server_options.tensorflow_session_parallelism "
          "or the pair (server_options.tensorflow_intra_op_parallelism, "
          "server_options.tensorflow_inter_op_parallelism), but not all "
          "three.");
    } else if (server_options.tensorflow_intra_op_parallelism > 0 ||
               server_options.tensorflow_inter_op_parallelism > 0) {
      session_bundle_config.mutable_session_config()
          ->set_intra_op_parallelism_threads(
              server_options.tensorflow_intra_op_parallelism);
      session_bundle_config.mutable_session_config()
          ->set_inter_op_parallelism_threads(
              server_options.tensorflow_inter_op_parallelism);
    } else {
      session_bundle_config.mutable_session_config()
          ->set_intra_op_parallelism_threads(
              server_options.tensorflow_session_parallelism);
      session_bundle_config.mutable_session_config()
          ->set_inter_op_parallelism_threads(
              server_options.tensorflow_session_parallelism);
    }
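    // Illustrative example (not part of the original function): with
    // server_options.tensorflow_intra_op_parallelism == 4,
    // server_options.tensorflow_inter_op_parallelism == 2, and
    // server_options.tensorflow_session_parallelism left at 0, the branch
    // above produces a SessionConfig with intra_op_parallelism_threads == 4
    // and inter_op_parallelism_threads == 2.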

    const std::vector<string> tags =
        tensorflow::str_util::Split(server_options.saved_model_tags, ",");
    for (const string& tag : tags) {
      *session_bundle_config.add_saved_model_tags() = tag;
    }
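    // Illustrative example (not part of the original function): a
    // saved_model_tags value of "serve,gpu" is split into the tags "serve"
    // and "gpu", which must match the tag set the SavedModel was exported
    // with.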
    session_bundle_config.set_enable_model_warmup(
        server_options.enable_model_warmup);
    if (server_options.num_request_iterations_for_warmup > 0) {
      session_bundle_config.mutable_model_warmup_options()
          ->mutable_num_request_iterations()
          ->set_value(server_options.num_request_iterations_for_warmup);
    }
    session_bundle_config.set_remove_unused_fields_from_bundle_metagraph(
        server_options.remove_unused_fields_from_bundle_metagraph);
    session_bundle_config.set_prefer_tflite_model(
        server_options.prefer_tflite_model);
    session_bundle_config.set_num_tflite_interpreters_per_pool(
        server_options.num_tflite_interpreters_per_pool);
    session_bundle_config.set_num_tflite_pools(server_options.num_tflite_pools);
    options.platform_config_map =
        CreateTensorFlowPlatformConfigMap(session_bundle_config);
  } else {
    TF_RETURN_IF_ERROR(ParseProtoTextFile<PlatformConfigMap>(
        server_options.platform_config_file, &options.platform_config_map));
  }
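  // Illustrative sketch (not part of the original function): a
  // PlatformConfigMap text proto for server_options.platform_config_file maps
  // platform names to source adapter configs, along the lines of:
  //
  //   platform_configs {
  //     key: "tensorflow"
  //     value {
  //       source_adapter_config {
  //         [type.googleapis.com/tensorflow.serving.SavedModelBundleSourceAdapterConfig] {
  //           legacy_config { enable_model_warmup: true }
  //         }
  //       }
  //     }
  //   }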

  options.custom_model_config_loader = &LoadCustomModelConfig;
  options.aspired_version_policy =
      std::unique_ptr<AspiredVersionPolicy>(new AvailabilityPreservingPolicy);
  options.num_load_threads = server_options.num_load_threads;
  options.num_unload_threads = server_options.num_unload_threads;
  options.max_num_load_retries = server_options.max_num_load_retries;
  options.load_retry_interval_micros =
      server_options.load_retry_interval_micros;
  options.file_system_poll_wait_seconds =
      server_options.file_system_poll_wait_seconds;
  options.flush_filesystem_caches = server_options.flush_filesystem_caches;
  options.allow_version_labels_for_unavailable_models =
      server_options.allow_version_labels_for_unavailable_models;
  options.force_allow_any_version_labels_for_unavailable_models =
      server_options.force_allow_any_version_labels_for_unavailable_models;
  options.enable_cors_support = server_options.enable_cors_support;

  TF_RETURN_IF_ERROR(ServerCore::Create(std::move(options), &server_core_));

  // Model config polling thread must be started after the call to
  // ServerCore::Create() to prevent config reload being done concurrently from
  // Create() and the poll thread.
  if (server_options.fs_model_config_poll_wait_seconds > 0 &&
      !server_options.model_config_file.empty()) {
    PeriodicFunction::Options pf_options;
    pf_options.thread_name_prefix = "Server_fs_model_config_poll_thread";

    const string model_config_file = server_options.model_config_file;
    fs_config_polling_thread_.reset(new PeriodicFunction(
        [this, model_config_file] {
          this->PollFilesystemAndReloadConfig(model_config_file);
        },
        server_options.fs_model_config_poll_wait_seconds *
            tensorflow::EnvTime::kSecondsToMicros,
        pf_options));
  }
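  // Illustrative note (not part of the original function): with
  // server_options.fs_model_config_poll_wait_seconds == 60, the thread above
  // re-reads the model config file roughly once a minute and applies any
  // changes via PollFilesystemAndReloadConfig.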

  // 0.0.0.0" is the way to listen on localhost in gRPC.
  const string server_address =
      "0.0.0.0:" + std::to_string(server_options.grpc_port);
  model_service_ = absl::make_unique<ModelServiceImpl>(server_core_.get());

  PredictionServiceImpl::Options predict_server_options;
  predict_server_options.server_core = server_core_.get();
  predict_server_options.enforce_session_run_timeout =
      server_options.enforce_session_run_timeout;
  if (!server_options.thread_pool_factory_config_file.empty()) {
    ThreadPoolFactoryConfig thread_pool_factory_config;
    TF_RETURN_IF_ERROR(ParseProtoTextFile<ThreadPoolFactoryConfig>(
        server_options.thread_pool_factory_config_file,
        &thread_pool_factory_config));
    TF_RETURN_IF_ERROR(ThreadPoolFactoryRegistry::CreateFromAny(
        thread_pool_factory_config.thread_pool_factory_config(),
        &thread_pool_factory_));
  }
  predict_server_options.thread_pool_factory = thread_pool_factory_.get();
  prediction_service_ =
      absl::make_unique<PredictionServiceImpl>(predict_server_options);


  ::grpc::ServerBuilder builder;
  // If defined, listen on a TCP port for gRPC.
  if (server_options.grpc_port != 0) {
    builder.AddListeningPort(
        server_address,
        BuildServerCredentials(server_options.use_alts_credentials,
                               server_options.ssl_config_file));
  }
  // If defined, listen to a UNIX socket for gRPC.
  if (!server_options.grpc_socket_path.empty()) {
    const string grpc_socket_uri = "unix:" + server_options.grpc_socket_path;
    builder.AddListeningPort(
        grpc_socket_uri,
        BuildServerCredentials(server_options.use_alts_credentials,
                               server_options.ssl_config_file));
  }
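  // Illustrative example (not part of the original function): with
  // hypothetical values grpc_port == 8500 and
  // grpc_socket_path == "/tmp/tf_serving.sock", the server accepts gRPC
  // traffic on both "0.0.0.0:8500" and "unix:/tmp/tf_serving.sock".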
  builder.RegisterService(model_service_.get());
  builder.RegisterService(prediction_service_.get());
  if (server_options.enable_profiler) {
    profiler_service_ = tensorflow::profiler::CreateProfilerService();
    builder.RegisterService(profiler_service_.get());
    LOG(INFO) << "Profiler service is enabled";
  }
  builder.SetMaxMessageSize(tensorflow::kint32max);
  const std::vector<GrpcChannelArgument> channel_arguments =
      parseGrpcChannelArgs(server_options.grpc_channel_arguments);
  for (const GrpcChannelArgument& channel_argument : channel_arguments) {
    // gRPC accepts arguments of two types, int and string. We attempt to
    // parse each argument as an int and pass it on as such if successful;
    // otherwise we pass it as a string. gRPC logs arguments that were not
    // accepted.
    tensorflow::int32 value;
    if (tensorflow::strings::safe_strto32(channel_argument.value, &value)) {
      builder.AddChannelArgument(channel_argument.key, value);
    } else {
      builder.AddChannelArgument(channel_argument.key, channel_argument.value);
    }
  }
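  // Illustrative example (not part of the original function): a
  // grpc_channel_arguments value of
  // "grpc.max_concurrent_streams=100,grpc.primary_user_agent=my_server"
  // (hypothetical) yields one integer-valued and one string-valued channel
  // argument after the parse attempt above.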

  ::grpc::ResourceQuota res_quota;
  res_quota.SetMaxThreads(server_options.grpc_max_threads);
  builder.SetResourceQuota(res_quota);

  grpc_server_ = builder.BuildAndStart();
  if (grpc_server_ == nullptr) {
    return errors::InvalidArgument("Failed to BuildAndStart gRPC server");
  }
  if (server_options.grpc_port != 0) {
    LOG(INFO) << "Running gRPC ModelServer at " << server_address << " ...";
  }
  if (!server_options.grpc_socket_path.empty()) {
    LOG(INFO) << "Running gRPC ModelServer at UNIX socket "
              << server_options.grpc_socket_path << " ...";
  }

  if (server_options.http_port != 0) {
    if (server_options.http_port != server_options.grpc_port) {
      const string server_address =
          "localhost:" + std::to_string(server_options.http_port);
      MonitoringConfig monitoring_config;
      if (!server_options.monitoring_config_file.empty()) {
        TF_RETURN_IF_ERROR(ParseProtoTextFile<MonitoringConfig>(
            server_options.monitoring_config_file, &monitoring_config));
      }
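      // Illustrative example (not part of the original function): a
      // MonitoringConfig text proto enabling the Prometheus endpoint might
      // look like:
      //
      //   prometheus_config {
      //     enable: true
      //     path: "/monitoring/prometheus/metrics"
      //   }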
      http_server_ = CreateAndStartHttpServer(
          server_options.http_port, server_options.http_num_threads,
          server_options.http_timeout_in_ms, monitoring_config,
          server_core_.get());
      if (http_server_ != nullptr) {
        LOG(INFO) << "Exporting HTTP/REST API at:" << server_address << " ...";
      } else {
        LOG(ERROR) << "Failed to start HTTP Server at " << server_address;
      }
    } else {
      LOG(ERROR) << "server_options.http_port cannot be same as grpc_port. "
                 << "Please use a different port for HTTP/REST API. "
                 << "Skipped exporting HTTP/REST API.";
    }
  }
  return Status::OK();
}