void ClientImpl::start()

in cpp/source/rocketmq/ClientImpl.cpp [99:198]


void ClientImpl::start() {
  State expected = CREATED;
  if (!state_.compare_exchange_strong(expected, State::STARTING)) {
    SPDLOG_ERROR("Attempt to start ClientImpl failed. Expecting: {} Actual: {}", State::CREATED,
                 state_.load(std::memory_order_relaxed));
    return;
  }

  if (!name_server_resolver_) {
    SPDLOG_ERROR("No name server resolver is configured.");
    abort();
  }
  name_server_resolver_->start();

  client_config_.client_id = clientId();

  if (!client_manager_) {
    client_manager_ = std::make_shared<ClientManagerImpl>(client_config_.resource_namespace, client_config_.withSsl);
  }
  client_manager_->start();

  const auto& endpoint = name_server_resolver_->resolve();
  if (endpoint.empty()) {
    SPDLOG_ERROR("Failed to resolve name server address");
    abort();
  }

  createSession(endpoint, false);
  {
    absl::MutexLock lk(&session_map_mtx_);
    session_map_[endpoint]->await();
  }

  std::weak_ptr<ClientImpl> ptr(self());

  {
    // Query routes for topics of interest in synchronous
    std::vector<std::string> topics;
    topicsOfInterest(topics);

    auto mtx = std::make_shared<absl::Mutex>();
    auto cv = std::make_shared<absl::CondVar>();
    bool completed = false;
    for (const auto& topic : topics) {
      completed = false;
      auto callback = [&, mtx, cv](const std::error_code& ec, const TopicRouteDataPtr ptr) {
        if (ec) {
          SPDLOG_ERROR("Failed to query route for {} during starting. Cause: {}", topic, ec.message());
        }

        {
          absl::MutexLock lk(mtx.get());
          completed = true;
        }
        cv->Signal();
      };
      getRouteFor(topic, callback);
      {
        absl::MutexLock lk(mtx.get());
        if (!completed) {
          cv->Wait(mtx.get());
        }
      }
    }
  }

  auto route_update_functor = [ptr]() {
    std::shared_ptr<ClientImpl> base = ptr.lock();
    if (base) {
      base->updateRouteInfo();
    }
  };

  route_update_handle_ = client_manager_->getScheduler()->schedule(route_update_functor, UPDATE_ROUTE_TASK_NAME,
                                                                   std::chrono::seconds(10), std::chrono::seconds(30));

  auto telemetry_functor = [ptr]() {
    std::shared_ptr<ClientImpl> base = ptr.lock();
    if (base) {
      SPDLOG_INFO("Sync client settings to servers");
      base->syncClientSettings();
    }
  };
  telemetry_handle_ = client_manager_->getScheduler()->schedule(telemetry_functor, TELEMETRY_TASK_NAME,
                                                                std::chrono::minutes(5), std::chrono::minutes(5));

  auto&& metric_service_endpoint = metricServiceEndpoint();
  if (!metric_service_endpoint.empty()) {
    std::weak_ptr<Client> client_weak_ptr(self());
#ifdef DEBUG_METRIC_EXPORTING
    opencensus::stats::StatsExporter::SetInterval(absl::Seconds(30));
    opencensus::stats::StatsExporter::RegisterPushHandler(absl::make_unique<StdoutHandler>());
#else
    opencensus::stats::StatsExporter::SetInterval(absl::Minutes(1));
#endif
    SPDLOG_INFO("Export client metrics to {}", metric_service_endpoint);
    opencensus::stats::StatsExporter::RegisterPushHandler(
        absl::make_unique<OpencensusHandler>(metric_service_endpoint, client_weak_ptr));
  }
}