in modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java [1472:1529]
private CommitData setUpBeginCommitAsync(CommitData cd, AsyncCommitObserver commitCallback,
RowColumn primary) {
if (updates.isEmpty()) {
// TODO do async
deleteWeakRow();
commitCallback.committed();
return null;
}
for (Map<Column, Bytes> cols : updates.values()) {
stats.incrementEntriesSet(cols.size());
}
Bytes primRow = null;
Column primCol = null;
if (primary != null) {
primRow = primary.getRow();
primCol = primary.getColumn();
if (notification != null && !primary.equals(notification.getRowColumn())) {
throw new IllegalArgumentException("Primary must be notification");
}
} else if (notification != null) {
primRow = notification.getRow();
primCol = notification.getColumn();
} else {
outer: for (Entry<Bytes, Map<Column, Bytes>> entry : updates.entrySet()) {
for (Entry<Column, Bytes> entry2 : entry.getValue().entrySet()) {
if (!isReadLock(entry2.getValue())) {
primRow = entry.getKey();
primCol = entry2.getKey();
break outer;
}
}
}
if (primRow == null) {
// there are only read locks, so nothing to write
deleteWeakRow();
commitCallback.committed();
return null;
}
}
// get a primary column
cd.prow = primRow;
Map<Column, Bytes> colSet = updates.get(cd.prow);
cd.pcol = primCol;
cd.pval = colSet.remove(primCol);
if (colSet.isEmpty()) {
updates.remove(cd.prow);
}
cd.commitObserver = commitCallback;
return cd;
}