in modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java [238:291]
/**
 * Reads the requested columns of a single row using a snapshot scan at this transaction's
 * start timestamp.
 *
 * <p>The columns are first validated through the shared visibility cache. Scan entries whose
 * column type is {@code RLOCK} are not returned; their columns are recorded in
 * {@code readLocksSeen} for the row instead. After the scan completes, the requested columns are
 * registered through {@code updateColumnsRead}.
 *
 * @param row the row to read
 * @param columns the columns to read from {@code row}
 * @return a map from each column found by the scan to its value; columns with no data entry are
 *         absent from the map
 */
private Map<Column, Bytes> getImpl(Bytes row, Set<Column> columns) {
  // TODO push visibility filtering to server side?
  env.getSharedResources().getVisCache().validate(columns);

  // When at least one requested column carries a visibility, every scanned entry is checked
  // against the requested set below so that only exact column matches are kept.
  boolean filterByRequestedColumns = columns.stream().anyMatch(Column::isVisibilitySet);

  // NOTE(review): this method previously built a visibility-stripped copy of the columns in the
  // visibility case but never passed it to the scanner; the scanner has always been given the
  // caller's columns as-is. Confirm whether the scanner was meant to receive the stripped set.
  // The boolean argument presumably asks the scanner to surface read locks - confirm against
  // SnapshotScanner.Opts.
  SnapshotScanner.Opts opts =
      new SnapshotScanner.Opts(Span.exact(row), columns, true, this.scanTimeAuthz);

  Map<Column, Bytes> ret = new HashMap<>();
  // Looked up lazily so that an entry for this row is only created in readLocksSeen when a
  // read lock is actually encountered.
  Set<Column> readLockCols = null;

  for (Entry<Key, Value> kve : new SnapshotScanner(env, opts, startTs, stats)) {
    Column col = ColumnUtil.convert(kve.getKey());
    if (filterByRequestedColumns && !columns.contains(col)) {
      continue;
    }

    if (ColumnType.from(kve.getKey()) == ColumnType.RLOCK) {
      if (readLockCols == null) {
        readLockCols = readLocksSeen.computeIfAbsent(row, k -> new HashSet<>());
      }
      readLockCols.add(col);
    } else {
      ret.put(col, Bytes.of(kve.getValue().get()));
    }
  }

  // only update columns read after successful read
  updateColumnsRead(row, columns);

  return ret;
}