in src/main/java/org/apache/accumulo/testing/randomwalk/shard/DeleteWord.java [48:99]
/**
 * Picks a random word, uses the index table to find every document that contains it, and then
 * removes each of those documents from both the index table and the document table.
 *
 * <p>
 * Table names and the partition count are read from {@code state} under the keys
 * {@code indexTableName}, {@code docTableName} and {@code numPartitions}. If the index holds no
 * entry for the chosen word the method logs that and returns without writing anything.
 *
 * @param state shared random walk state supplying the table names, partition count and RNG
 * @param env environment supplying the Accumulo client and the multi-table batch writer
 * @param props node properties; not read by this method
 * @throws Exception if the document table yields a different number of entries than the index
 *         lookup produced, or if any underlying Accumulo call fails
 */
public void visit(State state, RandWalkEnv env, Properties props) throws Exception {
  String indexTableName = state.getString("indexTableName");
  String docTableName = state.getString("docTableName");
  int numPartitions = state.getInteger("numPartitions");
  Random rand = state.getRandom();

  String wordToDelete = Insert.generateRandomWord(rand);

  // use index to find all documents containing word
  List<Range> documentsToDelete;
  // The scanner is closed by try-with-resources. A Stream.onClose handler would not do the job:
  // close handlers only run when close() is called on the stream, and collect() never does that.
  try (Scanner scanner =
      env.getAccumuloClient().createScanner(indexTableName, Authorizations.EMPTY)) {
    // restrict the scan to the column family named after the word
    scanner.fetchColumnFamily(new Text(wordToDelete));
    // each matching entry's column qualifier becomes a single-row range over the doc table
    documentsToDelete = scanner.stream().map(Entry::getKey).map(Key::getColumnQualifier)
        .map(Range::new).collect(Collectors.toList());
  }

  if (documentsToDelete.isEmpty()) {
    log.debug("No documents to delete containing " + wordToDelete);
    return;
  }

  // use a batch scanner to fetch all documents
  try (BatchScanner bscanner = env.getAccumuloClient().createBatchScanner(docTableName,
      Authorizations.EMPTY, 8)) {
    bscanner.setRanges(documentsToDelete);

    BatchWriter ibw = env.getMultiTableBatchWriter().getBatchWriter(indexTableName);
    BatchWriter dbw = env.getMultiTableBatchWriter().getBatchWriter(docTableName);

    int count = 0;

    for (Entry<Key,Value> entry : bscanner) {
      String docID = entry.getKey().getRow().toString();
      String doc = entry.getValue().toString();

      // NOTE(review): presumably writes index deletes for every word of the document through
      // ibw; the helper is defined elsewhere, confirm against Insert.unindexDocument
      Insert.unindexDocument(ibw, doc, docID, numPartitions);

      // delete the "doc" column (empty qualifier) of the document's row
      Mutation m = new Mutation(docID);
      m.putDelete("doc", "");
      dbw.addMutation(m);

      count++;
    }

    // the flush deliberately precedes the sanity check below, so queued deletes are sent even
    // when the check then fails
    env.getMultiTableBatchWriter().flush();

    // every range built from the index is expected to match exactly one doc table entry
    if (count != documentsToDelete.size()) {
      throw new Exception("Batch scanner did not return expected number of docs " + count + " "
          + documentsToDelete.size());
    }
  }

  log.debug("Deleted " + documentsToDelete.size() + " documents containing " + wordToDelete);
}