in modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java [628:688]
/**
 * Examines rejected columns on which read locks were previously seen and blocks until any read
 * locks still open on them have been resolved.
 *
 * <p>Returns immediately when {@code readLocksSeen} is empty. Otherwise, for each rejected row in
 * {@code cd}, the columns that are both rejected and present in {@code readLocksSeen} for that row
 * become candidates. A candidate is dropped when it is listed in {@code locksResolved} for the
 * row, or when the value held for it in {@code updates} is itself a read lock. If any candidates
 * remain, the open read locks for them are fetched and {@code LockResolver.resolveLocks} is called
 * repeatedly until it reports success. Between attempts the thread sleeps, the wait is recorded in
 * {@code stats}, the wait time doubles up to {@code SnapshotScanner.MAX_WAIT_TIME}, and the open
 * read locks are fetched again. This method imposes no limit on the number of attempts.
 *
 * @param cd commit data supplying the rejected rows and columns
 * @param locksResolved per row, the columns to exclude from the check (per the inline note, ones
 *        where a write lock was already seen)
 * @throws Exception if one of the underlying calls fails
 */
private void checkForOrphanedReadLocks(CommitData cd, Map<Bytes, Set<Column>> locksResolved)
    throws Exception {
  if (readLocksSeen.isEmpty()) {
    return;
  }

  Map<Bytes, Set<Column>> rowColsToCheck = new HashMap<>();
  for (Entry<Bytes, Set<Column>> rejected : cd.getRejected().entrySet()) {
    Bytes row = rejected.getKey();
    Set<Column> seenReadLocks = readLocksSeen.get(row);
    if (seenReadLocks == null) {
      // No read locks were seen on this row, so there is nothing to inspect.
      continue;
    }

    Set<Column> alreadyResolved = locksResolved.getOrDefault(row, Collections.emptySet());
    for (Column col : Sets.intersection(seenReadLocks, rejected.getValue())) {
      // A write lock was seen and this is probably what caused the collision, no need to
      // check this column for read locks.
      if (alreadyResolved.contains(col)) {
        continue;
      }
      // Columns for which this transaction's own update is a read lock are not checked.
      if (isReadLock(
          updates.getOrDefault(row, Collections.emptyMap()).getOrDefault(col, EMPTY_BS))) {
        continue;
      }
      // Each rejected row is visited once, so the per-row set is created on first use only.
      rowColsToCheck.computeIfAbsent(row, unused -> new HashSet<>()).add(col);
    }
  }

  if (rowColsToCheck.isEmpty()) {
    return;
  }

  long backoff = SnapshotScanner.INITIAL_WAIT_TIME;
  List<Entry<Key, Value>> openReadLocks = LockResolver.getOpenReadLocks(env, rowColsToCheck);
  // The start time is taken after the initial lookup and is reused for every attempt.
  long startTime = System.currentTimeMillis();

  while (!LockResolver.resolveLocks(env, startTs, stats, openReadLocks, startTime)) {
    UtilWaitThread.sleep(backoff);
    stats.incrementLockWaitTime(backoff);
    backoff = Math.min(SnapshotScanner.MAX_WAIT_TIME, backoff * 2);
    // Refresh the set of open read locks before trying again.
    openReadLocks = LockResolver.getOpenReadLocks(env, rowColsToCheck);
  }
}