in tephra-core/src/main/java/org/apache/tephra/TransactionManager.java [601:693]
/**
 * Replays every edit of each given transaction log, in iteration order, by dispatching on the
 * edit's state to the matching {@code add*} / {@code do*} method of this class.
 *
 * <p>Each log's reader is opened in a try-with-resources statement so it is closed once that log
 * has been fully read, and also when reading or applying an edit fails part-way through.
 * A log whose reader is {@code null} is skipped without emitting the "Read N edits" message.
 *
 * @param logs the transaction logs to replay
 * @throws IllegalArgumentException if an edit carries a state that is not handled by the switch below
 * @throws RuntimeException any {@link IOException} or {@code InvalidTruncateTimeException} raised while
 *         reading, applying or closing is rethrown through {@code Throwables.propagate}
 */
private void replayLogs(Collection<TransactionLog> logs) {
  for (TransactionLog log : logs) {
    LOG.info("Replaying edits from transaction log {}", log.getName());
    int editCnt = 0;
    // try-with-resources releases the reader on every exit path; a null resource is simply not closed.
    try (TransactionLogReader reader = log.getReader()) {
      // reader may be null in the case of an empty file
      if (reader == null) {
        continue;
      }
      TransactionEdit edit;
      while ((edit = reader.next()) != null) {
        editCnt++;
        switch (edit.getState()) {
          case INPROGRESS:
            long expiration = edit.getExpiration();
            TransactionType type = edit.getType();
            // Check if transaction needs to be migrated to have expiration and type. Previous version of
            // long running transactions were represented with expiration time as -1.
            // This can be removed when we stop supporting TransactionEditCodecV2.
            if (expiration < 0) {
              expiration = getTxExpirationFromWritePointer(edit.getWritePointer(), defaultLongTimeout);
              type = TransactionType.LONG;
            } else if (type == null) {
              type = TransactionType.SHORT;
            }
            // We don't persist the client id.
            addInProgressAndAdvance(edit.getWritePointer(), edit.getVisibilityUpperBound(), expiration, type, null);
            break;
          case COMMITTING:
            addCommittingChangeSet(edit.getWritePointer(), null, edit.getChanges());
            break;
          case COMMITTED:
            // TODO: need to reconcile usage of transaction id v/s write pointer TEPHRA-140
            long transactionId = edit.getWritePointer();
            long[] checkpointPointers = edit.getCheckpointPointers();
            // Without checkpoints the write pointer is the transaction id itself; otherwise it is the
            // last checkpoint pointer recorded in the edit.
            long writePointer = checkpointPointers == null || checkpointPointers.length == 0 ?
              transactionId : checkpointPointers[checkpointPointers.length - 1];
            doCommit(transactionId, writePointer, new ChangeSet(null, edit.getChanges()),
                     edit.getCommitPointer(), edit.getCanCommit());
            break;
          case INVALID:
            doInvalidate(edit.getWritePointer());
            break;
          case ABORTED:
            type = edit.getType();
            // Check if transaction edit needs to be migrated to have type. Previous versions of
            // ABORTED edits did not contain type.
            // This can be removed when we stop supporting TransactionEditCodecV2.
            if (type == null) {
              InProgressTx inProgressTx = inProgress.get(edit.getWritePointer());
              if (inProgressTx != null) {
                InProgressType inProgressType = inProgressTx.getType();
                if (InProgressType.CHECKPOINT.equals(inProgressType)) {
                  // this should never happen, because checkpoints never go into the log edits;
                  LOG.debug("Ignoring ABORTED edit for a checkpoint transaction {}", edit.getWritePointer());
                  break;
                }
                if (inProgressType != null) {
                  type = inProgressType.getTransactionType();
                }
              } else {
                // If transaction is not in-progress, then it has either been already aborted or invalidated.
                // We cannot determine the transaction's state based on current information, to be safe
                // invalidate it.
                LOG.warn("Invalidating transaction {} as it's type cannot be determined during replay",
                         edit.getWritePointer());
                doInvalidate(edit.getWritePointer());
                break;
              }
            }
            doAbort(edit.getWritePointer(), edit.getCheckpointPointers(), type);
            break;
          case TRUNCATE_INVALID_TX:
            // A non-zero truncate time selects the time-based truncation; otherwise the explicit
            // set of invalid transactions carried by the edit is truncated.
            if (edit.getTruncateInvalidTxTime() != 0) {
              doTruncateInvalidTxBefore(edit.getTruncateInvalidTxTime());
            } else {
              doTruncateInvalidTx(edit.getTruncateInvalidTx());
            }
            break;
          case CHECKPOINT:
            doCheckpoint(edit.getWritePointer(), edit.getParentWritePointer());
            break;
          default:
            // unknown type!
            throw new IllegalArgumentException("Invalid state for WAL entry: " + edit.getState());
        }
      }
    } catch (IOException | InvalidTruncateTimeException e) {
      throw Throwables.propagate(e);
    }
    LOG.info("Read {} edits from log {}", editCnt, log.getName());
  }
}