private void cleanupTimedOutTransactions()

in tephra-core/src/main/java/org/apache/tephra/TransactionManager.java [366:411]


  /**
   * Moves every in-progress transaction whose expiration has passed (or whose expiration is negative)
   * to the invalid list.
   *
   * <p>The method acquires {@code logReadLock} for its whole duration and performs all in-memory state
   * changes while synchronized on this instance. It does nothing if {@link #isRunning()} returns
   * {@code false}. For each affected transaction it:
   * <ul>
   *   <li>adds the transaction id to {@code invalidTxList},</li>
   *   <li>removes the transaction from {@code inProgress}, and</li>
   *   <li>unless the transaction is a {@code CHECKPOINT}, removes it from {@code committingChangeSets}
   *       and records an "invalid" {@link TransactionEdit} for it.</li>
   * </ul>
   * The collected edits are passed to {@code appendToLog} after the synchronized block has been left,
   * but before {@code logReadLock} is released.
   */
  private void cleanupTimedOutTransactions() {
    // Stays null unless at least one transaction is invalidated in this pass.
    List<TransactionEdit> editsToLog = null;
    this.logReadLock.lock();
    try {
      synchronized (this) {
        if (!isRunning()) {
          return;
        }

        final long now = System.currentTimeMillis();

        // Collect the affected ids first: entries cannot be removed from inProgress while iterating over it.
        Map<Long, InProgressType> expired = Maps.newHashMap();
        for (Map.Entry<Long, InProgressTx> entry : inProgress.entrySet()) {
          Long txId = entry.getKey();
          InProgressTx current = entry.getValue();
          long expiration = current.getExpiration();
          if (expiration < 0) {
            // A negative expiration is treated as already expired and recorded as a LONG transaction.
            LOG.warn("Transaction {} has negative expiration time {}. Likely cause is the transaction was not " +
                       "migrated correctly, this transaction will be expired immediately", txId, expiration);
            expired.put(txId, InProgressType.LONG);
          } else if (now > expiration) {
            expired.put(txId, current.getType());
            LOG.info("Tx invalid list: added tx {} belonging to client '{}' because of timeout.",
                     txId, current.getClientId());
          }
        }

        if (!expired.isEmpty()) {
          editsToLog = Lists.newArrayListWithCapacity(expired.size());
          invalidTxList.addAll(expired.keySet());
          for (Map.Entry<Long, InProgressType> entry : expired.entrySet()) {
            Long txId = entry.getKey();
            inProgress.remove(txId);
            if (InProgressType.CHECKPOINT.equals(entry.getValue())) {
              // checkpoints never go into the committing change sets or the edits
              continue;
            }
            committingChangeSets.remove(txId);
            editsToLog.add(TransactionEdit.createInvalid(txId));
          }

          LOG.info("Invalidated {} transactions due to timeout.", expired.size());
        }
      }

      // The log append happens outside the monitor, while logReadLock is still held.
      if (editsToLog != null) {
        appendToLog(editsToLog);
      }
    } finally {
      this.logReadLock.unlock();
    }
  }