in src/shadow/Shadow.cpp [347:446]
/**
 * @brief Subscribe to the shadow topics for the given request types and store their handlers.
 *
 * For every entry in request_mapping one or two QoS0 subscriptions are created, all of them
 * routed to Shadow::SubscriptionHandler:
 *  - Delta: a single subscription on shadow_topic_delta_.
 *  - Get / Update / Delete: two subscriptions, "<topic>/<accepted>" and "<topic>/<rejected>",
 *    built from the matching shadow_topic_*_ member and the SHADOW_RESPONSE_TYPE_* strings.
 *
 * All subscriptions are sent in one Subscribe call. Only when that call returns SUCCESS are the
 * is_*_subscription_active_ flags set and the handlers copied into request_mapping_ (replacing
 * any handler already stored for the same request type). The calling thread then sleeps for
 * SUBSCRIPTION_SETTING_TIME_SECS seconds before returning.
 *
 * @param request_mapping Request type -> handler pairs to subscribe for.
 *
 * @return SHADOW_MQTT_CLIENT_NOT_SET_ERROR if no MQTT client is set,
 *         SHADOW_REQUEST_MAP_EMPTY if request_mapping is empty,
 *         SHADOW_MQTT_DISCONNECTED_ERROR if the MQTT client is not connected,
 *         otherwise the ResponseCode returned by the MQTT client's Subscribe call.
 */
ResponseCode Shadow::AddShadowSubscription(util::Map<ShadowRequestType, RequestHandlerPtr> &request_mapping) {
    if (nullptr == p_mqtt_client_) {
        return ResponseCode::SHADOW_MQTT_CLIENT_NOT_SET_ERROR;
    }
    if (request_mapping.empty()) {
        return ResponseCode::SHADOW_REQUEST_MAP_EMPTY;
    }
    if (!p_mqtt_client_->IsConnected()) {
        return ResponseCode::SHADOW_MQTT_DISCONNECTED_ERROR;
    }

    // Cumulative flags: which request types appear anywhere in request_mapping.
    bool has_get = false;
    bool has_update = false;
    bool has_delete = false;
    bool has_delta = false;

    util::Vector<std::shared_ptr<mqtt::Subscription>> topic_vector;
    mqtt::Subscription::ApplicationCallbackHandlerPtr p_sub_handler =
        std::bind(&Shadow::SubscriptionHandler, this, std::placeholders::_1, std::placeholders::_2,
                  std::placeholders::_3);

    for (const auto &request_entry : request_mapping) {
        util::String topic_name_str = "";
        // Per-entry flag. The cumulative has_delta must NOT be used to pick the branch below:
        // it stays true for every entry visited after a Delta entry, which would make those
        // entries subscribe to the delta topic again instead of their accepted/rejected topics.
        bool is_delta_request = false;

        switch (request_entry.first) {
            case ShadowRequestType::Get :
                has_get = true;
                topic_name_str.append(shadow_topic_get_);
                break;
            case ShadowRequestType::Update :
                has_update = true;
                topic_name_str.append(shadow_topic_update_);
                break;
            case ShadowRequestType::Delete :
                has_delete = true;
                topic_name_str.append(shadow_topic_delete_);
                break;
            case ShadowRequestType::Delta :
                has_delta = true;
                is_delta_request = true;
                break;
        }

        if (is_delta_request) {
            // Delta uses the delta topic as-is, with no accepted/rejected suffix.
            std::shared_ptr<mqtt::Subscription> p_subscription_delta =
                mqtt::Subscription::Create(Utf8String::Create(shadow_topic_delta_), mqtt::QoS::QOS0,
                                           p_sub_handler, nullptr);
            topic_vector.push_back(p_subscription_delta);
            continue;
        }

        topic_name_str.append("/");

        util::String topic_name_accepted = topic_name_str;
        topic_name_accepted.append(SHADOW_RESPONSE_TYPE_ACCEPTED_STRING);
        std::shared_ptr<mqtt::Subscription> p_subscription_accepted =
            mqtt::Subscription::Create(Utf8String::Create(topic_name_accepted), mqtt::QoS::QOS0,
                                       p_sub_handler, nullptr);

        util::String topic_name_rejected = topic_name_str;
        topic_name_rejected.append(SHADOW_RESPONSE_TYPE_REJECTED_STRING);
        std::shared_ptr<mqtt::Subscription> p_subscription_rejected =
            mqtt::Subscription::Create(Utf8String::Create(topic_name_rejected), mqtt::QoS::QOS0,
                                       p_sub_handler, nullptr);

        topic_vector.push_back(p_subscription_accepted);
        topic_vector.push_back(p_subscription_rejected);
    }

    ResponseCode rc = p_mqtt_client_->Subscribe(topic_vector, mqtt_command_timeout_);
    if (ResponseCode::SUCCESS != rc) {
        // Nothing is recorded on failure: flags and stored handlers stay as they were.
        return rc;
    }

    if (has_get) {
        is_get_subscription_active_ = true;
    }
    if (has_update) {
        is_update_subscription_active_ = true;
    }
    if (has_delete) {
        is_delete_subscription_active_ = true;
    }
    if (has_delta) {
        is_delta_subscription_active_ = true;
    }

    // Replace any previously stored handler for each request type with the new one.
    for (const auto &request_entry : request_mapping) {
        request_mapping_.erase(request_entry.first);
        request_mapping_.insert(std::make_pair(request_entry.first, request_entry.second));
    }

    // Blocks the caller for a fixed interval after a successful subscribe.
    // NOTE(review): presumably gives the broker time to activate the subscriptions - confirm.
    std::this_thread::sleep_for(std::chrono::seconds(SUBSCRIPTION_SETTING_TIME_SECS));

    return rc;
}