public void handle()

in common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java [273:355]


  public void handle(ResponseMessage message) throws Exception {
    if (message instanceof ChunkFetchSuccess) {
      ChunkFetchSuccess resp = (ChunkFetchSuccess) message;
      logger.debug("Chunk {} fetch succeeded", resp.streamChunkSlice);
      FetchRequestInfo info = outstandingFetches.remove(resp.streamChunkSlice);
      if (info == null) {
        logger.warn(
            "Ignoring response for block {} from {} since it is not outstanding",
            resp.streamChunkSlice,
            NettyUtils.getRemoteAddress(channel));
        resp.body().release();
      } else {
        try {
          info.callback.onSuccess(resp.streamChunkSlice.chunkIndex, resp.body());
        } finally {
          resp.body().release();
        }
      }
    } else if (message instanceof ChunkFetchFailure) {
      ChunkFetchFailure resp = (ChunkFetchFailure) message;
      logger.error(
          "chunk {} fetch failed, errorMessage {}", resp.streamChunkSlice, resp.errorString);
      FetchRequestInfo info = outstandingFetches.remove(resp.streamChunkSlice);
      if (info == null) {
        logger.warn(
            "Ignoring response for block {} from {} ({}) since it is not outstanding",
            resp.streamChunkSlice,
            NettyUtils.getRemoteAddress(channel),
            resp.errorString);
      } else {
        logger.warn("Receive ChunkFetchFailure, errorMsg {}", resp.errorString);
        info.callback.onFailure(
            resp.streamChunkSlice.chunkIndex,
            new ChunkFetchFailureException(
                "Failure while fetching " + resp.streamChunkSlice + ": " + resp.errorString));
      }
    } else if (message instanceof RpcResponse) {
      RpcResponse resp = (RpcResponse) message;
      PushRequestInfo info = outstandingPushes.remove(resp.requestId);
      if (info == null) {
        RpcResponseCallback listener = outstandingRpcs.remove(resp.requestId);
        if (listener == null) {
          logger.warn(
              "Ignoring response for RPC {} from {} ({} bytes) since it is not outstanding",
              resp.requestId,
              NettyUtils.getRemoteAddress(channel),
              resp.body().size());
          resp.body().release();
        } else {
          try {
            listener.onSuccess(resp.body().nioByteBuffer());
          } finally {
            resp.body().release();
          }
        }
      } else {
        try {
          info.callback.onSuccess(resp.body().nioByteBuffer());
        } finally {
          resp.body().release();
        }
      }
    } else if (message instanceof RpcFailure) {
      RpcFailure resp = (RpcFailure) message;
      PushRequestInfo info = outstandingPushes.remove(resp.requestId);
      if (info == null) {
        RpcResponseCallback listener = outstandingRpcs.remove(resp.requestId);
        if (listener == null) {
          logger.warn(
              "Ignoring response for RPC {} from {} ({}) since it is not outstanding",
              resp.requestId,
              NettyUtils.getRemoteAddress(channel),
              resp.errorString);
        } else {
          listener.onFailure(new IOException(resp.errorString));
        }
      } else {
        info.callback.onFailure(new CelebornIOException(resp.errorString));
      }
    } else {
      throw new IllegalStateException("Unknown response type: " + message.type());
    }
  }