public synchronized void flush()

in src/main/java/org/apache/accumulo/testing/continuous/FlakyBulkBatchWriter.java [100:201]


  /**
   * Drains all buffered mutations, writes them as sorted key/values into rfiles under a fresh
   * temporary directory beneath {@code workPath}, and bulk imports that directory into
   * {@code tableName} using a load plan that maps each file to the tablet range it was written
   * for. The temporary directory is deleted after a successful import.
   *
   * <p>
   * If there is nothing to write, the method returns without touching the file system or the
   * table.
   *
   * <p>
   * The tablet boundary check intentionally uses a signed byte comparator (see the comment in the
   * body). This is a known bug that is kept on purpose because it stresses bulk import and
   * compactions.
   *
   * @throws IllegalStateException if this writer was already closed
   * @throws MutationsRejectedException if any step fails; the original failure is attached as the
   *         cause and this writer is marked closed
   */
  public synchronized void flush() throws MutationsRejectedException {
    Preconditions.checkState(!closed);

    try {
      // Capture the count before draining; the deque is empty by the time the import is logged.
      int mutationCount = mutations.size();
      List<KeyValue> keysValues = new ArrayList<>(mutationCount);

      // remove mutations from the deque as we convert them to Keys making the Mutation objects
      // available for garbage collection
      Mutation mutation;
      while ((mutation = mutations.pollFirst()) != null) {
        for (var columnUpdate : mutation.getUpdates()) {
          var builder = Key.builder(false).row(mutation.getRow())
              .family(columnUpdate.getColumnFamily()).qualifier(columnUpdate.getColumnQualifier())
              .visibility(columnUpdate.getColumnVisibility());
          if (columnUpdate.hasTimestamp()) {
            builder = builder.timestamp(columnUpdate.getTimestamp());
          }
          Key key = builder.deleted(columnUpdate.isDeleted()).build();
          keysValues.add(new KeyValue(key, columnUpdate.getValue()));
        }
      }

      if (keysValues.isEmpty()) {
        // Nothing to write, so skip creating an empty directory and importing it.
        // NOTE(review): importing a directory with zero files is presumably rejected by Accumulo
        // unless empty directories are explicitly ignored - confirm against the bulk import docs.
        memUsed = 0;
        return;
      }

      // rfiles require keys to be appended in sorted order
      keysValues.sort(Map.Entry.comparingByKey());

      var splits = splitSupplier.get();

      Path tmpDir = new Path(workPath, UUID.randomUUID().toString());
      fileSystem.mkdirs(tmpDir);

      byte[] currEndRow = null;
      int nextFileNameCounter = 0;

      var loadPlanBuilder = LoadPlan.builder();

      // This code is broken because Arrays.compare will compare bytes as signed integers. Accumulo
      // treats bytes as unsigned 8 bit integers for sorting purposes. This incorrect comparator
      // causes this code to sometimes prematurely close rfiles, which can lead to lots of files
      // being bulk imported into a single tablet. The files still go to the correct tablet, so this
      // does not cause data loss. This bug was found to be useful in testing as it introduces
      // stress on bulk import+compactions and it was decided to keep this bug. If copying this code
      // elsewhere then this bug should probably be fixed.
      Comparator<byte[]> comparator = Arrays::compare;
      // To fix the code above it should be replaced with the following
      // Comparator<byte[]> comparator = UnsignedBytes.lexicographicalComparator();

      // The writer that is currently open, or null when no file is open. It is set to null before
      // each close so that the failure handler below never closes the same writer twice.
      RFileWriter writer = null;
      try {
        for (var keyValue : keysValues) {
          var key = keyValue.getKey();
          if (writer == null || (currEndRow != null
              && comparator.compare(key.getRowData().toArray(), currEndRow) > 0)) {
            if (writer != null) {
              var finished = writer;
              writer = null;
              finished.close();
            }

            // When the above code prematurely closes a rfile because of the incorrect comparator,
            // the following code will find a new Tablet. Since the following code uses the Text
            // comparator its comparisons are correct and it will just find the same tablet for the
            // file that was just closed. This is what causes multiple files to be added to the
            // same tablet.
            var row = key.getRow();
            // headSet is exclusive of row and tailSet is inclusive, so a row equal to a split
            // point gets that split as its end row
            var headSet = splits.headSet(row);
            var tabletPrevRow = headSet.isEmpty() ? null : headSet.last();
            var tailSet = splits.tailSet(row);
            var tabletEndRow = tailSet.isEmpty() ? null : tailSet.first();
            currEndRow = tabletEndRow == null ? null : tabletEndRow.copyBytes();

            String filename = String.format("bbw-%05d.rf", nextFileNameCounter++);
            writer =
                RFile.newWriter().to(tmpDir + "/" + filename).withFileSystem(fileSystem).build();
            loadPlanBuilder = loadPlanBuilder.loadFileTo(filename, LoadPlan.RangeType.TABLE,
                tabletPrevRow, tabletEndRow);

            log.debug("Created new file {} for range {} {}", filename, tabletPrevRow,
                tabletEndRow);
          }

          writer.append(key, keyValue.getValue());
        }

        if (writer != null) {
          var finished = writer;
          writer = null;
          finished.close();
        }
      } catch (Exception e) {
        // Do not leak the open file when writing fails part way through. A failure while closing
        // is attached to the original problem instead of replacing it.
        if (writer != null) {
          try {
            writer.close();
          } catch (Exception closeFailure) {
            e.addSuppressed(closeFailure);
          }
        }
        throw e;
      }

      // TODO make table time configurable?
      var loadPlan = loadPlanBuilder.build();

      long t1 = System.nanoTime();
      client.tableOperations().importDirectory(tmpDir.toString()).to(tableName).plan(loadPlan)
          .tableTime(true).load();
      long t2 = System.nanoTime();

      log.debug("Bulk imported dir {} destinations:{} mutations:{} memUsed:{} time:{}ms", tmpDir,
          loadPlan.getDestinations().size(), mutationCount, memUsed,
          TimeUnit.NANOSECONDS.toMillis(t2 - t1));

      fileSystem.delete(tmpDir, true);

      // the deque was already drained above; clearing is kept as a defensive reset
      mutations.clear();
      memUsed = 0;
    } catch (Exception e) {
      // Any failure leaves this writer unusable; later calls fail the precondition check above.
      closed = true;
      throw new MutationsRejectedException(client, List.of(), Map.of(), List.of(), 1, e);
    }
  }