bool ConsumerConfig::setGroupConsumeTarget()

in inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/tubemq_config.cc [411:553]


// Validates the consume-target parameters and stores them in this object
// only after every applicable check has passed; no member is assigned by
// this function on a failing path.
//
// err_info        receives "Ok" on success, otherwise a description of the
//                 first check that failed.
// is_bound_consume
//                 false: only group_name and subscribed_topic_and_filter_map
//                 are checked and stored; session_key, source_count,
//                 is_select_big and part_offset_map are not read.
//                 true: session_key and part_offset_map are checked as well
//                 and all values are stored.
// Returns true when the configuration was accepted, false otherwise.
bool ConsumerConfig::setGroupConsumeTarget(
    string& err_info, bool is_bound_consume, const string& group_name,
    const map<string, set<string> >& subscribed_topic_and_filter_map, const string& session_key,
    uint32_t source_count, bool is_select_big, const map<string, int64_t>& part_offset_map) {
  // group_name: Utils::ValidGroupName fills err_info itself on failure
  string checked_group;
  if (!Utils::ValidGroupName(err_info, group_name, checked_group)) {
    return false;
  }

  // subscribed_topic_and_filter_map: at least one topic is required
  if (subscribed_topic_and_filter_map.empty()) {
    err_info = "Illegal parameter: subscribed_topic_and_filter_map is empty!";
    return false;
  }
  map<string, set<string> > checked_topics;
  for (map<string, set<string> >::const_iterator topic_it =
           subscribed_topic_and_filter_map.begin();
       topic_it != subscribed_topic_and_filter_map.end(); ++topic_it) {
    const string& raw_topic = topic_it->first;

    // topic name
    if (!Utils::ValidString(err_info, raw_topic, false, true, true,
                            tb_config::kTopicNameMaxLength)) {
      stringstream msg;
      msg << "Check parameter subscribed_topic_and_filter_map error: topic " << raw_topic << " "
          << err_info;
      err_info = msg.str();
      return false;
    }
    const string topic = Utils::Trim(raw_topic);

    // filter items of this topic; every item is validated before the
    // item-count limit is applied
    const set<string>& raw_filters = topic_it->second;
    uint32_t filter_total = 0;
    string checked_item;
    set<string> checked_filters;
    for (set<string>::const_iterator filter_it = raw_filters.begin();
         filter_it != raw_filters.end(); ++filter_it) {
      if (!Utils::ValidFilterItem(err_info, *filter_it, checked_item)) {
        stringstream msg;
        msg << "Check parameter subscribed_topic_and_filter_map error: topic " << topic
            << "'s filter item " << err_info;
        err_info = msg.str();
        return false;
      }
      checked_filters.insert(checked_item);
      ++filter_total;
    }
    if (filter_total > tb_config::kFilterItemMaxCount) {
      // this message reports the topic exactly as the caller passed it
      stringstream msg;
      msg << "Check parameter subscribed_topic_and_filter_map error: topic " << raw_topic
          << "'s filter item over max item count : " << tb_config::kFilterItemMaxCount;
      err_info = msg.str();
      return false;
    }
    checked_topics[topic] = checked_filters;
  }

  // unbound consume: nothing further to verify, commit and finish
  if (!is_bound_consume) {
    is_bound_consume_ = false;
    group_name_ = checked_group;
    sub_topic_and_filter_map_ = checked_topics;
    err_info = "Ok";
    return true;
  }

  // session_key: non-empty after trimming and within the length limit
  const string checked_session = Utils::Trim(session_key);
  if (checked_session.empty()) {
    err_info = "Illegal parameter: session_key is empty!";
    return false;
  }
  if (checked_session.length() > tb_config::kSessionKeyMaxLength) {
    stringstream msg;
    msg << "Illegal parameter: session_key's length over max length "
        << tb_config::kSessionKeyMaxLength;
    err_info = msg.str();
    return false;
  }

  // part_offset_map: each key must split into three colon-separated fields
  // whose middle field is one of the subscribed topics
  string joined_key;
  map<string, int64_t> checked_offsets;
  for (map<string, int64_t>::const_iterator offset_it = part_offset_map.begin();
       offset_it != part_offset_map.end(); ++offset_it) {
    const string& raw_key = offset_it->first;
    const int64_t offset = offset_it->second;

    vector<string> fields;
    Utils::Split(raw_key, fields, delimiter::kDelimiterColon);
    if (fields.size() != 3) {
      stringstream msg;
      msg << "Illegal parameter: part_offset_map's key " << raw_key
          << " format error, value must be aaaa:bbbb:cccc !";
      err_info = msg.str();
      return false;
    }
    if (checked_topics.find(fields[1]) == checked_topics.end()) {
      stringstream msg;
      msg << "Illegal parameter: " << raw_key << " subscribed topic " << fields[1]
          << " not included in subscribed_topic_and_filter_map's topic list!";
      err_info = msg.str();
      return false;
    }
    if (raw_key.find_first_of(delimiter::kDelimiterComma) != string::npos) {
      stringstream msg;
      msg << "Illegal parameter: key " << raw_key << " include " << delimiter::kDelimiterComma
          << " char!";
      err_info = msg.str();
      return false;
    }
    if (offset < 0) {
      stringstream msg;
      msg << "Illegal parameter: " << raw_key << "'s offset must over or equal zero, value is "
          << offset;
      err_info = msg.str();
      return false;
    }
    // the stored key is rebuilt from the split fields, not taken from raw_key
    Utils::Join(fields, delimiter::kDelimiterColon, joined_key);
    checked_offsets[joined_key] = offset;
  }

  // bound consume: every check passed, commit all values
  is_bound_consume_ = true;
  group_name_ = checked_group;
  sub_topic_and_filter_map_ = checked_topics;
  session_key_ = checked_session;
  source_count_ = source_count;
  is_select_big_ = is_select_big;
  part_offset_map_ = checked_offsets;
  err_info = "Ok";
  return true;
}