in cpp/source/rocketmq/ClientImpl.cpp [94:197]
void ClientImpl::start() {
State expected = CREATED;
if (!state_.compare_exchange_strong(expected, State::STARTING)) {
SPDLOG_ERROR("Attempt to start ClientImpl failed. Expecting: {} Actual: {}", State::CREATED,
state_.load(std::memory_order_relaxed));
return;
}
if (!name_server_resolver_) {
SPDLOG_ERROR("No name server resolver is configured.");
abort();
}
name_server_resolver_->start();
client_config_.client_id = clientId();
if (!client_manager_) {
client_manager_ = std::make_shared<ClientManagerImpl>(client_config_.resource_namespace, client_config_.withSsl);
}
client_manager_->start();
const auto& endpoint = name_server_resolver_->resolve();
if (endpoint.empty()) {
SPDLOG_ERROR("Failed to resolve name server address");
abort();
}
createSession(endpoint, false);
{
absl::MutexLock lk(&session_map_mtx_);
session_map_[endpoint]->await();
}
std::weak_ptr<ClientImpl> ptr(self());
{
// Query routes for topics of interest in synchronous
std::vector<std::string> topics;
topicsOfInterest(topics);
auto mtx = std::make_shared<absl::Mutex>();
auto cv = std::make_shared<absl::CondVar>();
bool completed = false;
for (const auto& topic : topics) {
completed = false;
auto callback = [&, mtx, cv](const std::error_code& ec, const TopicRouteDataPtr ptr) {
if (ec) {
SPDLOG_ERROR("Failed to query route for {} during starting. Cause: {}", topic, ec.message());
}
{
absl::MutexLock lk(mtx.get());
completed = true;
}
cv->Signal();
};
getRouteFor(topic, callback);
{
absl::MutexLock lk(mtx.get());
if (!completed) {
cv->Wait(mtx.get());
}
}
}
}
auto route_update_functor = [ptr]() {
std::shared_ptr<ClientImpl> base = ptr.lock();
if (base) {
base->updateRouteInfo();
}
};
route_update_handle_ = client_manager_->getScheduler()->schedule(route_update_functor, UPDATE_ROUTE_TASK_NAME,
std::chrono::seconds(10), std::chrono::seconds(30));
auto telemetry_functor = [ptr]() {
std::shared_ptr<ClientImpl> base = ptr.lock();
if (base) {
SPDLOG_DEBUG("Sync client settings to servers");
base->syncClientSettings();
}
};
// refer java sdk: set refresh interval to 5 minutes
// org.apache.rocketmq.client.java.impl.ClientSessionImpl#syncSettings0
telemetry_handle_ = client_manager_->getScheduler()->schedule(
telemetry_functor, TELEMETRY_TASK_NAME,
std::chrono::minutes(5), std::chrono::minutes(5));
auto&& metric_service_endpoint = metricServiceEndpoint();
if (!metric_service_endpoint.empty()) {
std::weak_ptr<Client> client_weak_ptr(self());
#ifdef DEBUG_METRIC_EXPORTING
opencensus::stats::StatsExporter::SetInterval(absl::Seconds(30));
opencensus::stats::StatsExporter::RegisterPushHandler(absl::make_unique<StdoutHandler>());
#else
opencensus::stats::StatsExporter::SetInterval(absl::Minutes(1));
#endif
SPDLOG_INFO("Export client metrics to {}", metric_service_endpoint);
opencensus::stats::StatsExporter::RegisterPushHandler(
absl::make_unique<OpencensusHandler>(metric_service_endpoint, client_weak_ptr));
}
}