private List doNonAtomicRegionMutation()

in hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java [743:907]


  /**
   * Runs the actions of a non-atomic {@code RegionAction} against {@code region}, in the order
   * they appear in {@code actions}, recording the outcome of each in {@code builder}.
   * <p>
   * Gets, coprocessor service calls, Appends and Increments are executed as they are encountered
   * and their {@code ResultOrException} is added here. Puts and Deletes are only collected; the
   * collected run is handed to {@code doNonAtomicBatchOp} when an Append/Increment is reached or
   * when the loop finishes, and that call is responsible for adding their
   * {@code ResultOrException} entries.
   * <p>
   * When {@code context} is non-null and supports immediate retry, an action is not executed once
   * the accumulated response size exceeds the limit; a shared {@code MultiActionResultTooLarge}
   * is reported for it instead and its cells are skipped in {@code cellScanner}.
   * <p>
   * An {@link IOException} thrown while handling a single action is converted into an exception
   * entry for that action and does not abort the remaining actions.
   * @param region                the region the actions are applied to
   * @param quota                 operation quota; its max result size bounds the response size
   * @param actions               the actions to run
   * @param cellScanner           scanner over the cells that accompany the mutations
   * @param builder               receives one {@code ResultOrException} per completed action
   * @param cellsToReturn         results to be shipped back as cells; may be {@code null}, in which
   *                              case a list is created on first use
   * @param nonceGroup            nonce group forwarded to Append/Increment
   * @param closeCallBack         forwarded to the context-aware Get path
   * @param context               the RPC call context; may be {@code null}, in which case size
   *                              limits are not checked and no response sizes are recorded here
   * @param spaceQuotaEnforcement space quota policy forwarded to the mutation calls
   * @return {@code cellsToReturn}, or the list created for it; {@code null} if none was passed in
   *         and no result had to be returned as cells
   */
  private List<ExtendedCellScannable> doNonAtomicRegionMutation(final HRegion region,
    final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
    final RegionActionResult.Builder builder, List<ExtendedCellScannable> cellsToReturn,
    long nonceGroup, final RegionScannersCloseCallBack closeCallBack, RpcCallContext context,
    ActivePolicyEnforcement spaceQuotaEnforcement) {
    // Gather up CONTIGUOUS Puts and Deletes in this mutations List. Idea is that rather than do
    // one at a time, we instead pass them in batch. Be aware that the corresponding
    // ResultOrException instance that matches each Put or Delete is then added down in the
    // doNonAtomicBatchOp call. We should be staying aligned though the Put and Delete are
    // deferred/batched
    List<ClientProtos.Action> mutations = null;
    long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getMaxResultSize());
    // Created lazily the first time the size limit is hit, then reused for every later action.
    IOException sizeIOE = null;
    // A single builder is reused for every action; it is cleared at the top of each iteration.
    ClientProtos.ResultOrException.Builder resultOrExceptionBuilder =
      ResultOrException.newBuilder();
    boolean hasResultOrException = false;
    for (ClientProtos.Action action : actions.getActionList()) {
      hasResultOrException = false;
      resultOrExceptionBuilder.clear();
      try {
        Result r = null;
        long blockBytesScannedBefore = context != null ? context.getBlockBytesScanned() : 0;
        if (
          context != null && context.isRetryImmediatelySupported()
            && (context.getResponseCellSize() > maxQuotaResultSize
              || blockBytesScannedBefore + context.getResponseExceptionSize() > maxQuotaResultSize)
        ) {

          // We're storing the exception since the exception and reason string won't
          // change after the response size limit is reached.
          if (sizeIOE == null) {
            // We don't need the stack unwinding, so don't throw the exception.
            // Throwing will kill the JVM's JIT.
            //
            // Instead just create the exception and then store it.
            sizeIOE = new MultiActionResultTooLarge("Max size exceeded" + " CellSize: "
              + context.getResponseCellSize() + " BlockSize: " + blockBytesScannedBefore);

            // Only report the exception once since there's only one request that
            // caused the exception. Otherwise this number will dominate the exceptions count.
            rpcServer.getMetrics().exception(sizeIOE);
          }

          // Now that an exception is known to exist, use it for the response.
          //
          // This will create a copy in the builder.
          NameBytesPair pair = ResponseConverter.buildException(sizeIOE);
          resultOrExceptionBuilder.setException(pair);
          context.incrementResponseExceptionSize(pair.getSerializedSize());
          resultOrExceptionBuilder.setIndex(action.getIndex());
          builder.addResultOrException(resultOrExceptionBuilder.build());
          // The action is not executed, so step over its cells to keep the scanner aligned with
          // the actions that follow.
          skipCellsForMutation(action, cellScanner);
          continue;
        }
        if (action.hasGet()) {
          long before = EnvironmentEdgeManager.currentTime();
          ClientProtos.Get pbGet = action.getGet();
          // An asynchbase client, https://github.com/OpenTSDB/asynchbase, starts by trying to do
          // a get closest before. Throwing the UnknownProtocolException signals it that it needs
          // to switch and do hbase2 protocol (HBase servers do not tell clients what versions
          // they are; it's a problem for non-native clients like asynchbase). HBASE-20225.
          if (pbGet.hasClosestRowBefore() && pbGet.getClosestRowBefore()) {
            throw new UnknownProtocolException("Is this a pre-hbase-1.0.0 or asynchbase client? "
              + "Client is invoking getClosestRowBefore removed in hbase-2.0.0 replaced by "
              + "reverse Scan.");
          }
          try {
            Get get = ProtobufUtil.toGet(pbGet);
            if (context != null) {
              r = get(get, (region), closeCallBack, context);
            } else {
              r = region.get(get);
            }
          } finally {
            // Record the Get metric whether the Get succeeded or threw.
            final MetricsRegionServer metricsRegionServer = server.getMetrics();
            if (metricsRegionServer != null) {
              long blockBytesScanned =
                context != null ? context.getBlockBytesScanned() - blockBytesScannedBefore : 0;
              metricsRegionServer.updateGet(region, EnvironmentEdgeManager.currentTime() - before,
                blockBytesScanned);
            }
          }
        } else if (action.hasServiceCall()) {
          // Set before the call so that a failure is still reported against this action's index.
          hasResultOrException = true;
          Message result = execServiceOnRegion(region, action.getServiceCall());
          ClientProtos.CoprocessorServiceResult.Builder serviceResultBuilder =
            ClientProtos.CoprocessorServiceResult.newBuilder();
          resultOrExceptionBuilder.setServiceResult(serviceResultBuilder
            .setValue(serviceResultBuilder.getValueBuilder().setName(result.getClass().getName())
              // TODO: Copy!!!
              .setValue(UnsafeByteOperations.unsafeWrap(result.toByteArray()))));
        } else if (action.hasMutation()) {
          MutationType type = action.getMutation().getMutateType();
          if (
            type != MutationType.PUT && type != MutationType.DELETE && mutations != null
              && !mutations.isEmpty()
          ) {
            // Flush out any Puts or Deletes already collected.
            doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner,
              spaceQuotaEnforcement);
            mutations.clear();
          }
          switch (type) {
            case APPEND:
              r = append(region, quota, action.getMutation(), cellScanner, nonceGroup,
                spaceQuotaEnforcement, context);
              break;
            case INCREMENT:
              r = increment(region, quota, action.getMutation(), cellScanner, nonceGroup,
                spaceQuotaEnforcement, context);
              break;
            case PUT:
            case DELETE:
              // Collect the individual mutations and apply in a batch
              if (mutations == null) {
                mutations = new ArrayList<>(actions.getActionCount());
              }
              mutations.add(action);
              break;
            default:
              throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
          }
        } else {
          throw new HBaseIOException("Unexpected Action type");
        }
        if (r != null) {
          ClientProtos.Result pbResult = null;
          if (isClientCellBlockSupport(context)) {
            // The cells travel separately via cellsToReturn, so the protobuf result carries no
            // data.
            pbResult = ProtobufUtil.toResultNoData(r);
            // Hard to guess the size here. Just make a rough guess.
            if (cellsToReturn == null) {
              cellsToReturn = new ArrayList<>();
            }
            cellsToReturn.add(r);
          } else {
            pbResult = ProtobufUtil.toResult(r);
          }
          addSize(context, r);
          hasResultOrException = true;
          resultOrExceptionBuilder.setResult(pbResult);
        }
        // Could get to here and there was no result and no exception. Presumes we added
        // a Put or Delete to the collecting Mutations List for adding later. In this
        // case the corresponding ResultOrException instance for the Put or Delete will be added
        // down in the doNonAtomicBatchOp method call rather than up here.
      } catch (IOException ie) {
        rpcServer.getMetrics().exception(ie);
        hasResultOrException = true;
        NameBytesPair pair = ResponseConverter.buildException(ie);
        resultOrExceptionBuilder.setException(pair);
        // context is nullable throughout this method; without this guard a failure on the
        // null-context path would surface as a NullPointerException and abort every remaining
        // action instead of being reported for this action only.
        if (context != null) {
          context.incrementResponseExceptionSize(pair.getSerializedSize());
        }
      }
      if (hasResultOrException) {
        // Propagate index.
        resultOrExceptionBuilder.setIndex(action.getIndex());
        builder.addResultOrException(resultOrExceptionBuilder.build());
      }
    }
    // Finish up any outstanding mutations
    if (!CollectionUtils.isEmpty(mutations)) {
      doNonAtomicBatchOp(builder, region, quota, mutations, cellScanner, spaceQuotaEnforcement);
    }
    return cellsToReturn;
  }