in rd-cpp/src/rd_framework_cpp/src/main/impl/RdMap.h [136:237]
/**
 * Handles one incoming wire message for this map.
 *
 * Layout of the message as it is consumed here:
 *  - int32 header: the low `versionedFlagShift` bits hold the operation, any bit set above them
 *    marks the message as versioned;
 *  - int64 version, present only when the message is versioned (otherwise 0 is assumed);
 *  - the key, read through `KS`;
 *  - the value, read through `VS`, present only for `Op::ADD` and `Op::UPDATE`.
 *
 * `Op::ACK` messages are validated against `pendingForAck` and only logged; a pending entry is
 * erased when its version equals the acknowledged one.
 *
 * Every other operation is applied to the local map (ADD/UPDATE set the value, anything else
 * removes the key) unless the message is unversioned, this side is the master and an ACK is still
 * pending for the key, in which case the change is rejected and only logged. Versioned messages
 * are answered with an ACK that carries the same version and key.
 *
 * @param buffer incoming message positioned at the header.
 */
void on_wire_received(Buffer buffer) const override
{
    const int32_t header = buffer.read_integral<int32_t>();
    const bool msg_versioned = (header >> versionedFlagShift) != 0;
    const Op op = static_cast<Op>(header & ((1 << versionedFlagShift) - 1));
    int64_t version = msg_versioned ? buffer.read_integral<int64_t>() : 0;
    WK key = KS::read(this->get_serialization_context(), buffer);

    if (op == Op::ACK)
    {
        // An empty errmsg at the end of the checks means the ACK was accepted.
        std::string errmsg;
        if (!msg_versioned)
        {
            errmsg = "Received " + to_string(Op::ACK) + " while msg hasn't versioned flag set";
        }
        else if (!is_master)
        {
            errmsg = "Received " + to_string(Op::ACK) + " when not a Master";
        }
        else
        {
            if (pendingForAck.count(key) > 0)
            {
                int64_t pendingVersion = pendingForAck.at(key);
                if (pendingVersion < version)
                {
                    errmsg = "Pending version " + std::to_string(pendingVersion) + " < " + to_string(Op::ACK) + " version " +
                             std::to_string(version);
                }
                else
                {
                    // Side effect: only an ACK for exactly the pending version clears the entry.
                    // An ACK for an older version is accepted but leaves the entry in place.
                    if (pendingVersion == version)
                    {
                        pendingForAck.erase(key);
                    }
                }
            }
            else
            {
                errmsg = "No pending for " + to_string(Op::ACK);
            }
        }

        if (errmsg.empty())
        {
            spdlog::get("logReceived")->trace(logmsg(Op::ACK, version, &(wrapper::get<K>(key))));
        }
        else
        {
            spdlog::get("logReceived")->error(logmsg(Op::ACK, version, &(wrapper::get<K>(key))) + " >> " + errmsg);
        }
    }
    else
    {
        // Serialize the key up front: `key` may be moved into the map below, while the ACK reply
        // built afterwards still has to carry it.
        Buffer serialized_key;
        KS::write(this->get_serialization_context(), serialized_key, wrapper::get<K>(key));

        const bool is_put = (op == Op::ADD || op == Op::UPDATE);
        optional<WV> value;
        if (is_put)
        {
            value = VS::read(this->get_serialization_context(), buffer);
        }

        // The change is rejected only when all three hold: the message is unversioned, this side
        // is the master, and an ACK for this key is still pending.
        if (msg_versioned || !is_master || pendingForAck.count(key) == 0)
        {
            spdlog::get("logReceived")->trace("RECV{}", logmsg(op, version, &(wrapper::get<K>(key)), value));
            if (value.has_value())
            {
                // `key` and `value` are moved from here and must not be used afterwards.
                map::set(std::move(key), *std::move(value));
            }
            else
            {
                map::remove(wrapper::get<K>(key));
            }
        }
        else
        {
            spdlog::get("logReceived")->trace("{} >> REJECTED", logmsg(op, version, &(wrapper::get<K>(key)), value));
        }

        if (msg_versioned)
        {
            // Reply with an ACK echoing the received version and the pre-serialized key bytes.
            auto writer =
                util::make_shared_function([version, serialized_key = std::move(serialized_key)](Buffer& innerBuffer) mutable {
                    innerBuffer.write_integral<int32_t>((1u << versionedFlagShift) | static_cast<int32_t>(Op::ACK));
                    innerBuffer.write_integral<int64_t>(version);
                    innerBuffer.write_byte_array_raw(serialized_key.getArray());
                });
            get_wire()->send(rdid, std::move(writer));

            // A versioned message arriving at a master is reported as a configuration error.
            if (is_master)
            {
                spdlog::get("logReceived")->error("Both ends are masters: {}", to_string(location));
            }
        }
    }
}