private void replayLogs()

in tephra-core/src/main/java/org/apache/tephra/TransactionManager.java [601:693]


  /**
   * Replays every edit of each given transaction log, in the iteration order of {@code logs}, by
   * dispatching on the edit's state to the matching {@code add...}/{@code do...} method of this class.
   * A log whose reader is {@code null} (an empty file) is skipped.
   *
   * @param logs the transaction logs to replay
   * @throws IllegalArgumentException if an edit carries a state this method does not recognize
   * @throws RuntimeException as produced by {@code Throwables.propagate} when reading a log (or closing
   *         its reader) fails with an {@link IOException}, or when a truncate edit fails with an
   *         {@code InvalidTruncateTimeException}
   */
  private void replayLogs(Collection<TransactionLog> logs) {
    for (TransactionLog log : logs) {
      LOG.info("Replaying edits from transaction log {}", log.getName());
      int editCnt = 0;
      // Close the reader on every exit path (normal completion, 'continue', or exception).
      // try-with-resources skips close() for a null resource, so the empty-file case stays safe.
      // NOTE(review): relies on TransactionLogReader being Closeable/AutoCloseable -- confirm.
      try (TransactionLogReader reader = log.getReader()) {
        // reader may be null in the case of an empty file
        if (reader == null) {
          continue;
        }
        TransactionEdit edit;
        while ((edit = reader.next()) != null) {
          editCnt++;
          switch (edit.getState()) {
            case INPROGRESS: {
              long expiration = edit.getExpiration();
              TransactionType type = edit.getType();
              // Check if transaction needs to be migrated to have expiration and type. Previous version of
              // long running transactions were represented with expiration time as -1.
              // This can be removed when we stop supporting TransactionEditCodecV2.
              if (expiration < 0) {
                expiration = getTxExpirationFromWritePointer(edit.getWritePointer(), defaultLongTimeout);
                type = TransactionType.LONG;
              } else if (type == null) {
                type = TransactionType.SHORT;
              }
              // We don't persist the client id.
              addInProgressAndAdvance(edit.getWritePointer(), edit.getVisibilityUpperBound(), expiration, type, null);
              break;
            }
            case COMMITTING:
              addCommittingChangeSet(edit.getWritePointer(), null, edit.getChanges());
              break;
            case COMMITTED: {
              // TODO: need to reconcile usage of transaction id v/s write pointer TEPHRA-140
              long transactionId = edit.getWritePointer();
              long[] checkpointPointers = edit.getCheckpointPointers();
              // Use the last checkpoint pointer as the write pointer when there is one,
              // otherwise fall back to the transaction id itself.
              long writePointer = checkpointPointers == null || checkpointPointers.length == 0 ?
                transactionId : checkpointPointers[checkpointPointers.length - 1];
              doCommit(transactionId, writePointer, new ChangeSet(null, edit.getChanges()),
                       edit.getCommitPointer(), edit.getCanCommit());
              break;
            }
            case INVALID:
              doInvalidate(edit.getWritePointer());
              break;
            case ABORTED: {
              TransactionType abortedType = edit.getType();
              // Check if transaction edit needs to be migrated to have type. Previous versions of
              // ABORTED edits did not contain type.
              // This can be removed when we stop supporting TransactionEditCodecV2.
              if (abortedType == null) {
                InProgressTx inProgressTx = inProgress.get(edit.getWritePointer());
                if (inProgressTx != null) {
                  InProgressType inProgressType = inProgressTx.getType();
                  if (InProgressType.CHECKPOINT.equals(inProgressType)) {
                    // this should never happen, because checkpoints never go into the log edits;
                    LOG.debug("Ignoring ABORTED edit for a checkpoint transaction {}", edit.getWritePointer());
                    break;
                  }
                  // If the in-progress entry has no type either, abortedType stays null and is
                  // passed to doAbort as-is.
                  if (inProgressType != null) {
                    abortedType = inProgressType.getTransactionType();
                  }
                } else {
                  // If transaction is not in-progress, then it has either been already aborted or invalidated.
                  // We cannot determine the transaction's state based on current information, to be safe invalidate it.
                  LOG.warn("Invalidating transaction {} as it's type cannot be determined during replay",
                           edit.getWritePointer());
                  doInvalidate(edit.getWritePointer());
                  break;
                }
              }
              doAbort(edit.getWritePointer(), edit.getCheckpointPointers(), abortedType);
              break;
            }
            case TRUNCATE_INVALID_TX:
              // A non-zero truncate time selects the time-based truncation; otherwise the edit's
              // explicit truncate set is used.
              if (edit.getTruncateInvalidTxTime() != 0) {
                doTruncateInvalidTxBefore(edit.getTruncateInvalidTxTime());
              } else {
                doTruncateInvalidTx(edit.getTruncateInvalidTx());
              }
              break;
            case CHECKPOINT:
              doCheckpoint(edit.getWritePointer(), edit.getParentWritePointer());
              break;
            default:
              // unknown type!
              throw new IllegalArgumentException("Invalid state for WAL entry: " + edit.getState());
          }
        }
      } catch (IOException | InvalidTruncateTimeException e) {
        throw Throwables.propagate(e);
      }
      LOG.info("Read {} edits from log {}", editCnt, log.getName());
    }
  }