in modules/core/src/main/java/org/apache/fluo/core/impl/LockResolver.java [132:210]
static boolean resolveLocks(Environment env, long startTs, TxStats stats,
List<Entry<Key, Value>> locksKVs, long startTime) {
// check if transactor is still alive
int numResolved = 0;
Map<ByteSequence, Mutation> mutations = new HashMap<>();
boolean timedOut = false;
TransactorCache transactorCache = env.getSharedResources().getTransactorCache();
List<LockInfo> locks = new ArrayList<>();
locksKVs.forEach(e -> locks.add(new LockInfo(e)));
List<LockInfo> locksToRecover;
if (System.currentTimeMillis() - startTime > env.getConfiguration()
.getTransactionRollbackTime()) {
locksToRecover = locks;
stats.incrementTimedOutLocks(locksToRecover.size());
timedOut = true;
} else {
locksToRecover = new ArrayList<>(locks.size());
for (LockInfo lockInfo : locks) {
if (transactorCache.checkTimedout(lockInfo.transactorId, lockInfo.lockTs)) {
locksToRecover.add(lockInfo);
stats.incrementTimedOutLocks();
} else if (!transactorCache.checkExists(lockInfo.transactorId)) {
locksToRecover.add(lockInfo);
stats.incrementDeadLocks();
}
}
}
Map<PrimaryRowColumn, List<LockInfo>> groupedLocks = groupLocksByPrimary(locksToRecover);
if (timedOut) {
Set<Entry<PrimaryRowColumn, List<LockInfo>>> es = groupedLocks.entrySet();
for (Entry<PrimaryRowColumn, List<LockInfo>> entry : es) {
long lockTs = entry.getKey().startTs;
Long transactorId = entry.getValue().get(0).transactorId;
transactorCache.addTimedoutTransactor(transactorId, lockTs, startTime);
}
}
TxInfoCache txiCache = env.getSharedResources().getTxInfoCache();
Set<Entry<PrimaryRowColumn, List<LockInfo>>> es = groupedLocks.entrySet();
for (Entry<PrimaryRowColumn, List<LockInfo>> group : es) {
TxInfo txInfo = txiCache.getTransactionInfo(group.getKey());
switch (txInfo.getStatus()) {
case COMMITTED:
commitColumns(env, group.getKey(), group.getValue(), txInfo.getCommitTs(), mutations);
numResolved += group.getValue().size();
break;
case LOCKED:
if (rollbackPrimary(env, startTs, group.getKey(), txInfo.getLockValue())) {
rollback(env, startTs, group.getKey(), group.getValue(), mutations);
numResolved += group.getValue().size();
}
break;
case ROLLED_BACK:
// TODO ensure this if ok if there concurrent rollback
rollback(env, startTs, group.getKey(), group.getValue(), mutations);
numResolved += group.getValue().size();
break;
case UNKNOWN:
default:
throw new IllegalStateException(
"can not abort : " + group.getKey() + " (" + txInfo.getStatus() + ")");
}
}
if (!mutations.isEmpty()) {
env.getSharedResources().getBatchWriter().writeMutations(new ArrayList<>(mutations.values()));
}
return numResolved == locks.size();
}