public async Task TruncateTailAsync()

in src/prod/src/managed/Microsoft.ServiceFabric.Data.Impl/Replicator/TruncateTailManager.cs [47:530]


        public async Task<LogRecord> TruncateTailAsync()
        {
            Utility.Assert(
                tailLsn < this.replicatedLogManager.CurrentLogTailLsn,
                "tailLsn < this.currentLogTailLsn. Current log tail lsn: {0}",
                this.replicatedLogManager.CurrentLogTailLsn.LSN);

            Utility.Assert(
                tailLsn >= this.replicatedLogManager.LastCompletedBeginCheckpointRecord.Lsn,
                "tailLsn >= this.LastCompletedBeginCheckpointRecord.LastLogicalSequenceNumber. LastCompletedBeginCheckpointLogRecord: {0}",
                this.replicatedLogManager.LastCompletedBeginCheckpointRecord.Lsn);

            Utility.Assert(
                this.replicatedLogManager.LastInProgressTruncateHeadRecord == null,
                "this.lastInProgressTruncateHeadRecord == null");

            var currentRecord = this.replicatedLogManager.LogManager.CurrentLogTailRecord;
            var currentLsn = currentRecord.Lsn;

            var isUpdateRecordAtTail = true;
            var recoveredLsn = currentLsn;
            EndCheckpointLogRecord endCheckpointLogRecord = null;
            CompleteCheckpointLogRecord completeCheckpointLogRecord = null;
            var lastPhysicalRecord = this.replicatedLogManager.LogManager.CurrentLastPhysicalRecord;
            do
            {
                Utility.Assert(
                    LogRecord.IsInvalidRecord(currentRecord) == false,
                    "LogRecord.IsInvalidRecord(currentRecord ({0})) == false",
                    currentRecord);

                if (isUpdateRecordAtTail == true)
                {
                    isUpdateRecordAtTail = currentRecord.Lsn == recoveredLsn;
                }

                OperationData metaData = null;

                switch (currentRecord.RecordType)
                {
                    case LogRecordType.BeginTransaction:
                        var beginTransactionRecord = (BeginTransactionOperationLogRecord) currentRecord;

                        // Cache the latest metadata just read from disk
                        metaData = beginTransactionRecord.MetaData;

                        beginTransactionRecord = this.transactionsMap.DeleteTransaction(beginTransactionRecord);

                        // Reset the metadata of the transaction as it may have been modified during redo pass
                        beginTransactionRecord.MetaData = metaData;

                        if (beginTransactionRecord.IsSingleOperationTransaction)
                        {
                            Utility.Assert(
                                beginTransactionRecord.Lsn != LogicalSequenceNumber.InvalidLsn,
                                "begin transaction record lsn must not be invalid.");

                            beginTransactionRecord.Transaction.CommitSequenceNumber = beginTransactionRecord.Lsn.LSN;

                            var operationContext =
                                await
                                    this.stateManager.OnApplyAsync(
                                        beginTransactionRecord.Lsn.LSN,
                                        beginTransactionRecord.Transaction,
                                        beginTransactionRecord.MetaData,
                                        beginTransactionRecord.Undo,
                                        falseProgressApplyContext).ConfigureAwait(false);

                            if (operationContext != null)
                            {
                                this.stateManager.Unlock(operationContext);
                            }

                            FabricEvents.Events.TruncateTailSingleOperationTransactionRecord(
                                this.tracer.Type,
                                "Deleted",
                                beginTransactionRecord.Lsn.LSN,
                                beginTransactionRecord.Psn.PSN,
                                beginTransactionRecord.RecordPosition,
                                beginTransactionRecord.Transaction.Id);
                        }
                        else
                        {
                            FabricEvents.Events.TruncateTailTransactionRecord(
                                this.tracer.Type,
                                "Deleted",
                                beginTransactionRecord.Lsn.LSN,
                                beginTransactionRecord.Psn.PSN,
                                beginTransactionRecord.RecordPosition,
                                beginTransactionRecord.Transaction.Id);
                        }

                        break;

                    case LogRecordType.Operation:
                        var operationRecord = (OperationLogRecord) currentRecord;

                        // Cache the latest metadata just read from disk
                        metaData = operationRecord.MetaData;

                        operationRecord = this.transactionsMap.RedactOperation(operationRecord);

                        // Reset the metadata of the operation as it may have been modified during redo pass
                        operationRecord.MetaData = metaData;

                        if (operationRecord.Transaction.IsAtomicOperation == true)
                        {
                            Utility.Assert(
                                operationRecord.IsRedoOnly == false,
                                "TruncateTail- RedoOnly operation cannot be undone");
                            Utility.Assert(
                                operationRecord.Lsn != LogicalSequenceNumber.InvalidLsn,
                                "Operation's lsn must not be invalid.");

                            operationRecord.Transaction.CommitSequenceNumber = operationRecord.Lsn.LSN;

                            var operationContext =
                                await
                                    this.stateManager.OnApplyAsync(
                                        operationRecord.Lsn.LSN,
                                        operationRecord.Transaction,
                                        operationRecord.MetaData,
                                        operationRecord.Undo,
                                        falseProgressApplyContext).ConfigureAwait(false);

                            if (operationContext != null)
                            {
                                this.stateManager.Unlock(operationContext);
                            }

                            FabricEvents.Events.TruncateTailAtomicOperation(
                                this.tracer.Type,
                                operationRecord.Lsn.LSN,
                                operationRecord.Psn.PSN,
                                operationRecord.RecordPosition,
                                operationRecord.Transaction.Id);
                        }
                        else
                        {
                            FabricEvents.Events.TruncateTailOperationRecord(
                                this.tracer.Type,
                                "Deleted",
                                operationRecord.Lsn.LSN,
                                operationRecord.Psn.PSN,
                                operationRecord.RecordPosition,
                                operationRecord.Transaction.Id);
                        }

                        break;

                    case LogRecordType.EndTransaction:
                        var endTransactionRecord = (EndTransactionLogRecord) currentRecord;
                        endTransactionRecord = this.transactionsMap.ReifyTransaction(endTransactionRecord);

                        Utility.Assert(
                            endTransactionRecord.Lsn != LogicalSequenceNumber.InvalidLsn,
                            "end transaction record cannot have an invalid lsn.");

                        // Undo all operations (Call apply with undo).
                        if (endTransactionRecord.IsCommitted == true)
                        {
                            TransactionLogRecord transactionRecord = endTransactionRecord;
                            do
                            {
                                // During recovery operations that may be undo are kept in memory.
                                // Since Truncate tail uses the in-memory links, Transaction have already been created and their commit sequence numbers
                                // have been set during recovery redo.
                                Utility.Assert(
                                    transactionRecord.RecordType == LogRecordType.EndTransaction
                                    || transactionRecord.Transaction.CommitSequenceNumber
                                    != LogicalSequenceNumber.InvalidLsn.LSN,
                                    "For an operation to be undone, it must already have been done.");

                                object operationContext = null;
                                transactionRecord = transactionRecord.ParentTransactionRecord;
                                Utility.Assert(transactionRecord != null, "transactionRecord != null");
                                if (transactionRecord is BeginTransactionOperationLogRecord)
                                {
                                    // Cache the metadata read from disk
                                    var justReadTransactionRecord =
                                        await this.recoveryLogsReader.GetNextLogRecord(transactionRecord.RecordPosition).ConfigureAwait(false);
                                    Utility.Assert(
                                        justReadTransactionRecord.RecordType == LogRecordType.BeginTransaction,
                                        "Just read transaction during false progress is not begintransaction. It is {0}",
                                        justReadTransactionRecord.RecordType);
                                    var justReadBeginTransactionRecord =
                                        (BeginTransactionOperationLogRecord) justReadTransactionRecord;

                                    var beginTx = (BeginTransactionOperationLogRecord) transactionRecord;
                                    beginTx.MetaData = justReadBeginTransactionRecord.MetaData;

                                    Utility.Assert(
                                        beginTx.IsSingleOperationTransaction == false,
                                        "beginTx.IsSingleOperationTransaction must be false when endTxRecord is being processed");

                                    operationContext =
                                        await
                                            this.stateManager.OnApplyAsync(
                                                beginTx.Lsn.LSN,
                                                beginTx.Transaction,
                                                beginTx.MetaData,
                                                beginTx.Undo,
                                                falseProgressApplyContext).ConfigureAwait(false);

                                    if (operationContext != null)
                                    {
                                        beginTx.OperationContext = operationContext;
                                    }

                                    break;
                                }

                                // Cache the metadata read from disk
                                var justReadOperationLogRecord =
                                    await this.recoveryLogsReader.GetNextLogRecord(transactionRecord.RecordPosition).ConfigureAwait(false);
                                Utility.Assert(
                                    justReadOperationLogRecord.RecordType == LogRecordType.Operation,
                                    "Just read operation during false progress is not of the right type. It is {0}",
                                    justReadOperationLogRecord.RecordType);
                                var justReadOperationRecord = (OperationLogRecord) justReadOperationLogRecord;

                                operationRecord = (OperationLogRecord) transactionRecord;
                                operationRecord.MetaData = justReadOperationRecord.MetaData;

                                operationContext =
                                    await
                                        this.stateManager.OnApplyAsync(
                                            operationRecord.Lsn.LSN,
                                            operationRecord.Transaction,
                                            operationRecord.MetaData,
                                            operationRecord.Undo,
                                            falseProgressApplyContext).ConfigureAwait(false);

                                if (operationContext != null)
                                {
                                    operationRecord.OperationContext = operationContext;
                                }
                            } while (true);

                            // call unlock
                            transactionRecord = endTransactionRecord;
                            do
                            {
                                object operationContext = null;
                                transactionRecord = transactionRecord.ParentTransactionRecord;
                                Utility.Assert(transactionRecord != null, "transactionRecord != null");
                                if (transactionRecord is BeginTransactionOperationLogRecord)
                                {
                                    var beginTx = (BeginTransactionOperationLogRecord) transactionRecord;
                                    operationContext = beginTx.ResetOperationContext();

                                    if (operationContext != null)
                                    {
                                        this.stateManager.Unlock(operationContext);
                                    }

                                    FabricEvents.Events.TruncateTailOperationRecord(
                                        this.tracer.Type,
                                        "Undone",
                                        beginTx.Lsn.LSN,
                                        beginTx.Psn.PSN,
                                        beginTx.RecordPosition,
                                        beginTx.Transaction.Id);

                                    break;
                                }

                                operationRecord = (OperationLogRecord) transactionRecord;
                                operationContext = operationRecord.ResetOperationContext();
                                if (operationContext != null)
                                {
                                    this.stateManager.Unlock(operationContext);
                                }

                                FabricEvents.Events.TruncateTailOperationRecord(
                                    this.tracer.Type,
                                    "Undone",
                                    operationRecord.Lsn.LSN,
                                    operationRecord.Psn.PSN,
                                    operationRecord.RecordPosition,
                                    operationRecord.Transaction.Id);
                            } while (true);
                        }

                        FabricEvents.Events.TruncateTailTransactionRecord(
                            this.tracer.Type,
                            "Reified",
                            endTransactionRecord.Lsn.LSN,
                            endTransactionRecord.Psn.PSN,
                            endTransactionRecord.RecordPosition,
                            endTransactionRecord.Transaction.Id);
                        break;

                    case LogRecordType.Barrier:
                        var barrierRecord = (BarrierLogRecord) currentRecord;

                        FabricEvents.Events.TruncateTailBarrier(
                            this.tracer.Type,
                            barrierRecord.Lsn.LSN,
                            barrierRecord.Psn.PSN,
                            barrierRecord.RecordPosition);
                        break;

                    case LogRecordType.Backup:
                        // Inform the backup manager that the last backup log record has been undone.
                        this.backupManager.UndoLastCompletedBackupLogRecord();

                        // Trace that the backup log record has been false progressed.
                        var backupRecord = (BackupLogRecord)currentRecord;
#if !DotNetCoreClr
                        // These are new events defined in System.Fabric, existing CoreCLR apps would break 
                        // if these events are refernced as it wont be found. As CoreCLR apps carry System.Fabric
                        // along with application
                        // This is just a mitigation for now. Actual fix being tracked via bug# 11614507

                        FabricEvents.Events.TruncateTailBackup(
                            this.tracer.Type,
                            backupRecord.Lsn.LSN,
                            backupRecord.Psn.PSN,
                            backupRecord.RecordPosition);
#endif
                        break;

                    case LogRecordType.UpdateEpoch:

                        // These records can only occur at the tail
                        Utility.Assert(isUpdateRecordAtTail == true, "isUpdateRecordAtTail == true");

                        var updateEpochRecord = (UpdateEpochLogRecord) currentRecord;
                        var lastVector = new ProgressVectorEntry(updateEpochRecord);
                        this.replicatedLogManager.ProgressVector.TruncateTail(lastVector);

                        FabricEvents.Events.TruncateTailUpdateEpoch(
                            this.tracer.Type,
                            lastVector.Epoch.DataLossNumber,
                            lastVector.Epoch.ConfigurationNumber,
                            updateEpochRecord.Lsn.LSN,
                            updateEpochRecord.Psn.PSN,
                            updateEpochRecord.RecordPosition);
                        break;

                    case LogRecordType.EndCheckpoint:
                        Utility.Assert(
                            currentRecord.Psn == this.replicatedLogManager.LastCompletedEndCheckpointRecord.Psn,
                            "currentRecord.Psn == this.lastCompletedEndCheckpointRecord.Psn");
                        Utility.Assert(
                            currentRecord.Psn == this.replicatedLogManager.LastLinkedPhysicalRecord.Psn,
                            "currentRecord.Psn == this.lastLinkedPhysicalRecord.Psn");
                        endCheckpointLogRecord = this.replicatedLogManager.LastCompletedEndCheckpointRecord;
                        this.replicatedLogManager.OnTruncateTailOfLastLinkedPhysicalRecord();
                        goto case LogRecordType.Indexing;

                    case LogRecordType.TruncateHead:
                        Utility.Assert(
                            currentRecord.Psn == this.replicatedLogManager.LastLinkedPhysicalRecord.Psn,
                            "currentRecord.Psn == this.lastLinkedPhysicalRecord.Psn");
                        var truncateHeadRecord = (TruncateHeadLogRecord) currentRecord;
                        Utility.Assert(
                            truncateHeadRecord.IsStable == false,
                            "Stable truncateHeadRecord cannot be undone due to false progress");
                        this.replicatedLogManager.OnTruncateTailOfLastLinkedPhysicalRecord();
                        goto case LogRecordType.Indexing;

                    case LogRecordType.CompleteCheckpoint:
                        completeCheckpointLogRecord = currentRecord as CompleteCheckpointLogRecord;
                        this.replicatedLogManager.OnTruncateTailOfLastLinkedPhysicalRecord();
                        goto case LogRecordType.Indexing;

                    case LogRecordType.Indexing:
                    case LogRecordType.TruncateTail:
                    case LogRecordType.BeginCheckpoint:
                    case LogRecordType.Information:
                        Utility.Assert(
                            currentRecord.Psn == lastPhysicalRecord.Psn,
                            "currentRecord.Psn == lastPhysicalRecord.Psn");
                        lastPhysicalRecord = lastPhysicalRecord.PreviousPhysicalRecord;
                        break;

                    default:
                        Utility.CodingError("Unexpected record type {0}", currentRecord.RecordType);
                        break;
                }

                currentRecord = await this.recoveryLogsReader.GetPreviousLogRecord(currentRecord.RecordPosition).ConfigureAwait(false);
                currentLsn = currentRecord.Lsn;
            } while (currentLsn > tailLsn);

            Utility.Assert(currentLsn == tailLsn, "V1 replicator ensures that lsns are continuous. currentLsn {0} == tailLsn {1}", currentLsn, tailLsn);

            if (currentRecord is LogicalLogRecord)
            {
                switch (currentRecord.RecordType)
                {
                    case LogRecordType.BeginTransaction:
                        var beginTransactionRecord =
                            (BeginTransactionOperationLogRecord) currentRecord;
                        currentRecord = this.transactionsMap.FindTransaction(beginTransactionRecord);

                        // Single operation transactions are not stored in the tx map and hence are not returned above. As a result, they dont have a valid indexing of the previous physical record
                        if (beginTransactionRecord.IsSingleOperationTransaction)
                        {
                            currentRecord.PreviousPhysicalRecord = lastPhysicalRecord;
                        }

                        break;

                    case LogRecordType.Operation:
                        var operationRecord = (OperationLogRecord) currentRecord;
                        currentRecord = this.transactionsMap.FindOperation(operationRecord);

                        // Atomic operations are not stored in the tx map and hence are not returned above. As a result, they dont have a valid indexing of the previous physical record
                        if (operationRecord.Transaction.IsAtomicOperation)
                        {
                            currentRecord.PreviousPhysicalRecord = lastPhysicalRecord;
                        }

                        break;

                    case LogRecordType.EndTransaction:
                        var endTransactionRecord = (EndTransactionLogRecord) currentRecord;
                        currentRecord = this.transactionsMap.FindUnstableTransaction(endTransactionRecord);
                        break;

                    case LogRecordType.Backup:
                    case LogRecordType.Barrier:
                        currentRecord.PreviousPhysicalRecord = lastPhysicalRecord;
                        break;

                    case LogRecordType.UpdateEpoch:
                        currentRecord.PreviousPhysicalRecord = lastPhysicalRecord;
                        break;

                    default:
                        Utility.CodingError("Unexpected record type {0}", currentRecord.RecordType);
                        break;
                }

                Utility.Assert(
                    currentRecord.PreviousPhysicalRecord == lastPhysicalRecord,
                    "currentRecord.PreviousPhysicalRecord == lastPhysicalRecord");
            }
            else
            {
                Utility.Assert(
                    lastPhysicalRecord.Psn == currentRecord.Psn,
                    "lastPhysicalRecord.Psn == currentRecord.Psn");
                currentRecord = lastPhysicalRecord;
            }

            Utility.Assert(
                (this.replicatedLogManager.LastLinkedPhysicalRecord == lastPhysicalRecord)
                || (this.replicatedLogManager.LastLinkedPhysicalRecord == lastPhysicalRecord.LinkedPhysicalRecord),
                "(this.lastLinkedPhysicalRecord == lastPhysicalRecord) || (this.lastLinkedPhysicalRecord == lastPhysicalRecord.LinkedPhysicalRecord)");
            await this.replicatedLogManager.LogManager.PerformLogTailTruncationAsync(currentRecord).ConfigureAwait(false);
            this.replicatedLogManager.SetTailLsn(tailLsn);

            // If endchkpoint was truncated, even complete checkpoint must be truncated 
            if (endCheckpointLogRecord != null)
            {
                Utility.Assert(
                    completeCheckpointLogRecord != null,
                    "TruncateTailAsync: EndCheckpoint was truncated but CompleteCheckpoint was not");

                this.replicatedLogManager.EndCheckpoint(endCheckpointLogRecord.LastCompletedBeginCheckpointRecord);
            }

            if (completeCheckpointLogRecord != null)
            {
                this.replicatedLogManager.CompleteCheckpoint();
            }

            this.replicatedLogManager.TruncateTail(tailLsn);

            await this.replicatedLogManager.LogManager.FlushAsync("TruncateTailAsync").ConfigureAwait(false);

            FabricEvents.Events.TruncateTailDone(
                this.tracer.Type,
                currentRecord.RecordType.ToString(),
                currentRecord.Lsn.LSN,
                currentRecord.Psn.PSN,
                currentRecord.RecordPosition);

            return currentRecord;
        }