void MessageBroker::dispatch()

in rd-cpp/src/rd_framework_cpp/src/main/protocol/MessageBroker.cpp [49:129]


void MessageBroker::dispatch(RdId id, Buffer message) const
{
	RD_ASSERT_MSG(!id.isNull(), "id mustn't be null")

	{	 // synchronized recursively
		std::lock_guard<decltype(lock)> guard(lock);
		RdReactiveBase const* s = subscriptions[id];
		if (s == nullptr)
		{
			auto it = broker.find(id);
			if (it == broker.end())
			{
				it = broker.emplace(id, Mq{}).first;
			}

			broker[id].default_scheduler_messages.emplace(std::move(message));

			auto action = [this, it, id]() mutable {
				auto& current = it->second;
				RdReactiveBase const* subscription = subscriptions[id];

				optional<Buffer> message;
				{
					std::lock_guard<decltype(lock)> guard(lock);
					if (!current.default_scheduler_messages.empty())
					{
						message = make_optional<Buffer>(std::move(current.default_scheduler_messages.front()));
						current.default_scheduler_messages.pop();
					}
				}
				if (subscription != nullptr)
				{
					if (message)
					{
						invoke(subscription, *std::move(message), subscription->get_wire_scheduler() == default_scheduler);
					}
				}
				else
				{
					logger->trace("No handler for id: {}", to_string(id));
				}

				if (current.default_scheduler_messages.empty())
				{
					auto t = std::move(broker[id]);
					broker.erase(id);
					for (auto& schedMsg : t.custom_scheduler_messages)
					{
						RD_ASSERT_MSG(subscription->get_wire_scheduler() != default_scheduler,
							"require equals of wire and default schedulers")
						invoke(subscription, std::move(schedMsg));
					}
				}
			};
			std::function<void()> function = util::make_shared_function(std::move(action));
			default_scheduler->queue(std::move(function));
		}
		else
		{
			if (s->get_wire_scheduler() == default_scheduler || s->get_wire_scheduler()->out_of_order_execution)
			{
				invoke(s, std::move(message));
			}
			else
			{
				auto it = broker.find(id);
				if (it == broker.end())
				{
					invoke(s, std::move(message));
				}
				else
				{
					Mq& mq = it->second;
					mq.custom_scheduler_messages.push_back(std::move(message));
				}
			}
		}
	}

	//        }
}