void on_wire_received()

in src/cpp/RiderLink/Source/RD/src/rd_framework_cpp/src/main/impl/RdMap.h [141:242]


	void on_wire_received(Buffer buffer) const override
	{
		int32_t header = buffer.read_integral<int32_t>();
		bool msg_versioned = (header >> versionedFlagShift) != 0;
		Op op = static_cast<Op>(header & ((1 << versionedFlagShift) - 1));

		int64_t version = msg_versioned ? buffer.read_integral<int64_t>() : 0;

		WK key = KS::read(this->get_serialization_context(), buffer);

		if (op == Op::ACK)
		{
			std::string errmsg;
			if (!msg_versioned)
			{
				errmsg = "Received " + to_string(Op::ACK) + " while msg hasn't versioned flag set";
			}
			else if (!is_master)
			{
				errmsg = "Received " + to_string(Op::ACK) + " when not a Master";
			}
			else
			{
				if (pendingForAck.count(key) > 0)
				{
					int64_t pendingVersion = pendingForAck.at(key);
					if (pendingVersion < version)
					{
						errmsg = "Pending version " + std::to_string(pendingVersion) + " < " + to_string(Op::ACK) + " version `" +
								 std::to_string(version);
					}
					else
					{
						// side effect
						if (pendingVersion == version)
						{
							pendingForAck.erase(key);	 // else we don't need to remove, silently drop
						}
						// return good result
					}
				}
				else
				{
					errmsg = "No pending for " + to_string(Op::ACK);
				}
			}
			if (errmsg.empty())
			{
				spdlog::get("logReceived")->trace(logmsg(Op::ACK, version, &(wrapper::get<K>(key))));
			}
			else
			{
				spdlog::get("logReceived")->error(logmsg(Op::ACK, version, &(wrapper::get<K>(key))) + " >> " + errmsg);
			}
		}
		else
		{
			Buffer serialized_key;
			KS::write(this->get_serialization_context(), serialized_key, wrapper::get<K>(key));

			bool is_put = (op == Op::ADD || op == Op::UPDATE);
			optional<WV> value;
			if (is_put)
			{
				value = VS::read(this->get_serialization_context(), buffer);
			}

			if (msg_versioned || !is_master || pendingForAck.count(key) == 0)
			{
				spdlog::get("logReceived")->trace("RECV{}", logmsg(op, version, &(wrapper::get<K>(key)), value));
				if (value.has_value())
				{
					map::set(std::move(key), *std::move(value));
				}
				else
				{
					map::remove(wrapper::get<K>(key));
				}
			}
			else
			{
				spdlog::get("logReceived")->trace("{} >> REJECTED", logmsg(op, version, &(wrapper::get<K>(key)), value));
			}

			if (msg_versioned)
			{
				auto writer =
					util::make_shared_function([version, serialized_key = std::move(serialized_key)](Buffer& innerBuffer) mutable {
						innerBuffer.write_integral<int32_t>((1u << versionedFlagShift) | static_cast<int32_t>(Op::ACK));
						innerBuffer.write_integral<int64_t>(version);
						// KS::write(this->get_serialization_context(), innerBuffer, wrapper::get<K>(key));
						innerBuffer.write_byte_array_raw(serialized_key.getArray());
						// logSend.trace(logmsg(Op::ACK, version, serialized_key));
					});
				get_wire()->send(rdid, std::move(writer));
				if (is_master)
				{
					spdlog::get("logReceived")->error("Both ends are masters: {}", to_string(location));
				}
			}
		}
	}