Awaitable TruncateTailManager::TruncateTailAsync()

in src/prod/src/data/txnreplicator/loggingreplicator/TruncateTailManager.cpp [82:800]


//
// Truncates the tail of the log back to tailLsn_ (removal of false progress).
//
// The log is walked backwards from the current tail record, one record at a time, until a record
// whose LSN is not greater than tailLsn_ is reached. Every record past the target is processed
// according to its type: the transactions map is updated, undo is applied through the state manager
// where required, and physical records are detached from the physical record chain. Afterwards the
// log is physically truncated, the replicated log manager's tail / checkpoint state is updated, and a
// truncate tail record is inserted and flushed.
//
// Parameters:
//   newTail - Receives the record that became the new log tail. It is assigned only at the very end
//             of the success path.
//
// Returns:
//   The NTSTATUS of the operation. Failures reported by ApplyAsync, Unlock or
//   PerformLogTailTruncationAsync are propagated through CO_RETURN_ON_FAILURE (macro defined
//   elsewhere; assumed to co_return the failing status).
//
// Invariant violations (unexpected record type, mismatching PSNs/LSNs, failed casts) are asserted
// rather than returned.
//
Awaitable<NTSTATUS> TruncateTailManager::TruncateTailAsync(__out LogRecord::SPtr & newTail) noexcept
{
    // Preconditions: there must be something to truncate, the target must not precede the last
    // completed begin checkpoint, and no truncate head may be in progress.
    ASSERT_IFNOT(
        tailLsn_ < replicatedLogManager_->CurrentLogTailLsn,
        "{0} tailLsn < this.currentLogTailLsn. Current log tail lsn: {1}",
        TraceId,
        replicatedLogManager_->CurrentLogTailLsn);

    ASSERT_IFNOT(
        tailLsn_ >= replicatedLogManager_->LastCompletedBeginCheckpointRecord->Lsn,
        "{0} tailLsn >= this.LastCompletedBeginCheckpointRecord.LastLogicalSequenceNumber. LastCompletedBeginCheckpointLogRecord: {1}",
        TraceId,
        replicatedLogManager_->LastCompletedBeginCheckpointRecord->Lsn);

    ASSERT_IFNOT(
        replicatedLogManager_->LastInProgressTruncateHeadRecord == nullptr,
        "{0} this.lastInProgressTruncateHeadRecord == null",
        TraceId);

    LogRecord::SPtr currentRecord = replicatedLogManager_->InnerLogManager->CurrentLogTailRecord;
    LONG64 currentLsn = currentRecord->Lsn;

    NTSTATUS status = STATUS_SUCCESS;

    // Stays true only while every record seen so far carries the LSN of the original tail.
    // UpdateEpoch records are only legal in that region (see the UpdateEpoch case below).
    bool isUpdateRecordAtTail = true;
    LONG64 recoveredLsn = currentLsn;

    // Remember which checkpoint related records were truncated so the replicated log manager
    // can be updated once the physical truncation has succeeded.
    EndCheckpointLogRecord::SPtr endCheckpointLogRecord = nullptr;
    CompleteCheckPointLogRecord::SPtr completeCheckpointLogRecord = nullptr;
    PhysicalLogRecord::SPtr lastPhysicalRecord = replicatedLogManager_->InnerLogManager->CurrentLastPhysicalRecord;

    // This loop traverses the log backwards until the target tail lsn is reached while
    // safely removing records after the target tail lsn
    //
    // Removal of each record updates the transactions map. If the record represents an atomic operation
    // or single operation transaction, ApplyAsync is called on the StateProviderManager with the provided
    // falseProgressApplyContext (performing 'undo' on operation). StateProviderManager.Unlock(..) is then
    // called with the OperationContext object returned from ApplyAsync.
    //
    // When undoing a committed EndTransactionLogRecord, all records associated with the transaction are undone
    // and unlocked. They are not removed from the transaction map until the individual record is
    // encountered as part of the backwards log traversal.
    do
    {
        ASSERT_IF(
            LogRecord::IsInvalid(currentRecord.RawPtr()),
            "{0} LogRecord.IsInvalidRecord(currentRecord.RawPtr()) is invalid",
            TraceId);

        if(isUpdateRecordAtTail)
        {
            isUpdateRecordAtTail = currentRecord->Lsn == recoveredLsn;
        }

        OperationData::CSPtr metaData = nullptr;

        switch(currentRecord->RecordType)
        {
        case LogRecordType::Enum::BeginTransaction:
        {
            BeginTransactionOperationLogRecord::SPtr beginTransactionRecord = dynamic_cast<BeginTransactionOperationLogRecord *>(currentRecord.RawPtr());
            ASSERT_IF(beginTransactionRecord == nullptr, "Unexpected dynamic cast failure");

            // Cache the latest metadata just read from disk
            metaData = beginTransactionRecord->Metadata;

            // Remove this transaction from the transactions map
            beginTransactionRecord = transactionsMap_->DeleteTransaction(*beginTransactionRecord);

            // Reset the metadata of the transaction as it may have been modified during redo pass
            beginTransactionRecord->Metadata = *metaData;

            // If the begin transaction record represents a single operation transaction, it is undone here.
            // Transactions with more than one operation are undone in the EndTransaction case.
            if(beginTransactionRecord->IsSingleOperationTransaction)
            {
                ASSERT_IFNOT(
                    beginTransactionRecord->Lsn != Constants::InvalidLsn,
                    "{0} begin transaction record lsn must not be invalid",
                    TraceId);

                // Set the commit sequence number of the associated transaction to the current record LSN
                beginTransactionRecord->BaseTransaction.CommitSequenceNumber = beginTransactionRecord->Lsn;

                // Apply the updated begin transaction record with the associated false progress apply context
                OperationContext::CSPtr operationContext = nullptr;

                status = co_await stateManager_->ApplyAsync(
                    beginTransactionRecord->Lsn,
                    beginTransactionRecord->BaseTransaction,
                    falseProgressApplyContext_,
                    beginTransactionRecord->Metadata.RawPtr(),
                    beginTransactionRecord->Undo.RawPtr(),
                    operationContext);

                CO_RETURN_ON_FAILURE(status);

                if(operationContext != nullptr)
                {
                    status = stateManager_->Unlock(*operationContext);

                    CO_RETURN_ON_FAILURE(status);
                }

                EventSource::Events->TruncateTailSingleOperationTransactionRecord(
                    TracePartitionId,
                    ReplicaId,
                    "Deleted",
                    beginTransactionRecord->Lsn,
                    beginTransactionRecord->Psn,
                    beginTransactionRecord->RecordPosition,
                    beginTransactionRecord->BaseTransaction.TransactionId);
            }
            else
            {
                EventSource::Events->TruncateTailTransactionRecord(
                    TracePartitionId,
                    ReplicaId,
                    "Deleted",
                    beginTransactionRecord->Lsn,
                    beginTransactionRecord->Psn,
                    beginTransactionRecord->RecordPosition,
                    beginTransactionRecord->BaseTransaction.TransactionId);
            }

            // Needed for keeping the transaction chain from not disappearing during destruction
            beginTransactionRecord->ParentTransactionRecord = invalidRecords_->Inv_TransactionLogRecord.RawPtr();
            break;
        }
        case LogRecordType::Enum::Operation:
        {
            OperationLogRecord::SPtr operationRecord = dynamic_cast<OperationLogRecord *>(currentRecord.RawPtr());
            ASSERT_IF(operationRecord == nullptr, "Unexpected dynamic cast failure");

            // Cache the latest metadata just read from disk
            metaData = operationRecord->Metadata;

            // Remove this operation from the transactions map
            // Updates the latest record associated with the transaction to the parent record
            operationRecord = transactionsMap_->RedactOperation(*operationRecord, *invalidRecords_->Inv_TransactionLogRecord);

            // Reset the metadata of the operation as it may have been modified during redo pass
            operationRecord->Metadata = *metaData;

            // If this operation log record represents an atomic operation, it is undone here
            // Operation log records associated with a transaction are undone in the EndTransaction case
            if(operationRecord->BaseTransaction.IsAtomicOperation)
            {
                ASSERT_IF(
                    operationRecord->IsRedoOnly,
                    "{0} TruncateTail - RedoOnly operation cannot be undone",
                    TraceId);

                ASSERT_IF(
                    operationRecord->Lsn == Constants::InvalidLsn,
                    "{0} Operation must have valid LSN",
                    TraceId);

                // Set the commit sequence number of the associated transaction to the current record LSN
                operationRecord->BaseTransaction.CommitSequenceNumber = operationRecord->Lsn;

                // Apply the updated operation record with the associated false progress apply context
                OperationContext::CSPtr operationContext = nullptr;

                status = co_await stateManager_->ApplyAsync(
                    operationRecord->Lsn,
                    operationRecord->BaseTransaction,
                    falseProgressApplyContext_,
                    operationRecord->Metadata.RawPtr(),
                    operationRecord->Undo.RawPtr(),
                    operationContext);

                CO_RETURN_ON_FAILURE(status);

                if(operationContext != nullptr)
                {
                    status = stateManager_->Unlock(*operationContext);

                    CO_RETURN_ON_FAILURE(status);
                }

                EventSource::Events->TruncateTailAtomicOperation(
                    TracePartitionId,
                    ReplicaId,
                    operationRecord->Lsn,
                    operationRecord->Psn,
                    operationRecord->RecordPosition,
                    operationRecord->BaseTransaction.TransactionId);
            }
            else
            {
                EventSource::Events->TruncateTailOperationRecord(
                    TracePartitionId,
                    ReplicaId,
                    "Deleted",
                    operationRecord->Lsn,
                    operationRecord->Psn,
                    operationRecord->RecordPosition,
                    operationRecord->BaseTransaction.TransactionId);
            }

            // Needed for keeping the transaction chain from not disappearing during destruction
            operationRecord->ParentTransactionRecord = invalidRecords_->Inv_TransactionLogRecord.RawPtr();
            break;
        }
        case LogRecordType::Enum::EndTransaction:
        {
            EndTransactionLogRecord::SPtr endTransactionRecord = dynamic_cast<EndTransactionLogRecord *>(currentRecord.RawPtr());
            ASSERT_IF(
                endTransactionRecord == nullptr,
                "{0}: Unexpected dynamic cast failure",
                TraceId);

            endTransactionRecord = transactionsMap_->ReifyTransaction(*endTransactionRecord, *invalidRecords_->Inv_TransactionLogRecord);

            OperationLogRecord::SPtr operationRecord;

            ASSERT_IF(
                endTransactionRecord->Lsn == Constants::InvalidLsn,
                "{0} End transaction record cannot have an invalid lsn.",
                TraceId);

            // Undo all operations in this transaction (call ApplyAsync with undo).
            if(endTransactionRecord->IsCommitted)
            {
                TransactionLogRecord::SPtr transactionRecord = endTransactionRecord.RawPtr();

                OperationContext::CSPtr operationContext = nullptr;

                // Pass 1: walk the parent chain from the end transaction record down to the begin
                // transaction record, undoing each record and stashing the returned operation context on it.
                do
                {
                    // Start every iteration with an empty context so that a context produced for the
                    // previous record can never be attached to (and later unlocked for) this record,
                    // should ApplyAsync leave the out parameter unwritten.
                    operationContext = nullptr;

                    // During recovery operations that may be undone are kept in memory.
                    // Since Truncate tail uses the in-memory links, Transaction have already been created and their commit sequence numbers
                    // have been set during recovery redo.
                    ASSERT_IFNOT(
                        transactionRecord->RecordType == LogRecordType::EndTransaction ||
                        transactionRecord->BaseTransaction.CommitSequenceNumber != Constants::InvalidLsn,
                        "{0} For an operation to be undone, it must have already been done.",
                        TraceId);

                    // Move to the parent record and continue traversing this transaction
                    transactionRecord = transactionRecord->ParentTransactionRecord;

                    ASSERT_IF(
                        transactionRecord == nullptr,
                        "{0} transactionRecord != nullptr",
                        TraceId);

                    // Process the begin transaction record associated with endTransactionRecord
                    if(transactionRecord->RecordType == LogRecordType::BeginTransaction)
                    {
                        // Cache the metadata read from disk
                        LogRecord::SPtr justReadTransactionRecord = co_await recoveryLogsReader_->GetNextLogRecord(transactionRecord->RecordPosition);

                        ASSERT_IF(
                            justReadTransactionRecord->RecordType != LogRecordType::BeginTransaction,
                            "{0} Just read transaction during false progress is not begintransaction. It is {1}",
                            TraceId,
                            justReadTransactionRecord->RecordType);

                        BeginTransactionOperationLogRecord::SPtr justReadBeginTxRecord = dynamic_cast<BeginTransactionOperationLogRecord *>(justReadTransactionRecord.RawPtr());
                        ASSERT_IF(
                            justReadBeginTxRecord == nullptr,
                            "{0}: Unexpected dynamic cast failure",
                            TraceId);

                        BeginTransactionOperationLogRecord::SPtr beginTxRecord = dynamic_cast<BeginTransactionOperationLogRecord *>(transactionRecord.RawPtr());
                        ASSERT_IF(
                            beginTxRecord == nullptr,
                            "{0}: Unexpected dynamic cast failure",
                            TraceId);

                        // Overwrite the in-memory metadata with the copy just read from disk
                        beginTxRecord->Metadata = *justReadBeginTxRecord->Metadata;

                        ASSERT_IF(
                            beginTxRecord->IsSingleOperationTransaction,
                            "{0} beginTx.IsSingleOperationTransaction must be false when endTxRecord is being processed",
                            TraceId);

                        status = co_await stateManager_->ApplyAsync(
                            beginTxRecord->Lsn,
                            beginTxRecord->BaseTransaction,
                            falseProgressApplyContext_,
                            beginTxRecord->Metadata.RawPtr(),
                            beginTxRecord->Undo.RawPtr(),
                            operationContext);

                        CO_RETURN_ON_FAILURE(status);

                        if(operationContext != nullptr)
                        {
                            beginTxRecord->OperationContextValue = *operationContext;
                        }

                        // The BeginTransactionOperationLogRecord associated with endTransactionRecord has been processed
                        break;
                    }

                    // Process an operation log record associated with endTransactionRecord
                    // Cache the metadata read from disk
                    LogRecord::SPtr justReadOperationLogRecord = co_await recoveryLogsReader_->GetNextLogRecord(transactionRecord->RecordPosition);

                    ASSERT_IF(
                        justReadOperationLogRecord->RecordType != LogRecordType::Operation,
                        "{0} Just read operation during false progress is not of the right type. It is {1}",
                        TraceId,
                        justReadOperationLogRecord->RecordType);

                    OperationLogRecord::SPtr justReadOperationRecord = dynamic_cast<OperationLogRecord *>(justReadOperationLogRecord.RawPtr());
                    ASSERT_IF(
                        justReadOperationRecord == nullptr,
                        "{0}: Unexpected dynamic cast failure",
                        TraceId);

                    operationRecord = dynamic_cast<OperationLogRecord *>(transactionRecord.RawPtr());
                    ASSERT_IF(
                        operationRecord == nullptr,
                        "{0}: Unexpected dynamic cast failure",
                        TraceId);

                    // Overwrite the in-memory metadata with the copy just read from disk
                    operationRecord->Metadata = *justReadOperationRecord->Metadata;

                    // Undo the operation
                    status = co_await stateManager_->ApplyAsync(
                        operationRecord->Lsn,
                        operationRecord->BaseTransaction,
                        falseProgressApplyContext_,
                        operationRecord->Metadata.RawPtr(),
                        operationRecord->Undo.RawPtr(),
                        operationContext);

                    CO_RETURN_ON_FAILURE(status);

                    if(operationContext != nullptr)
                    {
                        operationRecord->OperationContextValue = *operationContext;
                    }

                } while (true);

                // Pass 2: call unlock for OperationLogRecords and the BeginTransactionLogRecord associated with endTransactionRecord
                transactionRecord = endTransactionRecord.RawPtr();
                do
                {
                    OperationContext::CSPtr opContext = nullptr;
                    transactionRecord = transactionRecord->ParentTransactionRecord;

                    ASSERT_IF(
                        transactionRecord == nullptr,
                        "{0} transactionRecord != nullptr",
                        TraceId);

                    if(transactionRecord->RecordType == LogRecordType::BeginTransaction)
                    {
                        BeginTransactionOperationLogRecord::SPtr beginTxRecord = dynamic_cast<BeginTransactionOperationLogRecord *>(transactionRecord.RawPtr());
                        ASSERT_IF(
                            beginTxRecord == nullptr,
                            "{0}: Unexpected dynamic cast failure",
                            TraceId);

                        // Retrieves the current OperationContext associated with beginTxRecord,and resets its value
                        opContext = beginTxRecord->ResetOperationContext();

                        if (opContext != nullptr)
                        {
                            // An unlock failure is propagated exactly like in the single operation
                            // transaction / atomic operation paths instead of being silently dropped.
                            status = stateManager_->Unlock(*opContext);

                            CO_RETURN_ON_FAILURE(status);
                        }

                        EventSource::Events->TruncateTailOperationRecord(
                            TracePartitionId,
                            ReplicaId,
                            "Undone",
                            beginTxRecord->Lsn,
                            beginTxRecord->Psn,
                            beginTxRecord->RecordPosition,
                            beginTxRecord->BaseTransaction.TransactionId);

                        break;
                    }

                    operationRecord = dynamic_cast<OperationLogRecord *>(transactionRecord.RawPtr());
                    ASSERT_IF(
                        operationRecord == nullptr,
                        "{0}: Unexpected dynamic cast failure",
                        TraceId);

                    // Retrieves the current OperationContext associated with operationRecord,and resets its value
                    opContext = operationRecord->ResetOperationContext();

                    if(opContext != nullptr)
                    {
                        // See above: do not continue truncating after a failed unlock.
                        status = stateManager_->Unlock(*opContext);

                        CO_RETURN_ON_FAILURE(status);
                    }

                    EventSource::Events->TruncateTailOperationRecord(
                        TracePartitionId,
                        ReplicaId,
                        "Undone",
                        operationRecord->Lsn,
                        operationRecord->Psn,
                        operationRecord->RecordPosition,
                        operationRecord->BaseTransaction.TransactionId);

                } while (true);
            }

            EventSource::Events->TruncateTailTransactionRecord(
                TracePartitionId,
                ReplicaId,
                "Reified",
                endTransactionRecord->Lsn,
                endTransactionRecord->Psn,
                endTransactionRecord->RecordPosition,
                endTransactionRecord->BaseTransaction.TransactionId);

            // Needed for keeping the transaction chain from not disappearing during destruction
            endTransactionRecord->ParentTransactionRecord = invalidRecords_->Inv_TransactionLogRecord.RawPtr();
            break;
        }
        case LogRecordType::Enum::Barrier:
        {
            // Nothing to undo for a barrier; it is only traced.
            BarrierLogRecord::SPtr barrierRecord = dynamic_cast<BarrierLogRecord *>(currentRecord.RawPtr());
            ASSERT_IF(
                barrierRecord == nullptr,
                "{0}: Unexpected dynamic cast failure",
                TraceId);

            EventSource::Events->TruncateTailBarrier(
                TracePartitionId,
                ReplicaId,
                barrierRecord->Lsn,
                barrierRecord->Psn,
                barrierRecord->RecordPosition);

            break;
        }
        case LogRecordType::Enum::Backup:
        {
            // Inform the backup manager that the last backup log record has been undone.
            backupManager_->UndoLastCompletedBackupLogRecord();

            // Trace that the backup log record has been false progressed.
            BackupLogRecord::SPtr backupRecord = dynamic_cast<BackupLogRecord *>(currentRecord.RawPtr());
            ASSERT_IF(
                backupRecord == nullptr,
                "{0}: Unexpected dynamic cast failure",
                TraceId);

            EventSource::Events->TruncateTailBackup(
                TracePartitionId,
                ReplicaId,
                backupRecord->Lsn,
                backupRecord->Psn,
                backupRecord->RecordPosition);

            break;
        }
        case LogRecordType::Enum::UpdateEpoch:
        {
            // These records can only occur at the tail
            ASSERT_IFNOT(
                isUpdateRecordAtTail,
                "{0} isUpdateRecordAtTail is false",
                TraceId);

            UpdateEpochLogRecord::CSPtr updateEpochRecord = dynamic_cast<UpdateEpochLogRecord *>(currentRecord.RawPtr());
            ASSERT_IF(
                updateEpochRecord == nullptr,
                "{0}: Unexpected dynamic cast failure",
                TraceId);

            ProgressVectorEntry lastVector(*updateEpochRecord);

            // Truncate the progress vector in the replicated log manager
            // Ensures outdated progress vector entries are not maintained on the secondary replica
            replicatedLogManager_->ProgressVectorValue->TruncateTail(lastVector);

            EventSource::Events->TruncateTailUpdateEpoch(
                TracePartitionId,
                ReplicaId,
                updateEpochRecord->EpochValue.DataLossVersion,
                updateEpochRecord->EpochValue.ConfigurationVersion,
                updateEpochRecord->Lsn,
                updateEpochRecord->Psn,
                updateEpochRecord->RecordPosition);

            break;
        }
        case LogRecordType::Enum::EndCheckpoint:
        {
            ASSERT_IF(
                currentRecord->Psn != replicatedLogManager_->LastCompletedEndCheckpointRecord->Psn,
                "{0} currentRecord.Psn({1}) != this.lastCompletedEndCheckpointRecord.Psn({2})",
                TraceId,
                currentRecord->Psn,
                replicatedLogManager_->LastCompletedEndCheckpointRecord->Psn);

            ASSERT_IF(
                currentRecord->Psn != replicatedLogManager_->LastLinkedPhysicalRecord->Psn,
                "{0} currentRecord.Psn({1}) != this.lastLinkedPhysicalRecord.Psn({2})",
                TraceId,
                currentRecord->Psn,
                replicatedLogManager_->LastLinkedPhysicalRecord->Psn);

            // Remembered so that the checkpoint state can be fixed up after the physical truncation
            endCheckpointLogRecord = replicatedLogManager_->LastCompletedEndCheckpointRecord;

            // Update the last linked physical record
            replicatedLogManager_->OnTruncateTailOfLastLinkedPhysicalRecord();
            goto LastPhysicalRecordCase;
        }
        case LogRecordType::Enum::TruncateHead:
        {
            ASSERT_IF(
                currentRecord->Psn != replicatedLogManager_->LastLinkedPhysicalRecord->Psn,
                "{0} currentRecord.Psn({1}) != this.lastLinkedPhysicalRecord.Psn({2})",
                TraceId,
                currentRecord->Psn,
                replicatedLogManager_->LastLinkedPhysicalRecord->Psn);

            TruncateHeadLogRecord::SPtr truncateHeadRecord = dynamic_cast<TruncateHeadLogRecord *>(currentRecord.RawPtr());
            ASSERT_IF(truncateHeadRecord == nullptr, "Unexpected dynamic cast failure");

            ASSERT_IF(
                truncateHeadRecord->IsStable,
                "{0} Stable truncateHeadRecord cannot be undone due to false progress",
                TraceId);

            // Update the last linked physical record
            replicatedLogManager_->OnTruncateTailOfLastLinkedPhysicalRecord();
            goto LastPhysicalRecordCase;
        }
        case LogRecordType::Enum::CompleteCheckpoint:
        {
            // Remembered so that CompleteCheckpoint() is re-run after the physical truncation
            completeCheckpointLogRecord = dynamic_cast<CompleteCheckPointLogRecord *>(currentRecord.RawPtr());
            ASSERT_IF(
                completeCheckpointLogRecord == nullptr,
                "{0}: Unexpected dynamic cast failure",
                TraceId);

            // Update the last linked physical record
            replicatedLogManager_->OnTruncateTailOfLastLinkedPhysicalRecord();
            goto LastPhysicalRecordCase;
        }
        case LogRecordType::Enum::Indexing:
        case LogRecordType::Enum::BeginCheckpoint:
        case LogRecordType::Enum::TruncateTail:
        case LogRecordType::Enum::Information:
        LastPhysicalRecordCase:
        {
            // Common handling for every physical record (reached directly or through the gotos above).
            // The current record must be equal to the last physical record
            ASSERT_IF(
                currentRecord->Psn != lastPhysicalRecord->Psn,
                "{0} currentRecord.Psn({1}) != lastPhysicalRecord.Psn({2})",
                TraceId,
                currentRecord->Psn,
                lastPhysicalRecord->Psn);

            PhysicalLogRecord::SPtr current = lastPhysicalRecord;
            // Move up the last physical record to the previous physical record
            lastPhysicalRecord = lastPhysicalRecord->PreviousPhysicalRecord;

            // Needed for keeping the physical chain from not disappearing during destruction of the physical record
            current->PreviousPhysicalRecord = invalidRecords_->Inv_PhysicalLogRecord.RawPtr();
            break;
        }

        case LogRecordType::Enum::Invalid:
        default:
            {
                Common::Assert::CodingError(
                    "{0} Unexpected record type {1}",
                    TraceId,
                    currentRecord->RecordType);
            }
        }

        // Step one record backwards in the log
        currentRecord = co_await recoveryLogsReader_->GetPreviousLogRecord(currentRecord->RecordPosition);
        currentLsn = currentRecord->Lsn;

    } while (currentLsn > tailLsn_);

    ASSERT_IF(
        currentLsn != tailLsn_,
        "{0} V1 replicator ensures that lsns are continuous. currentLsn {1} == tailLsn {2}",
        TraceId,
        currentLsn,
        tailLsn_);

    // currentRecord was read from the log reader; replace it with the matching in-memory record
    // (transactions map entry or last physical record) that will become the new tail.
    if(currentRecord->AsLogicalLogRecord() != nullptr)
    {
        switch(currentRecord->RecordType)
        {
        case LogRecordType::Enum::BeginTransaction:
        {
            BeginTransactionOperationLogRecord::SPtr beginTransactionRecord = dynamic_cast<BeginTransactionOperationLogRecord *>(currentRecord.RawPtr());
            ASSERT_IF(
                beginTransactionRecord == nullptr,
                "{0}: Unexpected dynamic cast failure",
                TraceId);

            currentRecord = transactionsMap_->FindTransaction(*beginTransactionRecord).RawPtr();

            // Single operation transactions are not stored in the tx map and hence are not returned above.
            // As a result, they dont have a valid indexing of the previous physical record
            if(beginTransactionRecord->IsSingleOperationTransaction)
            {
                currentRecord->PreviousPhysicalRecord = lastPhysicalRecord.RawPtr();
            }

            break;
        }
        case LogRecordType::Enum::Operation:
        {
            OperationLogRecord::SPtr operationLogRecord = dynamic_cast<OperationLogRecord *>(currentRecord.RawPtr());
            ASSERT_IF(
                operationLogRecord == nullptr,
                "{0}: Unexpected dynamic cast failure",
                TraceId);

            currentRecord = transactionsMap_->FindOperation(*operationLogRecord).RawPtr();

            // Atomic operations are not stored in the tx map and hence are not returned above.
            // As a result, they dont have a valid indexing of the previous physical record
            if(operationLogRecord->BaseTransaction.IsAtomicOperation)
            {
                currentRecord->PreviousPhysicalRecord = lastPhysicalRecord.RawPtr();
            }

            break;
        }
        case LogRecordType::Enum::EndTransaction:
        {
            EndTransactionLogRecord::SPtr endTransactionRecord = dynamic_cast<EndTransactionLogRecord *>(currentRecord.RawPtr());
            ASSERT_IF(
                endTransactionRecord == nullptr,
                "{0}: Unexpected dynamic cast failure",
                TraceId);

            currentRecord = transactionsMap_->FindUnstableTransaction(*endTransactionRecord).RawPtr();
            break;
        }
        case LogRecordType::Enum::Backup:
        case LogRecordType::Enum::Barrier:
        case LogRecordType::Enum::UpdateEpoch:
        {
            currentRecord->PreviousPhysicalRecord = lastPhysicalRecord.RawPtr();
            break;
        }
        default:
        {
            Common::Assert::CodingError(
                "{0} Unexpected record type {1}",
                TraceId,
                currentRecord->RecordType);
        }
        }

        ASSERT_IF(
            currentRecord->PreviousPhysicalRecord != lastPhysicalRecord,
            "{0} currentRecord.PreviousPhysicalRecord != lastPhysicalRecord",
            TraceId);
    }
    else
    {
        // The new tail is a physical record: it must be the last physical record still linked
        ASSERT_IF(
            lastPhysicalRecord->Psn != currentRecord->Psn,
            "{0} lastPhysicalRecord.Psn({1}) != currentRecord.Psn({2})",
            TraceId,
            lastPhysicalRecord->Psn,
            currentRecord->Psn);

        currentRecord = lastPhysicalRecord.RawPtr();
    }

    ASSERT_IFNOT(
        (replicatedLogManager_->LastLinkedPhysicalRecord == lastPhysicalRecord) ||
        (replicatedLogManager_->LastLinkedPhysicalRecord == lastPhysicalRecord->LinkedPhysicalRecord),
        "{0} (this.lastLinkedPhysicalRecord == lastPhysicalRecord) || (this.lastLinkedPhysicalRecord == lastPhysicalRecord.LinkedPhysicalRecord)",
        TraceId);

    // Perform the log truncation
    status = co_await replicatedLogManager_->InnerLogManager->PerformLogTailTruncationAsync(*currentRecord);

    CO_RETURN_ON_FAILURE(status);

    // Update the current tail lsn
    replicatedLogManager_->SetTailLsn(tailLsn_);

    // If end checkpoint was truncated, complete checkpoint must also be truncated
    if(endCheckpointLogRecord != nullptr)
    {
        ASSERT_IF(
            completeCheckpointLogRecord == nullptr,
            "{0} TruncateTailAsync: EndCheckpoint was truncated but CompleteCheckpoint was not",
            TraceId);

        replicatedLogManager_->EndCheckpoint(*endCheckpointLogRecord->LastCompletedBeginCheckpointRecord);
    }

    if(completeCheckpointLogRecord != nullptr)
    {
        replicatedLogManager_->CompleteCheckpoint();
    }

    // Insert and flush the truncate tail log record after successful truncation
    replicatedLogManager_->TruncateTail(tailLsn_);

    co_await replicatedLogManager_->InnerLogManager->FlushAsync(L"TruncateTailAsync");

    EventSource::Events->TruncateTailDone(
        TracePartitionId,
        ReplicaId,
        currentRecord->RecordType,
        currentRecord->Lsn,
        currentRecord->Psn,
        currentRecord->RecordPosition);

    // Hand the new tail back to the caller only once everything has succeeded
    newTail = currentRecord;

    co_return status;
}