void SimpleConsumerImpl::updateAssignments()

in cpp/source/rocketmq/SimpleConsumerImpl.cpp [170:220]


void SimpleConsumerImpl::updateAssignments(const std::string& topic, const std::vector<rmq::Assignment>& assignments) {
  bool changed = false;
  {
    absl::MutexLock lk(&topic_assignments_mtx_);
    if (!topic_assignments_.contains(topic)) {
      changed = true;
      topic_assignments_.insert({topic, assignments});
      {
        absl::MutexLock assignment_lk(&assignments_mtx_);
        assignments_.insert(assignments_.begin(), assignments.begin(), assignments.end());
      }
    } else if (!assignments.empty()) {
      const auto& prev = topic_assignments_[topic];
      std::vector<rmq::Assignment> to_remove;
      std::vector<rmq::Assignment> to_add;
      for (const auto& item : prev) {
        if (std::find_if(assignments.begin(), assignments.end(), [&](const rmq::Assignment& e) { return item == e; }) ==
            assignments.end()) {
          to_remove.push_back(item);
        }
      }

      for (const auto& entry : assignments) {
        if (std::find_if(prev.begin(), prev.end(), [&](const rmq::Assignment e) { return e == entry; }) == prev.end()) {
          to_add.push_back(entry);
        }
      }

      if (!to_remove.empty() || !to_add.empty()) {
        changed = true;
        absl::MutexLock lk(&assignments_mtx_);
        for (const auto& item : to_remove) {
          std::remove_if(assignments_.begin(), assignments_.end(), [&](const rmq::Assignment& e) { return e == item; });
        }

        for (const auto& item : to_add) {
          assignments_.push_back(item);
        }
        topic_assignments_.insert_or_assign(topic, assignments);
      }
    }
  }

  if (changed) {
    SPDLOG_DEBUG("Assignments for topic={} change to: {}", topic,
                 absl::StrJoin(assignments.begin(), assignments.end(), ",",
                               [](std::string* out, const rmq::Assignment& assignment) {
                                 out->append(assignment.DebugString());
                               }));
  }
}