in tephra-core/src/main/java/org/apache/tephra/TransactionManager.java [366:411]
private void cleanupTimedOutTransactions() {
List<TransactionEdit> invalidEdits = null;
logReadLock.lock();
try {
synchronized (this) {
if (!isRunning()) {
return;
}
long currentTime = System.currentTimeMillis();
Map<Long, InProgressType> timedOut = Maps.newHashMap();
for (Map.Entry<Long, InProgressTx> tx : inProgress.entrySet()) {
InProgressTx inProgressTx = tx.getValue();
long expiration = inProgressTx.getExpiration();
if (expiration >= 0L && currentTime > expiration) {
// timed out, remember tx id (can't remove while iterating over entries)
timedOut.put(tx.getKey(), inProgressTx.getType());
LOG.info("Tx invalid list: added tx {} belonging to client '{}' because of timeout.",
tx.getKey(), inProgressTx.getClientId());
} else if (expiration < 0) {
LOG.warn("Transaction {} has negative expiration time {}. Likely cause is the transaction was not " +
"migrated correctly, this transaction will be expired immediately", tx.getKey(), expiration);
timedOut.put(tx.getKey(), InProgressType.LONG);
}
}
if (!timedOut.isEmpty()) {
invalidEdits = Lists.newArrayListWithCapacity(timedOut.size());
invalidTxList.addAll(timedOut.keySet());
for (Map.Entry<Long, InProgressType> tx : timedOut.entrySet()) {
inProgress.remove(tx.getKey());
// checkpoints never go into the committing change sets or the edits
if (!InProgressType.CHECKPOINT.equals(tx.getValue())) {
committingChangeSets.remove(tx.getKey());
invalidEdits.add(TransactionEdit.createInvalid(tx.getKey()));
}
}
LOG.info("Invalidated {} transactions due to timeout.", timedOut.size());
}
}
if (invalidEdits != null) {
appendToLog(invalidEdits);
}
} finally {
this.logReadLock.unlock();
}
}