static boolean resolveLocks()

in modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java [132:210]


  /**
   * Attempts to resolve every lock in {@code locksKVs} by finishing or undoing the transaction
   * that left it behind.
   *
   * <p>
   * Locks are first selected for recovery. If more than the configured transaction rollback time
   * has elapsed since {@code startTime}, all of the locks are selected and counted as timed out.
   * Otherwise a lock is selected only when the transactor cache reports it as timed out, or when
   * its transactor no longer exists. The selected locks are grouped by primary and the status of
   * each primary decides whether the group is committed, rolled back, or left alone. Any
   * mutations accumulated along the way are handed to the shared batch writer in one call.
   *
   * @param env environment supplying the configuration and the shared caches and batch writer
   * @param startTs timestamp forwarded to the rollback helpers (presumably the start timestamp of
   *        the transaction that ran into these locks; confirm against callers)
   * @param stats statistics object on which timed out and dead lock counts are recorded
   * @param locksKVs key/value entries, each of which is wrapped in a {@code LockInfo}
   * @param startTime wall clock time in milliseconds that is compared against
   *        {@link System#currentTimeMillis()} to decide whether the rollback time has elapsed
   * @return {@code true} only when the number of resolved locks equals the number of entries in
   *         {@code locksKVs}
   * @throws IllegalStateException if the status of a primary is {@code UNKNOWN} or unrecognized
   */
  static boolean resolveLocks(Environment env, long startTs, TxStats stats,
      List<Entry<Key, Value>> locksKVs, long startTime) {
    Map<ByteSequence, Mutation> mutations = new HashMap<>();

    TransactorCache transactorCache = env.getSharedResources().getTransactorCache();

    List<LockInfo> locks = new ArrayList<>();
    for (Entry<Key, Value> lockKV : locksKVs) {
      locks.add(new LockInfo(lockKV));
    }

    long elapsed = System.currentTimeMillis() - startTime;
    final boolean timedOut = elapsed > env.getConfiguration().getTransactionRollbackTime();

    final List<LockInfo> locksToRecover;
    if (timedOut) {
      // waited past the rollback time, so every lock is recovered
      locksToRecover = locks;
      stats.incrementTimedOutLocks(locksToRecover.size());
    } else {
      // only recover locks whose transactor timed out or is no longer alive
      locksToRecover = new ArrayList<>(locks.size());
      for (LockInfo lock : locks) {
        if (transactorCache.checkTimedout(lock.transactorId, lock.lockTs)) {
          stats.incrementTimedOutLocks();
        } else if (!transactorCache.checkExists(lock.transactorId)) {
          stats.incrementDeadLocks();
        } else {
          continue;
        }
        locksToRecover.add(lock);
      }
    }

    Map<PrimaryRowColumn, List<LockInfo>> locksByPrimary = groupLocksByPrimary(locksToRecover);
    Set<Entry<PrimaryRowColumn, List<LockInfo>>> groups = locksByPrimary.entrySet();

    if (timedOut) {
      // remember the timed out transactions in the transactor cache
      for (Entry<PrimaryRowColumn, List<LockInfo>> timedOutGroup : groups) {
        long lockTs = timedOutGroup.getKey().startTs;
        Long transactorId = timedOutGroup.getValue().get(0).transactorId;
        transactorCache.addTimedoutTransactor(transactorId, lockTs, startTime);
      }
    }

    TxInfoCache txInfoCache = env.getSharedResources().getTxInfoCache();
    int resolvedCount = 0;
    for (Entry<PrimaryRowColumn, List<LockInfo>> group : groups) {
      PrimaryRowColumn primary = group.getKey();
      List<LockInfo> primaryLocks = group.getValue();
      TxInfo txInfo = txInfoCache.getTransactionInfo(primary);

      boolean resolved;
      switch (txInfo.getStatus()) {
        case COMMITTED:
          commitColumns(env, primary, primaryLocks, txInfo.getCommitTs(), mutations);
          resolved = true;
          break;
        case LOCKED:
          // the other columns are only rolled back when the primary was rolled back
          resolved = rollbackPrimary(env, startTs, primary, txInfo.getLockValue());
          if (resolved) {
            rollback(env, startTs, primary, primaryLocks, mutations);
          }
          break;
        case ROLLED_BACK:
          // TODO ensure this is ok if there is a concurrent rollback
          rollback(env, startTs, primary, primaryLocks, mutations);
          resolved = true;
          break;
        case UNKNOWN:
        default:
          throw new IllegalStateException(
              "can not abort : " + primary + " (" + txInfo.getStatus() + ")");
      }

      if (resolved) {
        resolvedCount += primaryLocks.size();
      }
    }

    if (!mutations.isEmpty()) {
      env.getSharedResources().getBatchWriter().writeMutations(new ArrayList<>(mutations.values()));
    }

    return resolvedCount == locks.size();
  }