in cpp/celeborn/network/MessageDispatcher.cpp [24:127]
void MessageDispatcher::read(Context*, std::unique_ptr<Message> toRecvMsg) {
switch (toRecvMsg->type()) {
case Message::RPC_RESPONSE: {
RpcResponse* response = reinterpret_cast<RpcResponse*>(toRecvMsg.get());
bool found = true;
auto holder = requestIdRegistry_.withLock([&](auto& registry) {
auto search = registry.find(response->requestId());
if (search == registry.end()) {
LOG(WARNING)
<< "requestId " << response->requestId()
<< " not found when handling RPC_RESPONSE. Might be outdated already, ignored.";
found = false;
return MsgPromiseHolder{};
}
auto result = std::move(search->second);
registry.erase(response->requestId());
return std::move(result);
});
if (found) {
holder.msgPromise.setValue(std::move(toRecvMsg));
}
return;
}
case Message::RPC_FAILURE: {
RpcFailure* failure = reinterpret_cast<RpcFailure*>(toRecvMsg.get());
bool found = true;
auto holder = requestIdRegistry_.withLock([&](auto& registry) {
auto search = registry.find(failure->requestId());
if (search == registry.end()) {
LOG(WARNING)
<< "requestId " << failure->requestId()
<< " not found when handling RPC_FAILURE. Might be outdated already, ignored.";
found = false;
return MsgPromiseHolder{};
}
auto result = std::move(search->second);
registry.erase(failure->requestId());
return std::move(result);
});
LOG(ERROR) << "Rpc failed, requestId: " << failure->requestId()
<< " errorMsg: " << failure->errorMsg() << std::endl;
if (found) {
holder.msgPromise.setException(
folly::exception_wrapper(std::exception()));
}
return;
}
case Message::CHUNK_FETCH_SUCCESS: {
ChunkFetchSuccess* success =
reinterpret_cast<ChunkFetchSuccess*>(toRecvMsg.get());
auto streamChunkSlice = success->streamChunkSlice();
bool found = true;
auto holder = streamChunkSliceRegistry_.withLock([&](auto& registry) {
auto search = registry.find(streamChunkSlice);
if (search == registry.end()) {
LOG(WARNING)
<< "streamChunkSlice " << streamChunkSlice.toString()
<< " not found when handling CHUNK_FETCH_SUCCESS. Might be outdated already, ignored.";
found = false;
return MsgPromiseHolder{};
}
auto result = std::move(search->second);
registry.erase(streamChunkSlice);
return std::move(result);
});
if (found) {
holder.msgPromise.setValue(std::move(toRecvMsg));
}
return;
}
case Message::CHUNK_FETCH_FAILURE: {
ChunkFetchFailure* failure =
reinterpret_cast<ChunkFetchFailure*>(toRecvMsg.get());
auto streamChunkSlice = failure->streamChunkSlice();
bool found = true;
auto holder = streamChunkSliceRegistry_.withLock([&](auto& registry) {
auto search = registry.find(streamChunkSlice);
if (search == registry.end()) {
LOG(WARNING)
<< "streamChunkSlice " << streamChunkSlice.toString()
<< " not found when handling CHUNK_FETCH_FAILURE. Might be outdated already, ignored.";
found = false;
return MsgPromiseHolder{};
}
auto result = std::move(search->second);
registry.erase(streamChunkSlice);
return std::move(result);
});
std::string errorMsg = fmt::format(
"fetchChunk failed, streamChunkSlice: {}, errorMsg: {}",
streamChunkSlice.toString(),
failure->errorMsg());
LOG(ERROR) << errorMsg;
if (found) {
holder.msgPromise.setException(
folly::exception_wrapper(std::exception()));
}
return;
}
default: {
LOG(ERROR) << "unsupported msg for dispatcher";
}
}
}