protected boolean performCompaction()

in hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java [301:610]


  /**
   * Copies every cell produced by {@code scanner} into {@code writer}, moving values larger than
   * {@code mobSizeThreshold} into MOB files and writing MOB reference cells in their place.
   * <p>
   * When the request covers all files and was user-initiated ({@code compactMOBs}), existing MOB
   * reference cells are resolved: values above the threshold are rewritten into the current output
   * MOB file, smaller values are inlined back into the store file. In I/O optimized mode, a resolved
   * cell whose source MOB file is already at least {@code maxMobFileSize} bytes keeps its original
   * reference, and the output MOB file is rolled once its position exceeds {@code maxMobFileSize}.
   * <p>
   * If the compaction does not finish, the current MOB writer is aborted and the MOB files this call
   * already committed are deleted.
   * @param fd                   details of the compaction inputs; passed to the MOB writer factory
   *                             and supplies {@code maxSeqId} when committing a MOB writer
   * @param scanner              source of the cells to compact
   * @param writer               store file sink; must also implement {@link ShipperListener}
   * @param smallestReadPoint    not referenced by this implementation
   * @param cleanSeqId           not referenced by this implementation
   * @param throughputController throttles the compaction; started and finished by this method
   * @param request              the compaction request being executed
   * @param progress             updated per cell; cancelled on early exit, completed on success
   * @return {@code true} when all cells were written; {@code false} when the close checker hit a
   *         time or size limit and the compaction was cancelled
   * @throws InterruptedIOException if interrupted while throttling
   * @throws IOException            on other failures; those raised inside the compaction loop are
   *                                wrapped with a message naming the region
   */
  protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
    long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
    CompactionRequestImpl request, CompactionProgress progress) throws IOException {
    long bytesWrittenProgressForLog = 0;
    long bytesWrittenProgressForShippedCall = 0;
    // Clear old mob references
    mobRefSet.get().clear();
    boolean isUserRequest = userRequest.get();
    boolean major = request.isAllFiles();
    boolean compactMOBs = major && isUserRequest;
    boolean discardMobMiss = conf.getBoolean(MobConstants.MOB_UNSAFE_DISCARD_MISS_KEY,
      MobConstants.DEFAULT_MOB_DISCARD_MISS);
    if (discardMobMiss) {
      LOG.warn("{}=true. This is unsafe setting recommended only when first upgrading to a version"
        + " with the distributed mob compaction feature on a cluster that has experienced MOB data "
        + "corruption.", MobConstants.MOB_UNSAFE_DISCARD_MISS_KEY);
    }
    long maxMobFileSize = conf.getLong(MobConstants.MOB_COMPACTION_MAX_FILE_SIZE_KEY,
      MobConstants.DEFAULT_MOB_COMPACTION_MAX_FILE_SIZE);
    boolean ioOptimizedMode = this.ioOptimizedMode && !disableIO.get();
    LOG.info(
      "Compact MOB={} optimized configured={} optimized enabled={} maximum MOB file size={}"
        + " major={} store={}",
      compactMOBs, this.ioOptimizedMode, ioOptimizedMode, maxMobFileSize, major, getStoreInfo());
    List<ExtendedCell> cells = new ArrayList<>();
    long currentTime = EnvironmentEdgeManager.currentTime();
    long lastMillis = 0;
    if (LOG.isDebugEnabled()) {
      lastMillis = currentTime;
    }
    CloseChecker closeChecker = new CloseChecker(conf, currentTime);
    String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
    long now = 0;
    boolean hasMore;
    byte[] fileName = null;
    StoreFileWriter mobFileWriter = null;
    /*
     * mobCells are used only to decide if we need to commit or abort current MOB output file.
     */
    long mobCells = 0;
    long cellsCountCompactedToMob = 0, cellsCountCompactedFromMob = 0;
    long cellsSizeCompactedToMob = 0, cellsSizeCompactedFromMob = 0;
    boolean finished = false;

    // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
    ScannerContext scannerContext = ScannerContext.newBuilder().setBatchLimit(compactionKVMax)
      .setSizeLimit(ScannerContext.LimitScope.BETWEEN_CELLS, Long.MAX_VALUE, Long.MAX_VALUE,
        compactScannerSizeLimit)
      .build();
    throughputController.start(compactionName);
    KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null;
    long shippedCallSizeLimit =
      (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize();

    ExtendedCell mobCell = null;
    // Names of MOB files already committed by this call; deleted again if the compaction fails.
    List<String> committedMobWriterFileNames = new ArrayList<>();
    try {

      mobFileWriter = newMobWriter(fd, major, request.getWriterCreationTracker());
      fileName = Bytes.toBytes(mobFileWriter.getPath().getName());

      // Since scanner.next() can return 'false' but still be delivering data,
      // we have to use a do/while loop.
      do {
        hasMore = scanner.next(cells, scannerContext);
        currentTime = EnvironmentEdgeManager.currentTime();
        if (LOG.isDebugEnabled()) {
          now = currentTime;
        }
        if (closeChecker.isTimeLimit(store, currentTime)) {
          progress.cancel();
          return false;
        }
        for (ExtendedCell c : cells) {
          if (compactMOBs) {
            if (MobUtils.isMobReferenceCell(c)) {
              String fName = MobUtils.getMobFileName(c);
              // Added to support migration
              try {
                mobCell = mobStore.resolve(c, true, false).getCell();
              } catch (DoNotRetryIOException e) {
                // instanceof is false for a null cause, so no separate null check is needed.
                if (discardMobMiss && e.getCause() instanceof FileNotFoundException) {
                  LOG.error("Missing MOB cell: file={} not found cell={}", fName, c);
                  // NOTE: skipped cells are not counted in progress / throughput accounting.
                  continue;
                } else {
                  throw e;
                }
              }

              if (discardMobMiss && mobCell.getValueLength() == 0) {
                LOG.error("Missing MOB cell value: file={} mob cell={} cell={}", fName, mobCell, c);
                continue;
              } else if (mobCell.getValueLength() == 0) {
                String errMsg =
                  String.format("Found 0 length MOB cell in a file=%s mob cell=%s " + " cell=%s",
                    fName, mobCell, c);
                throw new IOException(errMsg);
              }

              if (mobCell.getValueLength() > mobSizeThreshold) {
                // put the mob data back to the MOB store file
                PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
                if (!ioOptimizedMode) {
                  mobFileWriter.append(mobCell);
                  mobCells++;
                  writer.append(
                    MobUtils.createMobRefCell(mobCell, fileName, this.mobStore.getRefCellTags()));
                } else {
                  // I/O optimized mode
                  // Check if MOB cell origin file size is
                  // greater than threshold
                  Long size = mobLengthMap.get().get(fName);
                  if (size == null) {
                    // FATAL error (we should never get here though), abort compaction
                    // This error means that meta section of store file does not contain
                    // MOB file, which has references in at least one cell from this store file
                    String msg = String.format(
                      "Found an unexpected MOB file during compaction %s, aborting compaction %s",
                      fName, getStoreInfo());
                    throw new IOException(msg);
                  }
                  // Can not be null
                  if (size < maxMobFileSize) {
                    // If MOB cell origin file is below threshold
                    // it is get compacted
                    mobFileWriter.append(mobCell);
                    // Update number of mobCells in a current mob writer
                    mobCells++;
                    writer.append(
                      MobUtils.createMobRefCell(mobCell, fileName, this.mobStore.getRefCellTags()));
                    // Update total size of the output (we do not take into account
                    // file compression yet)
                    long len = mobFileWriter.getPos();
                    if (len > maxMobFileSize) {
                      LOG.debug("Closing output MOB File, length={} file={}, store={}", len,
                        mobFileWriter.getPath().getName(), getStoreInfo());
                      mobFileWriter = switchToNewMobWriter(mobFileWriter, fd, mobCells, major,
                        request, committedMobWriterFileNames);
                      fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
                      mobCells = 0;
                    }
                  } else {
                    // We leave large MOB file as is (is not compacted),
                    // then we update set of MOB file references
                    // and append mob cell directly to the store's writer
                    Optional<TableName> refTable = MobUtils.getTableName(c);
                    if (refTable.isPresent()) {
                      mobRefSet.get().put(refTable.get(), fName);
                      writer.append(c);
                    } else {
                      throw missingMobTableNameTag(c);
                    }
                  }
                }
              } else {
                // If MOB value is less than threshold, append it directly to a store file
                PrivateCellUtil.setSequenceId(mobCell, c.getSequenceId());
                writer.append(mobCell);
                cellsCountCompactedFromMob++;
                cellsSizeCompactedFromMob += mobCell.getValueLength();
              }
            } else {
              // Not a MOB reference cell
              int size = c.getValueLength();
              if (size > mobSizeThreshold) {
                // This MOB cell comes from a regular store file
                // therefore we store it into original mob output
                mobFileWriter.append(c);
                writer
                  .append(MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags()));
                mobCells++;
                cellsCountCompactedToMob++;
                cellsSizeCompactedToMob += c.getValueLength();
                if (ioOptimizedMode) {
                  // Update total size of the output (we do not take into account
                  // file compression yet)
                  long len = mobFileWriter.getPos();
                  if (len > maxMobFileSize) {
                    mobFileWriter = switchToNewMobWriter(mobFileWriter, fd, mobCells, major,
                      request, committedMobWriterFileNames);
                    fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
                    mobCells = 0;
                  }
                }
              } else {
                // Not a MOB cell, write it directly to a store file
                writer.append(c);
              }
            }
          } else if (c.getTypeByte() != KeyValue.Type.Put.getCode()) {
            // Not a major compaction or major with MOB disabled
            // If the kv type is not put, directly write the cell
            // to the store file.
            writer.append(c);
          } else if (MobUtils.isMobReferenceCell(c)) {
            // Not a major MOB compaction, Put MOB reference
            if (MobUtils.hasValidMobRefCellValue(c)) {
              // We do not check mobSizeThreshold during normal compaction,
              // leaving it to a MOB compaction run
              Optional<TableName> refTable = MobUtils.getTableName(c);
              if (refTable.isPresent()) {
                mobRefSet.get().put(refTable.get(), MobUtils.getMobFileName(c));
                writer.append(c);
              } else {
                throw missingMobTableNameTag(c);
              }
            } else {
              String errMsg = String.format("Corrupted MOB reference: %s", c.toString());
              throw new IOException(errMsg);
            }
          } else if (c.getValueLength() <= mobSizeThreshold) {
            // If the value size of a cell is not larger than the threshold, directly write it to
            // the store file.
            writer.append(c);
          } else {
            // If the value size of a cell is larger than the threshold, it's regarded as a mob,
            // write this cell to a mob file, and write the path to the store file.
            mobCells++;
            // append the original keyValue in the mob file.
            mobFileWriter.append(c);
            ExtendedCell reference =
              MobUtils.createMobRefCell(c, fileName, this.mobStore.getRefCellTags());
            // write the cell whose value is the path of a mob file to the store file.
            writer.append(reference);
            cellsCountCompactedToMob++;
            cellsSizeCompactedToMob += c.getValueLength();
            if (ioOptimizedMode) {
              long len = mobFileWriter.getPos();
              if (len > maxMobFileSize) {
                mobFileWriter = switchToNewMobWriter(mobFileWriter, fd, mobCells, major, request,
                  committedMobWriterFileNames);
                fileName = Bytes.toBytes(mobFileWriter.getPath().getName());
                mobCells = 0;
              }
            }
          }

          int len = c.getSerializedSize();
          ++progress.currentCompactedKVs;
          progress.totalCompactedSize += len;
          bytesWrittenProgressForShippedCall += len;
          if (LOG.isDebugEnabled()) {
            bytesWrittenProgressForLog += len;
          }
          throughputController.control(compactionName, len);
          if (closeChecker.isSizeLimit(store, len)) {
            progress.cancel();
            return false;
          }
          if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
            ((ShipperListener) writer).beforeShipped();
            kvs.shipped();
            bytesWrittenProgressForShippedCall = 0;
          }
        }
        // Log the progress of long running compactions every minute if
        // logging at DEBUG level
        if (LOG.isDebugEnabled()) {
          if ((now - lastMillis) >= COMPACTION_PROGRESS_LOG_INTERVAL) {
            String rate = String.format("%.2f",
              (bytesWrittenProgressForLog / 1024.0) / ((now - lastMillis) / 1000.0));
            LOG.debug("Compaction progress: {} {}, rate={} KB/sec, throughputController is {}",
              compactionName, progress, rate, throughputController);
            lastMillis = now;
            bytesWrittenProgressForLog = 0;
          }
        }
        cells.clear();
      } while (hasMore);
      // Commit last MOB writer
      commitOrAbortMobWriter(mobFileWriter, fd.maxSeqId, mobCells, major);
      finished = true;
    } catch (InterruptedException e) {
      // NOTE(review): the thread's interrupt status is not restored here; restoring it before the
      // cleanup in finally could make that cleanup I/O fail, so confirm intent before changing.
      progress.cancel();
      throw new InterruptedIOException(
        "Interrupted while control throughput of compacting " + compactionName);
    } catch (IOException t) {
      String msg = "Mob compaction failed for region: " + store.getRegionInfo().getEncodedName();
      throw new IOException(msg, t);
    } finally {
      try {
        // Clone last cell in the final because writer will append last cell when committing. If
        // don't clone here and once the scanner get closed, then the memory of last cell will be
        // released. (HBASE-22582)
        ((ShipperListener) writer).beforeShipped();
      } finally {
        // Runs even if beforeShipped() throws, so the start() above is always paired with
        // finish() and the MOB output of an unfinished compaction is always cleaned up.
        throughputController.finish(compactionName);
        if (!finished && mobFileWriter != null) {
          // Remove all MOB references because compaction failed
          clearThreadLocals();
          // Abort writer
          LOG.debug("Aborting writer for {} because of a compaction failure, Store {}",
            mobFileWriter.getPath(), getStoreInfo());
          abortWriter(mobFileWriter);
          deleteCommittedMobFiles(committedMobWriterFileNames);
        }
      }
    }

    mobStore.updateCellsCountCompactedFromMob(cellsCountCompactedFromMob);
    mobStore.updateCellsCountCompactedToMob(cellsCountCompactedToMob);
    mobStore.updateCellsSizeCompactedFromMob(cellsSizeCompactedFromMob);
    mobStore.updateCellsSizeCompactedToMob(cellsSizeCompactedToMob);
    progress.complete();
    return true;
  }

  /**
   * Builds the exception raised when a MOB reference cell carries no table name tag.
   * @param c the offending reference cell
   * @return an {@link IOException} describing the store and the cell
   */
  private IOException missingMobTableNameTag(ExtendedCell c) {
    return new IOException(String.format("MOB cell did not contain a tablename "
      + "tag. should not be possible. see ref guide on mob troubleshooting. "
      + "store=%s cell=%s", getStoreInfo(), c));
  }