in rd-cpp/src/rd_framework_cpp/src/main/protocol/MessageBroker.cpp [49:129]
void MessageBroker::dispatch(RdId id, Buffer message) const
{
RD_ASSERT_MSG(!id.isNull(), "id mustn't be null")
{ // synchronized recursively
std::lock_guard<decltype(lock)> guard(lock);
RdReactiveBase const* s = subscriptions[id];
if (s == nullptr)
{
auto it = broker.find(id);
if (it == broker.end())
{
it = broker.emplace(id, Mq{}).first;
}
broker[id].default_scheduler_messages.emplace(std::move(message));
auto action = [this, it, id]() mutable {
auto& current = it->second;
RdReactiveBase const* subscription = subscriptions[id];
optional<Buffer> message;
{
std::lock_guard<decltype(lock)> guard(lock);
if (!current.default_scheduler_messages.empty())
{
message = make_optional<Buffer>(std::move(current.default_scheduler_messages.front()));
current.default_scheduler_messages.pop();
}
}
if (subscription != nullptr)
{
if (message)
{
invoke(subscription, *std::move(message), subscription->get_wire_scheduler() == default_scheduler);
}
}
else
{
logger->trace("No handler for id: {}", to_string(id));
}
if (current.default_scheduler_messages.empty())
{
auto t = std::move(broker[id]);
broker.erase(id);
for (auto& schedMsg : t.custom_scheduler_messages)
{
RD_ASSERT_MSG(subscription->get_wire_scheduler() != default_scheduler,
"require equals of wire and default schedulers")
invoke(subscription, std::move(schedMsg));
}
}
};
std::function<void()> function = util::make_shared_function(std::move(action));
default_scheduler->queue(std::move(function));
}
else
{
if (s->get_wire_scheduler() == default_scheduler || s->get_wire_scheduler()->out_of_order_execution)
{
invoke(s, std::move(message));
}
else
{
auto it = broker.find(id);
if (it == broker.end())
{
invoke(s, std::move(message));
}
else
{
Mq& mq = it->second;
mq.custom_scheduler_messages.push_back(std::move(message));
}
}
}
}
// }
}