in cpp/source/rocketmq/SimpleConsumerImpl.cpp [170:220]
/**
 * Reconcile the cached assignments of `topic` with a freshly received list.
 *
 * - If `topic` has no cached entry yet, `assignments` is recorded for it and
 *   also inserted at the front of the flattened `assignments_` list.
 * - If `topic` is already cached and `assignments` is non-empty, the difference
 *   against the cached list is computed; entries that disappeared are erased
 *   from `assignments_`, new entries are appended, and the cached list for the
 *   topic is replaced.
 * - If `topic` is already cached and `assignments` is empty, nothing is changed.
 *
 * Whenever something was changed, the new assignment list is logged at debug
 * level.
 *
 * Locking: `topic_assignments_mtx_` is held for the whole reconciliation;
 * `assignments_mtx_` is acquired nested inside it, only while `assignments_`
 * is being modified.
 *
 * @param topic       Topic whose assignments are being updated.
 * @param assignments Latest assignments received for `topic`.
 */
void SimpleConsumerImpl::updateAssignments(const std::string& topic, const std::vector<rmq::Assignment>& assignments) {
  bool changed = false;
  {
    absl::MutexLock lk(&topic_assignments_mtx_);
    auto cached = topic_assignments_.find(topic);
    if (cached == topic_assignments_.end()) {
      // First time this topic is seen: record the list as-is.
      changed = true;
      topic_assignments_.insert({topic, assignments});
      {
        absl::MutexLock assignments_lk(&assignments_mtx_);
        assignments_.insert(assignments_.begin(), assignments.begin(), assignments.end());
      }
    } else if (!assignments.empty()) {
      const auto& prev = cached->second;

      // Entries that were cached before but are absent from the new list.
      std::vector<rmq::Assignment> to_remove;
      for (const auto& item : prev) {
        if (std::find_if(assignments.begin(), assignments.end(),
                         [&](const rmq::Assignment& e) { return item == e; }) == assignments.end()) {
          to_remove.push_back(item);
        }
      }

      // Entries in the new list that were not cached before.
      std::vector<rmq::Assignment> to_add;
      for (const auto& entry : assignments) {
        if (std::find_if(prev.begin(), prev.end(), [&](const rmq::Assignment& e) { return e == entry; }) ==
            prev.end()) {
          to_add.push_back(entry);
        }
      }

      if (!to_remove.empty() || !to_add.empty()) {
        changed = true;
        {
          absl::MutexLock assignments_lk(&assignments_mtx_);
          for (const auto& item : to_remove) {
            // std::remove_if only compacts the kept elements and returns the new
            // logical end; erase() is required to actually shrink the container.
            assignments_.erase(std::remove_if(assignments_.begin(), assignments_.end(),
                                              [&](const rmq::Assignment& e) { return e == item; }),
                               assignments_.end());
          }
          for (const auto& item : to_add) {
            assignments_.push_back(item);
          }
        }
        // `prev` refers to the mapped value replaced here; it is not used past this point.
        topic_assignments_.insert_or_assign(topic, assignments);
      }
    }
  }

  if (changed) {
    SPDLOG_DEBUG("Assignments for topic={} change to: {}", topic,
                 absl::StrJoin(assignments.begin(), assignments.end(), ",",
                               [](std::string* out, const rmq::Assignment& assignment) {
                                 out->append(assignment.DebugString());
                               }));
  }
}