in modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java [1265:1303]
public Collection<Mutation> createMutations(CommitData cd) {
long commitTs = getStats().getCommitTs();
HashMap<Bytes, Mutation> mutations = new HashMap<>();
if (observedColumns.contains(cd.pcol) && isWrite(cd.pval) && !isDelete(cd.pval)) {
Flutation m = new Flutation(env, cd.prow);
Notification.put(env, m, cd.pcol, commitTs);
mutations.put(cd.prow, m);
}
for (Entry<Bytes, Map<Column, Bytes>> rowUpdates : updates.entrySet()) {
for (Entry<Column, Bytes> colUpdates : rowUpdates.getValue().entrySet()) {
if (observedColumns.contains(colUpdates.getKey())) {
Bytes val = colUpdates.getValue();
if (isWrite(val) && !isDelete(val)) {
Mutation m = mutations.get(rowUpdates.getKey());
if (m == null) {
m = new Flutation(env, rowUpdates.getKey());
mutations.put(rowUpdates.getKey(), m);
}
Notification.put(env, m, colUpdates.getKey(), commitTs);
}
}
}
}
for (Entry<Bytes, Set<Column>> entry : weakNotifications.entrySet()) {
Mutation m = mutations.get(entry.getKey());
if (m == null) {
m = new Flutation(env, entry.getKey());
mutations.put(entry.getKey(), m);
}
for (Column col : entry.getValue()) {
Notification.put(env, m, col, commitTs);
}
}
return mutations.values();
}