in modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java [703:745]
private boolean checkForAckCollision(ConditionalMutation cm) {
Bytes row = Bytes.of(cm.getRow());
if (isTriggerRow(row)) {
List<ColumnUpdate> updates = cm.getUpdates();
for (ColumnUpdate cu : updates) {
// TODO avoid create col vis object
Column col = new Column(Bytes.of(cu.getColumnFamily()), Bytes.of(cu.getColumnQualifier()),
Bytes.of(cu.getColumnVisibility()));
if (notification.getColumn().equals(col)) {
// check to see if ACK exist after notification
Key startKey = SpanUtil.toKey(notification.getRowColumn());
startKey.setTimestamp(ColumnType.ACK.first());
Key endKey = SpanUtil.toKey(notification.getRowColumn());
endKey.setTimestamp(ColumnType.ACK.encode(notification.getTimestamp() + 1));
Range range = new Range(startKey, endKey);
try (Scanner scanner =
env.getAccumuloClient().createScanner(env.getTable(), env.getAuthorizations())) {
scanner.setRange(range);
// TODO could use iterator that stops after 1st ACK. thought of using versioning iter
// but
// it scans to ACK
if (scanner.iterator().hasNext()) {
env.getSharedResources().getBatchWriter()
.writeMutationAsync(notification.newDelete(env));
return true;
}
} catch (TableNotFoundException e) {
// TODO proper exception handling
throw new RuntimeException(e);
}
}
}
}
return false;
}