in src/main/java/org/apache/accumulo/testing/randomwalk/multitable/BulkImport.java [59:110]
/**
 * Bulk-imports a freshly generated set of RFiles into one randomly chosen table.
 *
 * <p>
 * A table name is picked at random from the {@code "tableList"} entry of {@code state}. Between
 * one and ten RFiles are written under {@code /tmp/bulk/<uuid>}; every file holds the same
 * {@code ROWS} rows, and each row carries one entry per column in {@code COLNAMES} plus a marker
 * entry in {@code MARKER_CF} whose qualifier is the next value of {@code counter}. The directory
 * is then imported into the table and the staging directories are removed.
 *
 * <p>
 * If the table no longer exists when the import runs, it is dropped from the table list and the
 * staging directories are removed as well, so that nothing is left behind. Any other failure
 * propagates to the caller and leaves the staging directories in place.
 *
 * @param state shared walk state; read for {@code "tableList"} and {@code "fs"}
 * @param env environment supplying the random source and the Accumulo client
 * @param props node properties; not read by this method
 * @throws Exception if writing the files, importing them, or cleaning up fails
 */
public void visit(final State state, final RandWalkEnv env, Properties props) throws Exception {
  @SuppressWarnings({"unchecked"})
  List<String> tables = (List<String>) state.get("tableList");
  if (tables.isEmpty()) {
    log.trace("No tables to ingest into");
    return;
  }

  String tableName = tables.get(env.getRandom().nextInt(tables.size()));

  // A fresh UUID keeps each invocation's staging directory and row keys distinct.
  String uuid = UUID.randomUUID().toString();
  final Path dir = new Path("/tmp/bulk", uuid);
  // The "_fail" sibling is created and later removed here; it is not handed to importDirectory.
  final Path fail = new Path(dir + "_fail");
  final FileSystem fs = (FileSystem) state.get("fs");
  fs.mkdirs(fail);

  final int parts = env.getRandom().nextInt(10) + 1;

  // A sorted set gives ordered iteration for the appends below and first()/last() for logging.
  TreeSet<String> rows = new TreeSet<>();
  for (int i = 0; i < ROWS; i++) {
    rows.add(uuid + String.format("__%06d", i));
  }

  String markerColumnQualifier = String.format("%07d", counter.incrementAndGet());
  log.debug("Preparing bulk import to {}", tableName);

  for (int i = 0; i < parts; i++) {
    String fileName = dir + "/" + String.format("part_%d.rf", i);
    try (RFileWriter f = RFile.newWriter().to(fileName).withFileSystem(fs).build()) {
      f.startDefaultLocalityGroup();
      for (String r : rows) {
        Text row = new Text(r);
        for (Column col : COLNAMES) {
          f.append(new Key(row, col.getColumnFamily(), col.getColumnQualifier()), ONE);
        }
        f.append(new Key(row, MARKER_CF, new Text(markerColumnQualifier)), ONE);
      }
    }
  }

  log.debug("Starting bulk import to {}", tableName);
  try {
    env.getAccumuloClient().tableOperations().importDirectory(dir.toString()).to(tableName)
        .tableTime(true).load();
    fs.delete(dir, true);
    fs.delete(fail, true);
    log.debug("Finished bulk import to {} start: {} last: {} marker: {}", tableName, rows.first(),
        rows.last(), markerColumnQualifier);
  } catch (TableNotFoundException tnfe) {
    log.debug("Table {} was deleted", tableName);
    tables.remove(tableName);
    // The import never happened, so the generated files are orphaned; remove them here too.
    // Done after updating the table list so a cleanup failure cannot leave a stale table name.
    fs.delete(dir, true);
    fs.delete(fail, true);
  }
}