in modules/accumulo/src/main/java/org/apache/fluo/recipes/accumulo/export/function/AccumuloWriter.java [78:105]
public void run() {
  // Export loop: repeatedly takes queued Mutations off exportQueue, writes them through bw,
  // flushes, and then counts down each item's latch. The loop only exits by throwing.
  //
  // Throws RuntimeException wrapping InterruptedException (interrupt status is restored first)
  // or MutationsRejectedException raised while adding/flushing mutations.
  ArrayList<AccumuloWriter.Mutations> exports = new ArrayList<>();
  while (true) {
    try {
      exports.clear();
      // Block until at least one item is available, then also grab everything else that is
      // already queued so a single flush covers the whole batch.
      exports.add(exportQueue.take());
      exportQueue.drainTo(exports);
      for (AccumuloWriter.Mutations ml : exports) {
        bw.addMutations(ml.mutations);
      }
      bw.flush();
      // Latches are released only after the flush returned without throwing.
      for (AccumuloWriter.Mutations ml : exports) {
        ml.cdl.countDown();
      }
    } catch (InterruptedException e) {
      // Restore the interrupt status before propagating so code further up the stack
      // (e.g. whatever is running this task) can still observe the interruption.
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    } catch (MutationsRejectedException e) {
      // NOTE(review): the latches of the items in this batch are not counted down on this
      // path, and the loop terminates; presumably callers awaiting ml.cdl need a timeout or
      // another failure signal -- confirm against the code that enqueues Mutations.
      throw new RuntimeException(e);
    }
  }
}