in src/main/java/org/apache/accumulo/testing/randomwalk/bulk/BulkPlusOne.java [58:102]
/**
 * Writes a randomly chosen number (1 to 10) of RFiles that together cover every row in
 * {@code [0, LOTS)}, bulk imports them into the table named by {@code Setup.getTableName()} and
 * then deletes the temporary directory the files were written to.
 *
 * <p>
 * For every row, one entry per column in {@code COLNAMES} is written with {@code value}, plus one
 * marker entry in {@code MARKER_CF} whose qualifier is the zero-padded next value of
 * {@code counter} and whose value is {@code ONE}.
 *
 * @param log
 *          logger that receives debug progress messages
 * @param state
 *          random walk state; must hold a {@link FileSystem} under the key {@code "fs"}
 * @param env
 *          supplies the random number generator and the Accumulo client
 * @param value
 *          value written for every {@code COLNAMES} column of every row
 * @throws Exception
 *           if writing the files, importing them, or deleting the directory fails
 */
static void bulkLoadLots(Logger log, State state, RandWalkEnv env, Value value) throws Exception {
  final FileSystem fs = (FileSystem) state.get("fs");
  final Path dir = new Path(fs.getUri() + "/tmp", "bulk_" + UUID.randomUUID());
  log.debug("Bulk loading from {}", dir);
  final int parts = env.getRandom().nextInt(10) + 1;

  // The set created below should always contain 0, so it is very important that zero is first in
  // the concat below: distinct() + limit() keep the earliest elements of the stream.
  TreeSet<Integer> startRows = Stream
      .concat(Stream.of(0), Stream.generate(() -> env.getRandom().nextInt(LOTS))).distinct()
      .limit(parts).collect(Collectors.toCollection(TreeSet::new));

  List<String> printRows = startRows.stream().map(row -> String.format(FMT, row))
      .collect(Collectors.toList());
  String lastRow = String.format(FMT, LOTS - 1);
  String markerColumnQualifier = String.format("%07d", counter.incrementAndGet());
  log.debug("preparing bulk files with start rows {} last row {} marker {}", printRows, lastRow,
      markerColumnQualifier);

  // Sorted start rows followed by LOTS as an exclusive end sentinel, so that file i covers
  // rows [rows.get(i), rows.get(i + 1)).
  List<Integer> rows = new ArrayList<>(startRows);
  rows.add(LOTS);

  // The marker qualifier is the same for every row written by this call, so build it once.
  final Text markerCq = new Text(markerColumnQualifier);

  for (int i = 0; i < parts; i++) {
    String fileName = dir + "/" + String.format("part_%d.rf", i);
    log.debug("Creating {}", fileName);
    try (RFileWriter writer = RFile.newWriter().to(fileName).withFileSystem(fs).build()) {
      writer.startDefaultLocalityGroup();
      int start = rows.get(i);
      int end = rows.get(i + 1);
      for (int j = start; j < end; j++) {
        Text row = new Text(String.format(FMT, j));
        for (Column col : COLNAMES) {
          writer.append(new Key(row, col.getColumnFamily(), col.getColumnQualifier()), value);
        }
        writer.append(new Key(row, MARKER_CF, markerCq), ONE);
      }
    }
  }

  env.getAccumuloClient().tableOperations().importDirectory(dir.toString())
      .to(Setup.getTableName()).tableTime(true).load();
  fs.delete(dir, true);
  log.debug("Finished bulk import, start rows {} last row {} marker {}", printRows, lastRow,
      markerColumnQualifier);
}