public MultiResponse multi()

in hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java [2667:2903]


  /**
   * Executes the region actions carried by a multi request and assembles a single
   * {@link MultiResponse} from the per-region outcomes.
   * <p>
   * Two request shapes are handled:
   * <ul>
   * <li><b>Legacy</b>: the condition is set on the {@code MultiRequest} itself. Only the first
   * region action is run, as a checkAndMutate, and the method returns right after it. An empty
   * region action list yields a response with {@code processed=true}.</li>
   * <li><b>Current</b>: every region action is dispatched on its own as a conditional
   * (checkAndMutate) action, an atomic batch, or a non-atomic batch.</li>
   * </ul>
   * Failures while resolving the region, checking the quota, rejecting a client request, or
   * running a conditional/atomic action are recorded in the response for that region action
   * instead of being thrown.
   * @param rpcc    the RPC controller; when non-null it supplies the incoming cell scanner and
   *                receives the cells to return
   * @param request the multi request to execute
   * @return the response holding one result per processed region action plus the collected
   *         region load statistics
   * @throws ServiceException wrapping the {@link IOException} raised by {@code checkOpen()}
   */
  public MultiResponse multi(final RpcController rpcc, final MultiRequest request)
    throws ServiceException {
    try {
      checkOpen();
    } catch (IOException ie) {
      throw new ServiceException(ie);
    }

    checkBatchSizeAndLogLargeSize(request);

    // rpc controller is how we bring in data via the back door; it is unprotobuf'ed data.
    // It is also the conduit via which we pass back data.
    HBaseRpcController controller = (HBaseRpcController) rpcc;
    CellScanner cellScanner = controller != null ? getAndReset(controller) : null;

    long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;

    MultiResponse.Builder responseBuilder = MultiResponse.newBuilder();
    // One builder is reused for all region actions; the loop below clears it per iteration.
    RegionActionResult.Builder regionActionResultBuilder = RegionActionResult.newBuilder();
    this.rpcMultiRequestCount.increment();
    this.requestCount.increment();
    ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();

    // We no longer use MultiRequest#condition. Instead, we use RegionAction#condition. The
    // following logic is for backward compatibility as old clients still use
    // MultiRequest#condition in case of checkAndMutate with RowMutations.
    if (request.hasCondition()) {
      if (request.getRegionActionList().isEmpty()) {
        // If the region action list is empty, do nothing.
        responseBuilder.setProcessed(true);
        return responseBuilder.build();
      }

      RegionAction regionAction = request.getRegionAction(0);

      // When request.hasCondition() is true, regionAction.getAtomic() should be always true. So
      // we can assume regionAction.getAtomic() is true here.
      assert regionAction.getAtomic();

      OperationQuota quota;
      HRegion region;
      RegionSpecifier regionSpecifier = regionAction.getRegion();

      try {
        region = getRegion(regionSpecifier);
        // In this legacy path the condition is carried by the request, not by the region action,
        // so regionAction.hasCondition() would normally be false here. We are inside
        // request.hasCondition(), hence report the condition as present.
        // NOTE(review): assumes the third argument means "the batch is conditional" - confirm
        // against checkBatchQuota.
        quota =
          getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(), true);
      } catch (IOException e) {
        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
        return responseBuilder.build();
      }

      try {
        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);
        // We only allow replication in standby state and it will not set the atomic flag.
        if (rejectIfFromClient) {
          failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
            newStandbyStateException(region));
          return responseBuilder.build();
        }

        try {
          CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
            cellScanner, request.getCondition(), nonceGroup, spaceQuotaEnforcement);
          responseBuilder.setProcessed(result.isSuccess());
          ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
            ClientProtos.ResultOrException.newBuilder();
          for (int i = 0; i < regionAction.getActionCount(); i++) {
            // To unify the response format with doNonAtomicRegionMutation and read through
            // client's AsyncProcess we have to add an empty result instance per operation
            resultOrExceptionOrBuilder.clear();
            resultOrExceptionOrBuilder.setIndex(i);
            regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
          }
        } catch (IOException e) {
          rpcServer.getMetrics().exception(e);
          // As it's an atomic operation with a condition, we may expect it's a global failure.
          regionActionResultBuilder.setException(ResponseConverter.buildException(e));
        }
      } finally {
        // Runs on every exit from the block above, including the early return on rejection.
        quota.close();
      }

      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
      if (regionLoadStats != null) {
        responseBuilder.setRegionStatistics(MultiRegionLoadStats.newBuilder()
          .addRegion(regionSpecifier).addStat(regionLoadStats).build());
      }
      return responseBuilder.build();
    }

    // this will contain all the cells that we need to return. It's created later, if needed.
    List<ExtendedCellScannable> cellsToReturn = null;
    RegionScannersCloseCallBack closeCallBack = null;
    RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
    Map<RegionSpecifier, ClientProtos.RegionLoadStats> regionStats =
      new HashMap<>(request.getRegionActionCount());

    for (RegionAction regionAction : request.getRegionActionList()) {
      OperationQuota quota;
      HRegion region;
      RegionSpecifier regionSpecifier = regionAction.getRegion();
      regionActionResultBuilder.clear();

      try {
        region = getRegion(regionSpecifier);
        quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(),
          regionAction.hasCondition());
      } catch (IOException e) {
        failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e);
        continue; // For this region it's a failure.
      }

      // Every 'continue' inside this try still goes through the finally that closes the quota.
      try {
        boolean rejectIfFromClient = shouldRejectRequestsFromClient(region);

        if (regionAction.hasCondition()) {
          // We only allow replication in standby state and it will not set the atomic flag.
          if (rejectIfFromClient) {
            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
              newStandbyStateException(region));
            continue;
          }

          try {
            ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
              ClientProtos.ResultOrException.newBuilder();
            if (regionAction.getActionCount() == 1) {
              // Single mutation guarded by the condition.
              CheckAndMutateResult result =
                checkAndMutate(region, quota, regionAction.getAction(0).getMutation(), cellScanner,
                  regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement, context);
              regionActionResultBuilder.setProcessed(result.isSuccess());
              resultOrExceptionOrBuilder.setIndex(0);
              if (result.getResult() != null) {
                resultOrExceptionOrBuilder.setResult(ProtobufUtil.toResult(result.getResult()));
              }

              if (result.getMetrics() != null) {
                resultOrExceptionOrBuilder
                  .setMetrics(ProtobufUtil.toQueryMetrics(result.getMetrics()));
              }

              regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
            } else {
              // Several actions guarded by one condition.
              CheckAndMutateResult result = checkAndMutate(region, regionAction.getActionList(),
                cellScanner, regionAction.getCondition(), nonceGroup, spaceQuotaEnforcement);
              regionActionResultBuilder.setProcessed(result.isSuccess());
              for (int i = 0; i < regionAction.getActionCount(); i++) {
                if (i == 0 && result.getResult() != null) {
                  // Set the result of the Increment/Append operations to the first element of the
                  // ResultOrException list
                  resultOrExceptionOrBuilder.setIndex(i);
                  regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
                    .setResult(ProtobufUtil.toResult(result.getResult())).build());
                  continue;
                }
                // To unify the response format with doNonAtomicRegionMutation and read through
                // client's AsyncProcess we have to add an empty result instance per operation
                resultOrExceptionOrBuilder.clear();
                resultOrExceptionOrBuilder.setIndex(i);
                regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder.build());
              }
            }
          } catch (IOException e) {
            rpcServer.getMetrics().exception(e);
            // As it's an atomic operation with a condition, we may expect it's a global failure.
            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
          }
        } else if (regionAction.hasAtomic() && regionAction.getAtomic()) {
          // We only allow replication in standby state and it will not set the atomic flag.
          if (rejectIfFromClient) {
            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
              newStandbyStateException(region));
            continue;
          }
          try {
            doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
              cellScanner, nonceGroup, spaceQuotaEnforcement);
            regionActionResultBuilder.setProcessed(true);
            // We no longer use MultiResponse#processed. Instead, we use
            // RegionActionResult#processed. This is for backward compatibility for old clients.
            responseBuilder.setProcessed(true);
          } catch (IOException e) {
            rpcServer.getMetrics().exception(e);
            // As it's atomic, we may expect it's a global failure.
            regionActionResultBuilder.setException(ResponseConverter.buildException(e));
          }
        } else {
          if (
            rejectIfFromClient && regionAction.getActionCount() > 0
              && !isReplicationRequest(regionAction.getAction(0))
          ) {
            // fail if it is not a replication request
            failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner,
              newStandbyStateException(region));
            continue;
          }
          // doNonAtomicRegionMutation manages the exception internally
          if (context != null && closeCallBack == null) {
            // An RpcCallBack that creates a list of scanners that needs to perform callBack
            // operation on completion of multiGets.
            // Set this only once
            closeCallBack = new RegionScannersCloseCallBack();
            context.setCallBack(closeCallBack);
          }
          cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
            regionActionResultBuilder, cellsToReturn, nonceGroup, closeCallBack, context,
            spaceQuotaEnforcement);
        }
      } finally {
        quota.close();
      }

      responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
      ClientProtos.RegionLoadStats regionLoadStats = region.getLoadStatistics();
      if (regionLoadStats != null) {
        // Keyed by region specifier, so a later action on the same region replaces the entry.
        regionStats.put(regionSpecifier, regionLoadStats);
      }
    }
    // Load the controller with the Cells to return.
    if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
      controller.setCellScanner(PrivateCellUtil.createExtendedCellScanner(cellsToReturn));
    }

    // Region statistics are attached to the response even when none were collected.
    MultiRegionLoadStats.Builder builder = MultiRegionLoadStats.newBuilder();
    for (Entry<RegionSpecifier, ClientProtos.RegionLoadStats> stat : regionStats.entrySet()) {
      builder.addRegion(stat.getKey());
      builder.addStat(stat.getValue());
    }
    responseBuilder.setRegionStatistics(builder);
    return responseBuilder.build();
  }

  /**
   * Builds the non-retriable exception that {@code multi} records for a region action when
   * {@code shouldRejectRequestsFromClient} reports that the region must refuse the request.
   * @param region the region that refused the request
   * @return an exception whose message is the region name followed by " is in STANDBY state"
   */
  private static DoNotRetryIOException newStandbyStateException(HRegion region) {
    return new DoNotRetryIOException(
      region.getRegionInfo().getRegionNameAsString() + " is in STANDBY state");
  }