in src/main/java/org/apache/accumulo/testing/continuous/FlakyBulkBatchWriter.java [100:201]
/**
 * Writes all buffered mutations to rfiles and bulk imports them into the table.
 *
 * <p>
 * The buffered mutations are drained and converted to sorted key/values, which are written to
 * rfiles in a freshly created temporary directory under {@code workPath}. A new rfile is started
 * whenever a row is judged to be past the end row of the current tablet, and each file is
 * registered in a load plan for the tablet range that contains its first row. The directory is
 * then bulk imported using that plan and deleted afterwards. When there is nothing buffered the
 * method returns without creating a directory or issuing an import.
 *
 * <p>
 * The row comparison used to decide when to start a new file is intentionally incorrect (see the
 * comment in the body); this can produce several files for one tablet but never sends data to the
 * wrong tablet.
 *
 * @throws IllegalStateException if this writer has already been closed
 * @throws MutationsRejectedException if any step fails; the original failure is attached as the
 *         cause and this writer is marked closed
 */
public synchronized void flush() throws MutationsRejectedException {
  Preconditions.checkState(!closed);
  try {
    // Capture the count before draining; the queue is empty by the time the import is logged, so
    // reading mutations.size() there would always report zero.
    int mutationCount = mutations.size();
    List<KeyValue> keysValues = new ArrayList<>(mutationCount);
    // remove mutations from the deque as we convert them to Keys making the Mutation objects
    // available for garbage collection
    Mutation mutation;
    while ((mutation = mutations.pollFirst()) != null) {
      for (var columnUpdate : mutation.getUpdates()) {
        var builder = Key.builder(false).row(mutation.getRow())
            .family(columnUpdate.getColumnFamily()).qualifier(columnUpdate.getColumnQualifier())
            .visibility(columnUpdate.getColumnVisibility());
        if (columnUpdate.hasTimestamp()) {
          builder = builder.timestamp(columnUpdate.getTimestamp());
        }
        Key key = builder.deleted(columnUpdate.isDeleted()).build();
        keysValues.add(new KeyValue(key, columnUpdate.getValue()));
      }
    }
    if (keysValues.isEmpty()) {
      // Nothing to write: skip creating an empty directory and bulk importing an empty load plan.
      memUsed = 0;
      return;
    }
    keysValues.sort(Map.Entry.comparingByKey());

    var splits = splitSupplier.get();
    Path tmpDir = new Path(workPath, UUID.randomUUID().toString());
    fileSystem.mkdirs(tmpDir);

    byte[] currEndRow = null;
    int nextFileNameCounter = 0;
    var loadPlanBuilder = LoadPlan.builder();
    // This code is broken because Arrays.compare will compare bytes as signed integers. Accumulo
    // treats bytes as unsigned 8 bit integers for sorting purposes. This incorrect comparator
    // causes this code to sometimes prematurely close rfiles, which can lead to lots of files
    // being bulk imported into a single tablet. The files still go to the correct tablet, so this
    // does not cause data loss. This bug was found to be useful in testing as it introduces
    // stress on bulk import+compactions and it was decided to keep this bug. If copying this code
    // elsewhere then this bug should probably be fixed.
    Comparator<byte[]> comparator = Arrays::compare;
    // To fix the code above it should be replaced with the following
    // Comparator<byte[]> comparator = UnsignedBytes.lexicographicalComparator();
    RFileWriter writer = null;
    try {
      for (var keyValue : keysValues) {
        var key = keyValue.getKey();
        if (writer == null || (currEndRow != null
            && comparator.compare(key.getRowData().toArray(), currEndRow) > 0)) {
          if (writer != null) {
            // Clear the reference before closing so the finally block never closes it twice.
            RFileWriter finished = writer;
            writer = null;
            finished.close();
          }
          // When the above code prematurely closes a rfile because of the incorrect comparator,
          // the following code will find a new Tablet. Since the following code uses the Text
          // comparator its comparisons are correct and it will just find the same tablet for the
          // file that was just closed. This is what causes multiple files to be added to the same
          // tablet.
          var row = key.getRow();
          var headSet = splits.headSet(row);
          var tabletPrevRow = headSet.isEmpty() ? null : headSet.last();
          var tailSet = splits.tailSet(row);
          var tabletEndRow = tailSet.isEmpty() ? null : tailSet.first();
          currEndRow = tabletEndRow == null ? null : tabletEndRow.copyBytes();
          String filename = String.format("bbw-%05d.rf", nextFileNameCounter++);
          writer =
              RFile.newWriter().to(tmpDir + "/" + filename).withFileSystem(fileSystem).build();
          loadPlanBuilder = loadPlanBuilder.loadFileTo(filename, LoadPlan.RangeType.TABLE,
              tabletPrevRow, tabletEndRow);
          log.debug("Created new file {} for range {} {}", filename, tabletPrevRow, tabletEndRow);
        }
        writer.append(key, keyValue.getValue());
      }
      if (writer != null) {
        // Close on the normal path outside of finally so a failed close fails the flush.
        RFileWriter finished = writer;
        writer = null;
        finished.close();
      }
    } finally {
      if (writer != null) {
        // Only reached when something above threw while a file was still open. Release the
        // writer, but do not let a secondary close failure hide the original exception.
        try {
          writer.close();
        } catch (Exception closeFailure) {
          log.warn("Failed to close rfile writer after an earlier failure", closeFailure);
        }
      }
    }

    // TODO make table time configurable?
    var loadPlan = loadPlanBuilder.build();
    long t1 = System.nanoTime();
    client.tableOperations().importDirectory(tmpDir.toString()).to(tableName).plan(loadPlan)
        .tableTime(true).load();
    long t2 = System.nanoTime();
    log.debug("Bulk imported dir {} destinations:{} mutations:{} memUsed:{} time:{}ms", tmpDir,
        loadPlan.getDestinations().size(), mutationCount, memUsed,
        TimeUnit.NANOSECONDS.toMillis(t2 - t1));
    fileSystem.delete(tmpDir, true);
    mutations.clear();
    memUsed = 0;
  } catch (Exception e) {
    // Any failure leaves the buffered data in an unknown state, so refuse further use.
    closed = true;
    throw new MutationsRejectedException(client, List.of(), Map.of(), List.of(), 1, e);
  }
}