in common/src/main/java/org/apache/celeborn/common/network/client/TransportResponseHandler.java [273:355]
/**
 * Routes a response message to the callback that was registered for the matching request.
 *
 * <p>Four message types are recognised: {@code ChunkFetchSuccess}, {@code ChunkFetchFailure},
 * {@code RpcResponse} and {@code RpcFailure}. In every case the matching entry is removed from its
 * registry ({@code outstandingFetches}, {@code outstandingPushes} or {@code outstandingRpcs})
 * before the callback is invoked. A response with no registered entry is logged at warn level and
 * dropped. For the two success types the response body is released on every path, including when
 * the callback throws.
 *
 * @param message the response received from the remote end
 * @throws IllegalStateException if {@code message} is none of the four recognised types
 * @throws Exception declared by the signature; nothing in this method catches exceptions raised
 *     by the invoked callbacks
 */
public void handle(ResponseMessage message) throws Exception {
  if (message instanceof ChunkFetchSuccess) {
    ChunkFetchSuccess fetched = (ChunkFetchSuccess) message;
    logger.debug("Chunk {} fetch succeeded", fetched.streamChunkSlice);
    FetchRequestInfo fetchInfo = outstandingFetches.remove(fetched.streamChunkSlice);
    if (fetchInfo == null) {
      logger.warn(
          "Ignoring response for block {} from {} since it is not outstanding",
          fetched.streamChunkSlice,
          NettyUtils.getRemoteAddress(channel));
      // Nobody will consume the chunk, so drop our reference right away.
      fetched.body().release();
      return;
    }
    try {
      fetchInfo.callback.onSuccess(fetched.streamChunkSlice.chunkIndex, fetched.body());
    } finally {
      // Release even if the callback throws.
      fetched.body().release();
    }
    return;
  }

  if (message instanceof ChunkFetchFailure) {
    ChunkFetchFailure fetchFailure = (ChunkFetchFailure) message;
    logger.error(
        "chunk {} fetch failed, errorMessage {}",
        fetchFailure.streamChunkSlice,
        fetchFailure.errorString);
    FetchRequestInfo fetchInfo = outstandingFetches.remove(fetchFailure.streamChunkSlice);
    if (fetchInfo == null) {
      logger.warn(
          "Ignoring response for block {} from {} ({}) since it is not outstanding",
          fetchFailure.streamChunkSlice,
          NettyUtils.getRemoteAddress(channel),
          fetchFailure.errorString);
      return;
    }
    logger.warn("Receive ChunkFetchFailure, errorMsg {}", fetchFailure.errorString);
    String failureDetail =
        "Failure while fetching " + fetchFailure.streamChunkSlice + ": " + fetchFailure.errorString;
    fetchInfo.callback.onFailure(
        fetchFailure.streamChunkSlice.chunkIndex, new ChunkFetchFailureException(failureDetail));
    return;
  }

  if (message instanceof RpcResponse) {
    RpcResponse rpcReply = (RpcResponse) message;
    // Push requests are looked up first; plain RPCs are only consulted when no push matches.
    PushRequestInfo pushInfo = outstandingPushes.remove(rpcReply.requestId);
    if (pushInfo != null) {
      try {
        pushInfo.callback.onSuccess(rpcReply.body().nioByteBuffer());
      } finally {
        rpcReply.body().release();
      }
      return;
    }
    RpcResponseCallback rpcCallback = outstandingRpcs.remove(rpcReply.requestId);
    if (rpcCallback != null) {
      try {
        rpcCallback.onSuccess(rpcReply.body().nioByteBuffer());
      } finally {
        rpcReply.body().release();
      }
      return;
    }
    logger.warn(
        "Ignoring response for RPC {} from {} ({} bytes) since it is not outstanding",
        rpcReply.requestId,
        NettyUtils.getRemoteAddress(channel),
        rpcReply.body().size());
    rpcReply.body().release();
    return;
  }

  if (message instanceof RpcFailure) {
    RpcFailure rpcFailure = (RpcFailure) message;
    // Same lookup order as for successful RPC responses: pushes first, then plain RPCs.
    PushRequestInfo pushInfo = outstandingPushes.remove(rpcFailure.requestId);
    if (pushInfo != null) {
      pushInfo.callback.onFailure(new CelebornIOException(rpcFailure.errorString));
      return;
    }
    RpcResponseCallback rpcCallback = outstandingRpcs.remove(rpcFailure.requestId);
    if (rpcCallback != null) {
      rpcCallback.onFailure(new IOException(rpcFailure.errorString));
      return;
    }
    logger.warn(
        "Ignoring response for RPC {} from {} ({}) since it is not outstanding",
        rpcFailure.requestId,
        NettyUtils.getRemoteAddress(channel),
        rpcFailure.errorString);
    return;
  }

  throw new IllegalStateException("Unknown response type: " + message.type());
}