void MessageDispatcher::read()

in cpp/celeborn/network/MessageDispatcher.cpp [24:127]


// Routes a message received from the peer to the promise waiting for it.
//
// The message type selects which registry is consulted:
//   - RPC_RESPONSE / RPC_FAILURE: keyed by requestId in requestIdRegistry_.
//   - CHUNK_FETCH_SUCCESS / CHUNK_FETCH_FAILURE: keyed by streamChunkSlice in
//     streamChunkSliceRegistry_.
//
// A matching entry is moved out of, and erased from, its registry inside the
// withLock callback; its promise is completed only after withLock has
// returned. Success messages fulfil the promise with the message itself
// (ownership of toRecvMsg is transferred); failure messages complete it with
// an exception. A message with no matching entry is logged and dropped, and
// any other message type is logged as unsupported.
//
// The Context* argument is unused. toRecvMsg is dereferenced unconditionally,
// so callers must pass a non-null message.
void MessageDispatcher::read(Context*, std::unique_ptr<Message> toRecvMsg) {
  // NOTE(review): the downcasts below trust type() to identify the concrete
  // message class. If these classes derive from Message, static_cast would be
  // the more appropriate cast -- confirm against the class declarations.
  switch (toRecvMsg->type()) {
    case Message::RPC_RESPONSE: {
      RpcResponse* response = reinterpret_cast<RpcResponse*>(toRecvMsg.get());
      // Read the key once, before toRecvMsg is moved into the promise.
      const auto requestId = response->requestId();
      bool found = true;
      auto holder = requestIdRegistry_.withLock([&](auto& registry) {
        auto search = registry.find(requestId);
        if (search == registry.end()) {
          LOG(WARNING)
              << "requestId " << requestId
              << " not found when handling RPC_RESPONSE. Might be outdated already, ignored.";
          found = false;
          return MsgPromiseHolder{};
        }
        auto result = std::move(search->second);
        // Erase through the iterator to avoid a second lookup by key.
        registry.erase(search);
        return result;
      });
      if (found) {
        holder.msgPromise.setValue(std::move(toRecvMsg));
      }
      return;
    }
    case Message::RPC_FAILURE: {
      RpcFailure* failure = reinterpret_cast<RpcFailure*>(toRecvMsg.get());
      const auto requestId = failure->requestId();
      bool found = true;
      auto holder = requestIdRegistry_.withLock([&](auto& registry) {
        auto search = registry.find(requestId);
        if (search == registry.end()) {
          LOG(WARNING)
              << "requestId " << requestId
              << " not found when handling RPC_FAILURE. Might be outdated already, ignored.";
          found = false;
          return MsgPromiseHolder{};
        }
        auto result = std::move(search->second);
        registry.erase(search);
        return result;
      });
      // The failure is logged even when no pending request was found.
      LOG(ERROR) << "Rpc failed, requestId: " << requestId
                 << " errorMsg: " << failure->errorMsg();
      if (found) {
        // NOTE(review): the exception carries no message; errorMsg() only
        // reaches the log above, not the holder of the future.
        holder.msgPromise.setException(
            folly::exception_wrapper(std::exception()));
      }
      return;
    }
    case Message::CHUNK_FETCH_SUCCESS: {
      ChunkFetchSuccess* success =
          reinterpret_cast<ChunkFetchSuccess*>(toRecvMsg.get());
      // Copy the key so it stays usable independently of toRecvMsg.
      auto streamChunkSlice = success->streamChunkSlice();
      bool found = true;
      auto holder = streamChunkSliceRegistry_.withLock([&](auto& registry) {
        auto search = registry.find(streamChunkSlice);
        if (search == registry.end()) {
          LOG(WARNING)
              << "streamChunkSlice " << streamChunkSlice.toString()
              << " not found when handling CHUNK_FETCH_SUCCESS. Might be outdated already, ignored.";
          found = false;
          return MsgPromiseHolder{};
        }
        auto result = std::move(search->second);
        registry.erase(search);
        return result;
      });
      if (found) {
        holder.msgPromise.setValue(std::move(toRecvMsg));
      }
      return;
    }
    case Message::CHUNK_FETCH_FAILURE: {
      ChunkFetchFailure* failure =
          reinterpret_cast<ChunkFetchFailure*>(toRecvMsg.get());
      auto streamChunkSlice = failure->streamChunkSlice();
      bool found = true;
      auto holder = streamChunkSliceRegistry_.withLock([&](auto& registry) {
        auto search = registry.find(streamChunkSlice);
        if (search == registry.end()) {
          LOG(WARNING)
              << "streamChunkSlice " << streamChunkSlice.toString()
              << " not found when handling CHUNK_FETCH_FAILURE. Might be outdated already, ignored.";
          found = false;
          return MsgPromiseHolder{};
        }
        auto result = std::move(search->second);
        registry.erase(search);
        return result;
      });
      // The failure is logged even when no pending fetch was found.
      std::string errorMsg = fmt::format(
          "fetchChunk failed, streamChunkSlice: {}, errorMsg: {}",
          streamChunkSlice.toString(),
          failure->errorMsg());
      LOG(ERROR) << errorMsg;
      if (found) {
        // NOTE(review): the exception carries no message; errorMsg only
        // reaches the log above, not the holder of the future.
        holder.msgPromise.setException(
            folly::exception_wrapper(std::exception()));
      }
      return;
    }
    default: {
      LOG(ERROR) << "unsupported msg for dispatcher";
    }
  }
}