private void checkForOrphanedReadLocks()

in modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java [628:688]


  /**
   * Collects rejected columns that may be blocked by a read lock and then blocks until
   * {@link LockResolver#resolveLocks} reports the open read locks on them as resolved.
   *
   * <p>A rejected column is selected for checking when all of the following hold:
   * <ul>
   * <li>it appears in {@code readLocksSeen} for its row,</li>
   * <li>it is not listed in {@code locksResolved} for its row, and</li>
   * <li>the value this transaction holds for it in {@code updates} (or {@code EMPTY_BS} when
   * there is none) is not a read lock according to {@code isReadLock}.</li>
   * </ul>
   *
   * <p>Returns immediately when {@code readLocksSeen} is empty or when no column is selected.
   * Otherwise resolution is retried in a loop, sleeping between attempts. The sleep starts at
   * {@code SnapshotScanner.INITIAL_WAIT_TIME}, doubles after every failed attempt and is capped at
   * {@code SnapshotScanner.MAX_WAIT_TIME}; each sleep is also added to the lock wait time in
   * {@code stats}.
   *
   * @param cd commit data whose rejected rows and columns are examined
   * @param locksResolved for each row, the columns to skip because a lock on them was already
   *        resolved; rows that are absent are treated as having no such columns
   * @throws Exception declared by the signature; presumably propagated from the lock lookup and
   *         resolution calls (not visible here)
   */
  private void checkForOrphanedReadLocks(CommitData cd, Map<Bytes, Set<Column>> locksResolved)
      throws Exception {

    if (readLocksSeen.isEmpty()) {
      return;
    }

    Map<Bytes, Set<Column>> suspects = new HashMap<>();

    for (Entry<Bytes, Set<Column>> rejected : cd.getRejected().entrySet()) {
      Bytes row = rejected.getKey();

      Set<Column> alreadyResolved = locksResolved.getOrDefault(row, Collections.emptySet());
      Set<Column> seenReadLocks = readLocksSeen.get(row);
      if (seenReadLocks == null) {
        // No read locks were seen on this row, so nothing to check for it.
        continue;
      }

      for (Column col : Sets.intersection(seenReadLocks, rejected.getValue())) {
        // When a write lock was seen and resolved for the column, that is probably what caused
        // the collision, so the column is skipped without looking at this transaction's update.
        if (!alreadyResolved.contains(col)
            && !isReadLock(updates.getOrDefault(row, Collections.emptyMap()).getOrDefault(col,
                EMPTY_BS))) {
          // A row only gets an entry once it has at least one column to check.
          suspects.computeIfAbsent(row, r -> new HashSet<>()).add(col);
        }
      }
    }

    if (suspects.isEmpty()) {
      return;
    }

    long backoff = SnapshotScanner.INITIAL_WAIT_TIME;

    List<Entry<Key, Value>> openReadLocks = LockResolver.getOpenReadLocks(env, suspects);

    // The same start time is handed to every resolution attempt.
    long resolveStart = System.currentTimeMillis();

    while (!LockResolver.resolveLocks(env, startTs, stats, openReadLocks, resolveStart)) {
      UtilWaitThread.sleep(backoff);
      stats.incrementLockWaitTime(backoff);
      backoff = Math.min(SnapshotScanner.MAX_WAIT_TIME, backoff * 2);

      // Re-read the open read locks before the next attempt.
      openReadLocks = LockResolver.getOpenReadLocks(env, suspects);
    }
  }