async Task ApplyRecoveryOrReplicationOperationAsync()

in src/prod/src/managed/Api/src/System/Fabric/btree/stateproviders/SortedDictionaryStateProvider.cs [6781:7213]


        /// <summary>
        /// Applies one operation taken from the recovery stream, the replication stream or the
        /// online rollback (undo) stream to the btree store.
        /// </summary>
        /// <remarks>
        /// One of five handlers is selected from <paramref name="isRecovery"/>, the current replica
        /// role and the allowReadableSecondary setting. The two "readable" handlers keep the
        /// transaction manager in step with the stream (create / look up / remove transactions and
        /// acquire write locks before applying); the two non-readable handlers only touch the btree
        /// store; the primary handler processes the undo stream. Operation types that a handler has
        /// no case for are ignored.
        /// </remarks>
        /// <param name="isRecovery">True for a recovery operation; false for a replication or online rollback operation.</param>
        /// <param name="operation">Operation to process; its type, sequence number, atomic group id and data are read.</param>
        /// <param name="cancellationToken">Not consulted by this method; every btree store call is issued with <see cref="CancellationToken.None"/>.</param>
        /// <returns>Task that completes once the operation has been processed.</returns>
        async Task ApplyRecoveryOrReplicationOperationAsync(bool isRecovery, IOperationEx operation, CancellationToken cancellationToken)
        {
            bool alreadyExists;
            IReadWriteTransaction transaction;
            IBtreeOperation decoded;
            IBtreeOperation applied;

            //
            // Recovery stream, state provider may become readable: transactions and locks are maintained.
            //
            if (isRecovery && this.allowReadableSecondary)
            {
                AppTrace.TraceSource.WriteNoise(
                    "Data.SortedDictionaryStateProvider.ApplyRecoveryOperationAsync", "{0} operation {1} readable",
                    this.ToString(), operation.SequenceNumber);

                switch (operation.OperationType)
                {
                    case OperationTypeEx.CreateAtomicGroup:
                        //
                        // A new atomic group gets a brand new transaction.
                        //
                        this.txManager.CreateTransaction(operation.AtomicGroupId, out alreadyExists);
                        ReplicatedStateProvider.Assert(!alreadyExists, "unexpected existing transaction");
                        break;

                    case OperationTypeEx.CommitAtomicGroup:
                    case OperationTypeEx.RollbackAtomicGroup:
                        //
                        // The group is finished: drop its transaction, then report the operation stable.
                        //
                        this.txManager.RemoveTransaction(operation.AtomicGroupId);
                        await this.btreeStore.OnOperationStableAsync(operation.SequenceNumber, CancellationToken.None);
                        break;

                    case OperationTypeEx.Redo:
                    case OperationTypeEx.Undo:
                        //
                        // First pass (third argument true) only yields the decoded operation (the
                        // original comments call it the decode step); it supplies the key to lock.
                        //
                        decoded = await this.btreeStore.ApplyWithOutputAsync(operation.SequenceNumber, operation.Data, true, CancellationToken.None);
                        ReplicatedStateProvider.Assert(decoded != null, "null btree operation");
                        ReplicatedStateProvider.Assert(decoded.Key != null, "null btree operation key");

                        //
                        // Lock under the group's transaction; the wait is unbounded.
                        //
                        transaction = this.txManager.GetTransaction(operation.AtomicGroupId);
                        ReplicatedStateProvider.Assert(transaction != null, "null transaction");
                        await this.AcquireWriteLocks(decoded.Key, decoded.OperationType, transaction, Timeout.Infinite);

                        //
                        // Second pass (third argument false) applies the redo/undo to the store.
                        //
                        applied = await this.btreeStore.ApplyWithOutputAsync(operation.SequenceNumber, operation.Data, false, CancellationToken.None);
                        ReplicatedStateProvider.Assert(
                            applied != null && applied.Key != null && applied.Value != null,
                            "unexpected apply with output");
                        break;

                    case OperationTypeEx.SingleOperation:
                        //
                        // A single operation is expected to be an erase carrying neither key nor value.
                        //
                        decoded = await this.btreeStore.ApplyWithOutputAsync(operation.SequenceNumber, operation.Data, true, CancellationToken.None);
                        ReplicatedStateProvider.Assert(decoded != null, "null btree operation");
                        ReplicatedStateProvider.Assert(
                            decoded.OperationType == BtreeOperationType.Erase && decoded.Key == null && decoded.Value == null,
                            "unexpected btree operation");

                        //
                        // It runs under a throw-away transaction created for the invalid atomic group id.
                        //
                        transaction = this.txManager.CreateTransaction(FabricReplicatorEx.InvalidAtomicGroupId, out alreadyExists);
                        ReplicatedStateProvider.Assert(!alreadyExists, "unexpected existing transaction");
                        await this.AcquireWriteLocks(decoded.Key, decoded.OperationType, transaction, Timeout.Infinite);

                        applied = await this.btreeStore.ApplyWithOutputAsync(operation.SequenceNumber, operation.Data, false, CancellationToken.None);
                        ReplicatedStateProvider.Assert(
                            applied != null && applied.OperationType == BtreeOperationType.Erase && applied.Key == null && applied.Value == null,
                            "unexpected apply with output");

                        //
                        // Done: the throw-away transaction goes away again.
                        //
                        this.txManager.RemoveTransaction(transaction);
                        break;

                    case OperationTypeEx.RedoPassComplete:
                        //
                        // From here on the state provider is flagged readable.
                        //
                        this.isReadable = true;
                        AppTrace.TraceSource.WriteNoise(
                            "Data.SortedDictionaryStateProvider.ApplyRecoveryOperationAsync", "{0} becomes readable",
                            this.ToString());
                        break;

                    case OperationTypeEx.UndoPassComplete:
                    case OperationTypeEx.EndOfStream:
                        //
                        // No work for these markers.
                        //
                        break;
                }

                return;
            }

            //
            // Recovery stream, not readable: only the btree store is touched.
            //
            if (isRecovery)
            {
                AppTrace.TraceSource.WriteNoise(
                    "Data.SortedDictionaryStateProvider.ApplyRecoveryOperationAsync", "{0} operation {1}",
                    this.ToString(), operation.SequenceNumber);

                switch (operation.OperationType)
                {
                    case OperationTypeEx.CommitAtomicGroup:
                    case OperationTypeEx.RollbackAtomicGroup:
                        //
                        // Report the operation stable.
                        //
                        await this.btreeStore.OnOperationStableAsync(operation.SequenceNumber, CancellationToken.None);
                        break;

                    case OperationTypeEx.Redo:
                    case OperationTypeEx.Undo:
                        //
                        // Apply directly; the result must carry both key and value.
                        //
                        applied = await this.btreeStore.ApplyWithOutputAsync(operation.SequenceNumber, operation.Data, false, CancellationToken.None);
                        ReplicatedStateProvider.Assert(
                            applied != null && applied.Key != null && applied.Value != null,
                            "unexpected apply with output");
                        break;

                    case OperationTypeEx.SingleOperation:
                        //
                        // Apply directly; the result must be an erase without key or value.
                        //
                        applied = await this.btreeStore.ApplyWithOutputAsync(operation.SequenceNumber, operation.Data, false, CancellationToken.None);
                        ReplicatedStateProvider.Assert(
                            applied != null && applied.OperationType == BtreeOperationType.Erase && applied.Key == null && applied.Value == null,
                            "unexpected apply with output");
                        break;

                    case OperationTypeEx.CreateAtomicGroup:
                    case OperationTypeEx.RedoPassComplete:
                    case OperationTypeEx.UndoPassComplete:
                    case OperationTypeEx.EndOfStream:
                        //
                        // No work for these operation types.
                        //
                        break;
                }

                return;
            }

            //
            // Not recovery. On the primary this is the online rollback (undo) stream.
            //
            if (ReplicaRole.Primary == this.ReplicaRole)
            {
                AppTrace.TraceSource.WriteNoise(
                    "Data.SortedDictionaryStateProvider.ApplyReplicationOperationAsync", "{0} operation {1} undo stream",
                    this.ToString(), operation.SequenceNumber);

                NotifyCollectionChangedEventHandler subscribers;
                StateProviderOperation notification = null;
                switch (operation.OperationType)
                {
                    case OperationTypeEx.Undo:
                        //
                        // Apply the undo; no locks are acquired on this path (the original comment
                        // notes they are still being held).
                        //
                        applied = await this.btreeStore.ApplyWithOutputAsync(operation.SequenceNumber, operation.Data, false, CancellationToken.None);
                        ReplicatedStateProvider.Assert(
                            applied != null && applied.Key != null && applied.Value != null,
                            "unexpected apply with output");

                        //
                        // Without CollectionChanged subscribers there is no notification to rebuild.
                        //
                        subscribers = this.CollectionChanged;
                        if (subscribers == null)
                        {
                            break;
                        }

                        transaction = this.txManager.GetTransaction(operation.AtomicGroupId);
                        ReplicatedStateProvider.Assert(transaction != null, "unexpected null transaction");
                        ReplicatedStateProvider.Assert(applied != null, "unexpected btree operation");

                        //
                        // Translate the applied btree operation into the matching state provider operation.
                        //
                        switch (applied.OperationType)
                        {
                            case BtreeOperationType.Insert:
                                notification = new SortedDictionaryStateProviderAddOperation<K, V>
                                {
                                    Key = new BtreeKey<K, KBC>(applied.Key.Bytes).Key,
                                    Value = new BtreeValue<V, VBC>(applied.Value.Bytes).Value
                                };
                                break;
                            case BtreeOperationType.Delete:
                                notification = new SortedDictionaryStateProviderRemoveOperation<K, V>
                                {
                                    Key = new BtreeKey<K, KBC>(applied.Key.Bytes).Key,
                                    Value = new BtreeValue<V, VBC>(applied.Value.Bytes).Value
                                };
                                break;
                            case BtreeOperationType.Update:
                                notification = new SortedDictionaryStateProviderUpdateOperation<K, V>
                                {
                                    Key = new BtreeKey<K, KBC>(applied.Key.Bytes).Key,
                                    NewValue = new BtreeValue<V, VBC>(applied.Value.Bytes).Value
                                };
                                break;
                            case BtreeOperationType.Erase:
                                notification = new SortedDictionaryStateProviderClearOperation();
                                break;
                            case BtreeOperationType.PartialUpdate:
                                ReplicatedStateProvider.Assert(false, "unexpected operation type");
                                break;
                        }

                        ReplicatedStateProvider.Assert(notification != null, "unexpected null operation");

                        //
                        // Park it on the transaction context; it is fired when the group's rollback completes.
                        //
                        transaction.Context.Add(notification);
                        break;

                    case OperationTypeEx.RollbackAtomicGroup:
                        //
                        // Fire everything collected for this group in one batch, if anyone listens.
                        //
                        subscribers = this.CollectionChanged;
                        if (subscribers != null)
                        {
                            transaction = this.txManager.GetTransaction(operation.AtomicGroupId);
                            ReplicatedStateProvider.Assert(transaction != null, "unexpected null transaction");
                            this.FireAllCollectionChangedNotifications(transaction);
                        }

                        //
                        // The rollback is over: drop the group's transaction.
                        //
                        this.txManager.RemoveTransaction(operation.AtomicGroupId);
                        break;

                    case OperationTypeEx.CreateAtomicGroup:
                    case OperationTypeEx.AbortAtomicGroup:
                    case OperationTypeEx.CommitAtomicGroup:
                    case OperationTypeEx.Redo:
                    case OperationTypeEx.SingleOperation:
                        //
                        // None of these may show up in the undo stream.
                        //
                        ReplicatedStateProvider.Assert(false, "unexpected operation type");
                        break;

                    case OperationTypeEx.EndOfStream:
                        //
                        // No work at end of stream.
                        //
                        break;
                }

                return;
            }

            //
            // Replication stream on a readable secondary: transactions and locks are maintained.
            //
            if (this.allowReadableSecondary)
            {
                AppTrace.TraceSource.WriteNoise(
                    "Data.SortedDictionaryStateProvider.ApplyReplicationOperationAsync", "{0} operation {1} readable",
                    this.ToString(), operation.SequenceNumber);

                switch (operation.OperationType)
                {
                    case OperationTypeEx.CreateAtomicGroup:
                        //
                        // A new atomic group gets a brand new transaction.
                        //
                        this.txManager.CreateTransaction(operation.AtomicGroupId, out alreadyExists);
                        ReplicatedStateProvider.Assert(!alreadyExists, "unexpected existing transaction");
                        break;

                    case OperationTypeEx.CommitAtomicGroup:
                    case OperationTypeEx.RollbackAtomicGroup:
                        //
                        // The group is finished: drop its transaction, then report the operation stable.
                        //
                        this.txManager.RemoveTransaction(operation.AtomicGroupId);
                        await this.btreeStore.OnOperationStableAsync(operation.SequenceNumber, CancellationToken.None);
                        break;

                    case OperationTypeEx.AbortAtomicGroup:
                        //
                        // An aborted group only has its transaction dropped.
                        //
                        this.txManager.RemoveTransaction(operation.AtomicGroupId);
                        break;

                    case OperationTypeEx.Redo:
                    case OperationTypeEx.Undo:
                        //
                        // First pass (third argument true) only yields the decoded operation (the
                        // original comments call it the decode step); it supplies the key to lock.
                        //
                        decoded = await this.btreeStore.ApplyWithOutputAsync(operation.SequenceNumber, operation.Data, true, CancellationToken.None);
                        ReplicatedStateProvider.Assert(decoded != null, "null btree operation");
                        ReplicatedStateProvider.Assert(decoded.Key != null, "null btree operation key");

                        //
                        // Lock under the group's transaction; the wait is unbounded.
                        //
                        transaction = this.txManager.GetTransaction(operation.AtomicGroupId);
                        ReplicatedStateProvider.Assert(transaction != null, "unexpected null transaction");
                        await this.AcquireWriteLocks(decoded.Key, decoded.OperationType, transaction, Timeout.Infinite);

                        //
                        // Second pass (third argument false) applies the redo/undo to the store.
                        //
                        applied = await this.btreeStore.ApplyWithOutputAsync(operation.SequenceNumber, operation.Data, false, CancellationToken.None);
                        ReplicatedStateProvider.Assert(
                            applied != null && applied.Key != null && applied.Value != null,
                            "unexpected apply with output");
                        break;

                    case OperationTypeEx.SingleOperation:
                        //
                        // A single operation is expected to be an erase (key and value are not checked here).
                        //
                        decoded = await this.btreeStore.ApplyWithOutputAsync(operation.SequenceNumber, operation.Data, true, CancellationToken.None);
                        ReplicatedStateProvider.Assert(decoded != null, "null btree operation");
                        ReplicatedStateProvider.Assert(decoded.OperationType == BtreeOperationType.Erase, "unexpected apply with output");

                        //
                        // It runs under a throw-away transaction created for the invalid atomic group id.
                        //
                        transaction = this.txManager.CreateTransaction(FabricReplicatorEx.InvalidAtomicGroupId, out alreadyExists);
                        ReplicatedStateProvider.Assert(!alreadyExists, "unexpected existing transaction");
                        await this.AcquireWriteLocks(decoded.Key, decoded.OperationType, transaction, Timeout.Infinite);

                        applied = await this.btreeStore.ApplyWithOutputAsync(operation.SequenceNumber, operation.Data, false, CancellationToken.None);
                        ReplicatedStateProvider.Assert(
                            applied != null && applied.OperationType == BtreeOperationType.Erase,
                            "unexpected apply with output");

                        //
                        // Done: the throw-away transaction goes away again.
                        //
                        this.txManager.RemoveTransaction(transaction);
                        break;

                    case OperationTypeEx.RedoPassComplete:
                        //
                        // From here on the state provider is flagged readable (work item 1189766).
                        //
                        this.isReadable = true;
                        AppTrace.TraceSource.WriteNoise(
                            "Data.SortedDictionaryStateProvider.ApplyReplicationOperationAsync", "{0} becomes readable",
                            this.ToString());
                        break;

                    case OperationTypeEx.EndOfStream:
                        //
                        // No work at end of stream.
                        //
                        break;
                }

                return;
            }

            //
            // Replication stream on a non-readable secondary: only the btree store is touched.
            //
            AppTrace.TraceSource.WriteNoise(
                "Data.SortedDictionaryStateProvider.ApplyReplicationOperationAsync", "{0} operation {1}",
                this.ToString(), operation.SequenceNumber);

            switch (operation.OperationType)
            {
                case OperationTypeEx.CommitAtomicGroup:
                case OperationTypeEx.RollbackAtomicGroup:
                    //
                    // Report the operation stable.
                    //
                    await this.btreeStore.OnOperationStableAsync(operation.SequenceNumber, CancellationToken.None);
                    break;

                case OperationTypeEx.Redo:
                case OperationTypeEx.Undo:
                case OperationTypeEx.SingleOperation:
                    //
                    // Apply to the store; the returned operation is not inspected on this path.
                    //
                    await this.btreeStore.ApplyWithOutputAsync(operation.SequenceNumber, operation.Data, false, CancellationToken.None);
                    break;

                case OperationTypeEx.CreateAtomicGroup:
                case OperationTypeEx.AbortAtomicGroup:
                case OperationTypeEx.RedoPassComplete: /* Work item 1189766. */
                case OperationTypeEx.EndOfStream:
                    //
                    // No work for these operation types.
                    //
                    break;
            }
        }