in inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/tubemq_config.cc [411:553]
// Validates the consume target of a consumer group and, only if every check
// passes, stores it in this configuration.
//
// All inputs are verified against local temporaries first; member fields are
// assigned only after the last check for the requested mode has succeeded, so
// a call that returns false does not modify the stored configuration.
//
// Parameters:
//   err_info   [out] "Ok" on success, otherwise a description of the first
//              check that failed.
//   is_bound_consume
//              false: only the group name and the subscription are validated
//              and stored; session_key, source_count, is_select_big and
//              part_offset_map are ignored.
//              true:  the session key and the partition offsets are validated
//              as well and all values are stored.
//   group_name consumer group name, checked by Utils::ValidGroupName; the
//              value that helper writes to its output argument is stored.
//   subscribed_topic_and_filter_map
//              topic name -> filter items; must not be empty. Topic names are
//              checked by Utils::ValidString and stored trimmed; filter items
//              are checked by Utils::ValidFilterItem and at most
//              tb_config::kFilterItemMaxCount items are accepted per topic.
//   session_key
//              bound mode only: after trimming must be non-empty and no longer
//              than tb_config::kSessionKeyMaxLength.
//   source_count, is_select_big
//              bound mode only: stored as passed, no validation is done here.
//   part_offset_map
//              bound mode only: partition key -> offset. A key must split on
//              delimiter::kDelimiterColon into exactly three parts, its second
//              part must be one of the subscribed topics, it must not contain
//              delimiter::kDelimiterComma, and the offset must be >= 0.
//
// Returns true when the target was accepted and stored, false otherwise.
bool ConsumerConfig::setGroupConsumeTarget(
    string& err_info, bool is_bound_consume, const string& group_name,
    const map<string, set<string> >& subscribed_topic_and_filter_map, const string& session_key,
    uint32_t source_count, bool is_select_big, const map<string, int64_t>& part_offset_map) {
  // check parameter group_name
  string tgt_group_name;
  if (!Utils::ValidGroupName(err_info, group_name, tgt_group_name)) {
    return false;
  }

  // check parameter subscribed_topic_and_filter_map
  if (subscribed_topic_and_filter_map.empty()) {
    err_info = "Illegal parameter: subscribed_topic_and_filter_map is empty!";
    return false;
  }
  map<string, set<string> > tmp_sub_map;
  map<string, set<string> >::const_iterator it_map;
  for (it_map = subscribed_topic_and_filter_map.begin();
       it_map != subscribed_topic_and_filter_map.end(); ++it_map) {
    // check topic_name info
    if (!Utils::ValidString(err_info, it_map->first, false, true, true,
                            tb_config::kTopicNameMaxLength)) {
      stringstream ss;
      ss << "Check parameter subscribed_topic_and_filter_map error: topic " << it_map->first
         << " " << err_info;
      err_info = ss.str();
      return false;
    }
    const string topic_name = Utils::Trim(it_map->first);

    // check filter info; the caller's set is only read, so bind a reference
    // instead of copying it for every topic.
    const set<string>& subscribed_filters = it_map->second;
    set<string> tgt_filters;
    string tmp_filteritem;
    uint32_t count = 0;
    for (set<string>::const_iterator it = subscribed_filters.begin();
         it != subscribed_filters.end(); ++it) {
      if (!Utils::ValidFilterItem(err_info, *it, tmp_filteritem)) {
        stringstream ss;
        ss << "Check parameter subscribed_topic_and_filter_map error: topic " << topic_name
           << "'s filter item " << err_info;
        err_info = ss.str();
        return false;
      }
      tgt_filters.insert(tmp_filteritem);
      ++count;
    }
    // The limit applies to the number of items supplied by the caller, not to
    // the (possibly smaller) number of distinct items left after validation.
    if (count > tb_config::kFilterItemMaxCount) {
      stringstream ss;
      ss << "Check parameter subscribed_topic_and_filter_map error: topic " << it_map->first
         << "'s filter item over max item count : " << tb_config::kFilterItemMaxCount;
      err_info = ss.str();
      return false;
    }
    tmp_sub_map[topic_name] = tgt_filters;
  }

  // check if bound consume
  if (!is_bound_consume) {
    // NOTE(review): session_key_, source_count_, is_select_big_ and
    // part_offset_map_ are left untouched on this path; confirm that readers
    // of those fields always consult is_bound_consume_ first.
    is_bound_consume_ = false;
    group_name_ = tgt_group_name;
    sub_topic_and_filter_map_ = tmp_sub_map;
    err_info = "Ok";
    return true;
  }

  // check session_key
  const string tgt_session_key = Utils::Trim(session_key);
  if (tgt_session_key.length() == 0) {
    err_info = "Illegal parameter: session_key is empty!";
    return false;
  }
  if (tgt_session_key.length() > tb_config::kSessionKeyMaxLength) {
    stringstream ss;
    ss << "Illegal parameter: session_key's length over max length "
       << tb_config::kSessionKeyMaxLength;
    err_info = ss.str();
    return false;
  }

  // check part_offset_map
  map<string, int64_t> tmp_parts_map;
  map<string, int64_t>::const_iterator it_part;
  for (it_part = part_offset_map.begin(); it_part != part_offset_map.end(); ++it_part) {
    vector<string> result;
    Utils::Split(it_part->first, result, delimiter::kDelimiterColon);
    if (result.size() != 3) {
      stringstream ss;
      ss << "Illegal parameter: part_offset_map's key " << it_part->first
         << " format error, value must be aaaa:bbbb:cccc !";
      err_info = ss.str();
      return false;
    }
    // The middle component of the key is the topic and must be subscribed.
    if (tmp_sub_map.find(result[1]) == tmp_sub_map.end()) {
      stringstream ss;
      ss << "Illegal parameter: " << it_part->first << " subscribed topic " << result[1]
         << " not included in subscribed_topic_and_filter_map's topic list!";
      err_info = ss.str();
      return false;
    }
    if (it_part->first.find_first_of(delimiter::kDelimiterComma) != string::npos) {
      stringstream ss;
      ss << "Illegal parameter: key " << it_part->first << " include "
         << delimiter::kDelimiterComma << " char!";
      err_info = ss.str();
      return false;
    }
    if (it_part->second < 0) {
      stringstream ss;
      ss << "Illegal parameter: " << it_part->first
         << "'s offset must over or equal zero, value is " << it_part->second;
      err_info = ss.str();
      return false;
    }
    // A fresh key per entry, so the stored key never depends on whether
    // Utils::Join resets its output argument before writing.
    string part_key;
    Utils::Join(result, delimiter::kDelimiterColon, part_key);
    tmp_parts_map[part_key] = it_part->second;
  }

  // set verified data
  is_bound_consume_ = true;
  group_name_ = tgt_group_name;
  sub_topic_and_filter_map_ = tmp_sub_map;
  session_key_ = tgt_session_key;
  source_count_ = source_count;
  is_select_big_ = is_select_big;
  part_offset_map_ = tmp_parts_map;
  err_info = "Ok";
  return true;
}