std::unique_ptr ServerBuilder::BuildAndStart()

in src/cpp/server/server_builder.cc [220:431]


std::unique_ptr<grpc::Server> ServerBuilder::BuildAndStart() {
  grpc::ChannelArguments args;

  for (const auto& option : options_) {
    option->UpdateArguments(&args);
    option->UpdatePlugins(&plugins_);
  }
  if (max_receive_message_size_ >= -1) {
    grpc_channel_args c_args = args.c_channel_args();
    const grpc_arg* arg =
        grpc_channel_args_find(&c_args, GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH);
    // Some option has set max_receive_message_length and it is also set
    // directly on the ServerBuilder.
    if (arg != nullptr) {
      gpr_log(
          GPR_ERROR,
          "gRPC ServerBuilder receives multiple max_receive_message_length");
    }
    args.SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, max_receive_message_size_);
  }
  // The default message size is -1 (max), so no need to explicitly set it for
  // -1.
  if (max_send_message_size_ >= 0) {
    args.SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, max_send_message_size_);
  }

  args.SetInt(GRPC_COMPRESSION_CHANNEL_ENABLED_ALGORITHMS_BITSET,
              enabled_compression_algorithms_bitset_);
  if (maybe_default_compression_level_.is_set) {
    args.SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_LEVEL,
                maybe_default_compression_level_.level);
  }
  if (maybe_default_compression_algorithm_.is_set) {
    args.SetInt(GRPC_COMPRESSION_CHANNEL_DEFAULT_ALGORITHM,
                maybe_default_compression_algorithm_.algorithm);
  }

  if (resource_quota_ != nullptr) {
    args.SetPointerWithVtable(GRPC_ARG_RESOURCE_QUOTA, resource_quota_,
                              grpc_resource_quota_arg_vtable());
  }

  for (const auto& plugin : plugins_) {
    plugin->UpdateServerBuilder(this);
    plugin->UpdateChannelArguments(&args);
  }

  // == Determine if the server has any syncrhonous methods ==
  bool has_sync_methods = false;
  for (const auto& value : services_) {
    if (value->service->has_synchronous_methods()) {
      has_sync_methods = true;
      break;
    }
  }

  if (!has_sync_methods) {
    for (const auto& value : plugins_) {
      if (value->has_sync_methods()) {
        has_sync_methods = true;
        break;
      }
    }
  }

  // If this is a Sync server, i.e a server expositing sync API, then the server
  // needs to create some completion queues to listen for incoming requests.
  // 'sync_server_cqs' are those internal completion queues.
  //
  // This is different from the completion queues added to the server via
  // ServerBuilder's AddCompletionQueue() method (those completion queues
  // are in 'cqs_' member variable of ServerBuilder object)
  std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
      sync_server_cqs(std::make_shared<
                      std::vector<std::unique_ptr<ServerCompletionQueue>>>());

  bool has_frequently_polled_cqs = false;
  for (const auto& cq : cqs_) {
    if (cq->IsFrequentlyPolled()) {
      has_frequently_polled_cqs = true;
      break;
    }
  }

  // == Determine if the server has any callback methods ==
  bool has_callback_methods = false;
  for (const auto& service : services_) {
    if (service->service->has_callback_methods()) {
      has_callback_methods = true;
      has_frequently_polled_cqs = true;
      break;
    }
  }

  const bool is_hybrid_server = has_sync_methods && has_frequently_polled_cqs;

  if (has_sync_methods) {
    grpc_cq_polling_type polling_type =
        is_hybrid_server ? GRPC_CQ_NON_POLLING : GRPC_CQ_DEFAULT_POLLING;

    // Create completion queues to listen to incoming rpc requests
    for (int i = 0; i < sync_server_settings_.num_cqs; i++) {
      sync_server_cqs->emplace_back(
          new ServerCompletionQueue(GRPC_CQ_NEXT, polling_type, nullptr));
    }
  }

  // TODO(vjpai): Add a section here for plugins once they can support callback
  // methods

  if (has_sync_methods) {
    // This is a Sync server
    gpr_log(GPR_INFO,
            "Synchronous server. Num CQs: %d, Min pollers: %d, Max Pollers: "
            "%d, CQ timeout (msec): %d",
            sync_server_settings_.num_cqs, sync_server_settings_.min_pollers,
            sync_server_settings_.max_pollers,
            sync_server_settings_.cq_timeout_msec);
  }

  if (has_callback_methods) {
    gpr_log(GPR_INFO, "Callback server.");
  }

  std::unique_ptr<grpc::Server> server(new grpc::Server(
      &args, sync_server_cqs, sync_server_settings_.min_pollers,
      sync_server_settings_.max_pollers, sync_server_settings_.cq_timeout_msec,
      std::move(acceptors_), resource_quota_,
      std::move(interceptor_creators_)));

  grpc_impl::ServerInitializer* initializer = server->initializer();

  // Register all the completion queues with the server. i.e
  //  1. sync_server_cqs: internal completion queues created IF this is a sync
  //     server
  //  2. cqs_: Completion queues added via AddCompletionQueue() call

  for (const auto& cq : *sync_server_cqs) {
    grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr);
    has_frequently_polled_cqs = true;
  }

  if (has_callback_methods || callback_generic_service_ != nullptr) {
    auto* cq = server->CallbackCQ();
    grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr);
  }

  // cqs_ contains the completion queue added by calling the ServerBuilder's
  // AddCompletionQueue() API. Some of them may not be frequently polled (i.e by
  // calling Next() or AsyncNext()) and hence are not safe to be used for
  // listening to incoming channels. Such completion queues must be registered
  // as non-listening queues. In debug mode, these should have their server list
  // tracked since these are provided the user and must be Shutdown by the user
  // after the server is shutdown.
  for (const auto& cq : cqs_) {
    grpc_server_register_completion_queue(server->server_, cq->cq(), nullptr);
    cq->RegisterServer(server.get());
  }

  if (!has_frequently_polled_cqs) {
    gpr_log(GPR_ERROR,
            "At least one of the completion queues must be frequently polled");
    return nullptr;
  }

  for (const auto& value : services_) {
    if (!server->RegisterService(value->host.get(), value->service)) {
      return nullptr;
    }
  }

  for (const auto& value : plugins_) {
    value->InitServer(initializer);
  }

  if (generic_service_) {
    server->RegisterAsyncGenericService(generic_service_);
  } else if (callback_generic_service_) {
    server->RegisterCallbackGenericService(callback_generic_service_);
  } else {
    for (const auto& value : services_) {
      if (value->service->has_generic_methods()) {
        gpr_log(GPR_ERROR,
                "Some methods were marked generic but there is no "
                "generic service registered.");
        return nullptr;
      }
    }
  }

  bool added_port = false;
  for (auto& port : ports_) {
    int r = server->AddListeningPort(port.addr, port.creds.get());
    if (!r) {
      if (added_port) server->Shutdown();
      return nullptr;
    }
    added_port = true;
    if (port.selected_port != nullptr) {
      *port.selected_port = r;
    }
  }

  auto cqs_data = cqs_.empty() ? nullptr : &cqs_[0];
  server->Start(cqs_data, cqs_.size());

  for (const auto& value : plugins_) {
    value->Finish(initializer);
  }

  return server;
}