public void operationComplete()

in server/src/main/java/org/apache/uniffle/server/netty/ShuffleServerNettyHandler.java [940:1025]


    public void operationComplete(ChannelFuture future) {
      shuffleServer.getShuffleBufferManager().releaseReadMemory(readBufferSize);
      long readTime = System.currentTimeMillis() - readStartedTime;
      ShuffleServerMetrics.counterTotalReadTime.inc(readTime);
      shuffleServer.getNettyMetrics().recordProcessTime(request.getClass().getName(), readTime);
      if (request instanceof GetLocalShuffleDataRequest) {
        ShuffleServerMetrics.gaugeReadLocalDataFileThreadNum.dec();
        ShuffleServerMetrics.gaugeReadLocalDataFileBufferSize.dec(readBufferSize);
      } else if (request instanceof GetLocalShuffleIndexRequest) {
        ShuffleServerMetrics.gaugeReadLocalIndexFileThreadNum.dec();
        ShuffleServerMetrics.gaugeReadLocalIndexFileBufferSize.dec(readBufferSize);
      } else if (request instanceof GetMemoryShuffleDataRequest) {
        GetMemoryShuffleDataResponse getMemoryShuffleDataResponse =
            (GetMemoryShuffleDataResponse) response;
        if (CollectionUtils.isNotEmpty(getMemoryShuffleDataResponse.getBufferSegments())) {
          ShuffleServerMetrics.gaugeReadMemoryDataThreadNum.dec();
          ShuffleServerMetrics.gaugeReadMemoryDataBufferSize.dec(readBufferSize);
        }
      }
      if (!future.isSuccess()) {
        Throwable cause = future.cause();
        String errorMsg =
            "Error happened when executing "
                + request.getOperationType()
                + " for "
                + requestInfo
                + ", "
                + cause.getMessage();
        if (future.channel().isWritable()) {
          RpcResponse errorResponse;
          if (request instanceof GetLocalShuffleDataRequest) {
            errorResponse =
                new GetLocalShuffleDataResponse(
                    request.getRequestId(),
                    StatusCode.INTERNAL_ERROR,
                    errorMsg,
                    new NettyManagedBuffer(Unpooled.EMPTY_BUFFER));
          } else if (request instanceof GetLocalShuffleIndexRequest) {
            errorResponse =
                new GetLocalShuffleIndexResponse(
                    request.getRequestId(),
                    StatusCode.INTERNAL_ERROR,
                    errorMsg,
                    Unpooled.EMPTY_BUFFER,
                    0L);
          } else if (request instanceof GetMemoryShuffleDataRequest) {
            errorResponse =
                new GetMemoryShuffleDataResponse(
                    request.getRequestId(),
                    StatusCode.INTERNAL_ERROR,
                    errorMsg,
                    Lists.newArrayList(),
                    Unpooled.EMPTY_BUFFER);
          } else if (request instanceof GetSortedShuffleDataRequest) {
            errorResponse =
                new GetSortedShuffleDataResponse(
                    request.getRequestId(),
                    StatusCode.INTERNAL_ERROR,
                    errorMsg,
                    -1L,
                    MergeState.INTERNAL_ERROR.code(),
                    Unpooled.EMPTY_BUFFER);
          } else {
            LOG.error("Cannot handle request {}", request.type(), cause);
            return;
          }
          client.getChannel().writeAndFlush(errorResponse);
        }
        LOG.error(
            "Failed to execute {} for {}. Took {} ms and could not retrieve {} bytes of data",
            request.getOperationType(),
            requestInfo,
            readTime,
            dataSize,
            cause);
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug(
              "Successfully executed {} for {}. Took {} ms and retrieved {} bytes of data",
              request.getOperationType(),
              requestInfo,
              readTime,
              dataSize);
        }
      }
    }