in src/prod/src/managed/Api/src/System/Fabric/btree/stateproviders/SortedDictionaryStateProvider.cs [6781:7213]
async Task ApplyRecoveryOrReplicationOperationAsync(bool isRecovery, IOperationEx operation, CancellationToken cancellationToken)
{
bool existing = false;
IReadWriteTransaction rwtx = null;
IBtreeOperation btreeOperation = null;
if (isRecovery)
{
if (this.allowReadableSecondary)
{
AppTrace.TraceSource.WriteNoise(
"Data.SortedDictionaryStateProvider.ApplyRecoveryOperationAsync",
"{0} operation {1} readable",
this.ToString(),
operation.SequenceNumber);
switch (operation.OperationType)
{
case OperationTypeEx.CreateAtomicGroup:
//
// Create transaction for the new atomic group.
//
rwtx = this.txManager.CreateTransaction(operation.AtomicGroupId, out existing);
ReplicatedStateProvider.Assert(!existing, "unexpected existing transaction");
break;
case OperationTypeEx.CommitAtomicGroup:
case OperationTypeEx.RollbackAtomicGroup:
//
// Release transaction locks.
//
this.txManager.RemoveTransaction(operation.AtomicGroupId);
//
// Record operation stable.
//
await this.btreeStore.OnOperationStableAsync(operation.SequenceNumber, CancellationToken.None);
break;
case OperationTypeEx.Redo:
case OperationTypeEx.Undo:
//
// Decode operation first.
//
btreeOperation = await this.btreeStore.ApplyWithOutputAsync(operation.SequenceNumber, operation.Data, true, CancellationToken.None);
if (null == btreeOperation)
{
ReplicatedStateProvider.Assert(false, "null btree operation");
}
ReplicatedStateProvider.Assert(null != btreeOperation.Key, "null btree operation key");
//
// Acquire appropriate locks. Timeouts on applying replication operations are infinite.
//
rwtx = this.txManager.GetTransaction(operation.AtomicGroupId);
ReplicatedStateProvider.Assert(null != rwtx, "null transaction");
await this.AcquireWriteLocks(btreeOperation.Key, btreeOperation.OperationType, rwtx, Timeout.Infinite);
//
// Apply redo/undo operation in the btree store.
//
btreeOperation = await this.btreeStore.ApplyWithOutputAsync(operation.SequenceNumber, operation.Data, false, CancellationToken.None);
ReplicatedStateProvider.Assert(
null != btreeOperation && null != btreeOperation.Key && null != btreeOperation.Value,
"unexpected apply with output");
break;
case OperationTypeEx.SingleOperation:
//
// Decode operation first.
//
btreeOperation = await this.btreeStore.ApplyWithOutputAsync(operation.SequenceNumber, operation.Data, true, CancellationToken.None);
if (null == btreeOperation)
{
ReplicatedStateProvider.Assert(false, "null btree operation");
}
ReplicatedStateProvider.Assert(
BtreeOperationType.Erase == btreeOperation.OperationType && null == btreeOperation.Key && null == btreeOperation.Value,
"unexpected btree operation");
//
// Acquire appropriate locks. Timeouts on applying replication/recovery operations are infinite.
//
rwtx = this.txManager.CreateTransaction(FabricReplicatorEx.InvalidAtomicGroupId, out existing);
ReplicatedStateProvider.Assert(!existing, "unexpected existing transaction");
await this.AcquireWriteLocks(btreeOperation.Key, btreeOperation.OperationType, rwtx, Timeout.Infinite);
//
// Apply redo only operation in the btree store.
//
btreeOperation = await this.btreeStore.ApplyWithOutputAsync(operation.SequenceNumber, operation.Data, false, CancellationToken.None);
ReplicatedStateProvider.Assert(
null != btreeOperation && BtreeOperationType.Erase == btreeOperation.OperationType && null == btreeOperation.Key && null == btreeOperation.Value,
"unexpected apply with output");
//
// Release transaction locks.
//
this.txManager.RemoveTransaction(rwtx);
break;
case OperationTypeEx.RedoPassComplete:
//
// Mark the state provider as readable.
//
this.isReadable = true;
AppTrace.TraceSource.WriteNoise(
"Data.SortedDictionaryStateProvider.ApplyRecoveryOperationAsync",
"{0} becomes readable",
this.ToString());
break;
case OperationTypeEx.UndoPassComplete:
//
// Nothing to do.
//
break;
case OperationTypeEx.EndOfStream:
//
// Nothing to do.
//
break;
}
}
else
{
AppTrace.TraceSource.WriteNoise(
"Data.SortedDictionaryStateProvider.ApplyRecoveryOperationAsync",
"{0} operation {1}",
this.ToString(),
operation.SequenceNumber);
switch (operation.OperationType)
{
case OperationTypeEx.CreateAtomicGroup:
//
// Nothing to do.
//
break;
case OperationTypeEx.CommitAtomicGroup:
case OperationTypeEx.RollbackAtomicGroup:
//
// Record operation stable.
//
await this.btreeStore.OnOperationStableAsync(operation.SequenceNumber, CancellationToken.None);
break;
case OperationTypeEx.Redo:
case OperationTypeEx.Undo:
//
// Apply redo/undo operation in the btree store.
//
btreeOperation = await this.btreeStore.ApplyWithOutputAsync(operation.SequenceNumber, operation.Data, false, CancellationToken.None);
ReplicatedStateProvider.Assert(
null != btreeOperation && null != btreeOperation.Key && null != btreeOperation.Value,
"unexpected apply with output");
break;
case OperationTypeEx.SingleOperation:
//
// Apply redo only operation in the btree store.
//
btreeOperation = await this.btreeStore.ApplyWithOutputAsync(operation.SequenceNumber, operation.Data, false, CancellationToken.None);
ReplicatedStateProvider.Assert(
null != btreeOperation && BtreeOperationType.Erase == btreeOperation.OperationType && null == btreeOperation.Key && null == btreeOperation.Value,
"unexpected apply with output");
break;
case OperationTypeEx.RedoPassComplete:
case OperationTypeEx.UndoPassComplete:
//
// Nothing to do.
//
break;
case OperationTypeEx.EndOfStream:
//
// Nothing to do.
//
break;
}
}
}
else
{
//
// We are either in the online rollback stream processing on the primary or
// in the replication stream processing on the secondary.
//
if (ReplicaRole.Primary == this.ReplicaRole)
{
AppTrace.TraceSource.WriteNoise(
"Data.SortedDictionaryStateProvider.ApplyReplicationOperationAsync",
"{0} operation {1} undo stream",
this.ToString(),
operation.SequenceNumber);
NotifyCollectionChangedEventHandler eventHandler = null;
switch (operation.OperationType)
{
case OperationTypeEx.CreateAtomicGroup:
case OperationTypeEx.AbortAtomicGroup:
case OperationTypeEx.CommitAtomicGroup:
case OperationTypeEx.Redo:
case OperationTypeEx.SingleOperation:
ReplicatedStateProvider.Assert(false, "unexpected operation type");
break;
case OperationTypeEx.RollbackAtomicGroup:
//
// Fire notifications if needed.
//
eventHandler = this.CollectionChanged;
if (null != eventHandler)
{
//
// Retrieve transaction.
//
rwtx = this.txManager.GetTransaction(operation.AtomicGroupId);
ReplicatedStateProvider.Assert(null != rwtx, "unexpected null transaction");
//
// Batch fire notifications.
//
this.FireAllCollectionChangedNotifications(rwtx);
}
//
// Release transaction locks.
//
this.txManager.RemoveTransaction(operation.AtomicGroupId);
break;
case OperationTypeEx.Undo:
//
// Locks are still being held at this time.
//
btreeOperation = await this.btreeStore.ApplyWithOutputAsync(operation.SequenceNumber, operation.Data, false, CancellationToken.None);
ReplicatedStateProvider.Assert(
null != btreeOperation && null != btreeOperation.Key && null != btreeOperation.Value,
"unexpected apply with output");
//
// Reconstruct notifications if needed.
//
eventHandler = this.CollectionChanged;
if (null != eventHandler)
{
//
// Retrieve transaction.
//
rwtx = this.txManager.GetTransaction(operation.AtomicGroupId);
ReplicatedStateProvider.Assert(null != rwtx, "unexpected null transaction");
//
// Reconstruct state provider operation.
//
if (null == btreeOperation)
{
ReplicatedStateProvider.Assert(null != btreeOperation, "unexpected btree operation");
}
StateProviderOperation stateProviderOperation = null;
switch (btreeOperation.OperationType)
{
case BtreeOperationType.Insert:
stateProviderOperation = new SortedDictionaryStateProviderAddOperation<K, V>();
((SortedDictionaryStateProviderAddOperation<K, V>)stateProviderOperation).Key = (new BtreeKey<K, KBC>(btreeOperation.Key.Bytes)).Key;
((SortedDictionaryStateProviderAddOperation<K, V>)stateProviderOperation).Value = (new BtreeValue<V, VBC>(btreeOperation.Value.Bytes)).Value;
break;
case BtreeOperationType.Delete:
stateProviderOperation = new SortedDictionaryStateProviderRemoveOperation<K, V>();
((SortedDictionaryStateProviderRemoveOperation<K, V>)stateProviderOperation).Key = (new BtreeKey<K, KBC>(btreeOperation.Key.Bytes)).Key;
((SortedDictionaryStateProviderRemoveOperation<K, V>)stateProviderOperation).Value = (new BtreeValue<V, VBC>(btreeOperation.Value.Bytes)).Value;
break;
case BtreeOperationType.Update:
stateProviderOperation = new SortedDictionaryStateProviderUpdateOperation<K, V>();
((SortedDictionaryStateProviderUpdateOperation<K, V>)stateProviderOperation).Key = (new BtreeKey<K, KBC>(btreeOperation.Key.Bytes)).Key;
((SortedDictionaryStateProviderUpdateOperation<K, V>)stateProviderOperation).NewValue = (new BtreeValue<V, VBC>(btreeOperation.Value.Bytes)).Value;
break;
case BtreeOperationType.Erase:
stateProviderOperation = new SortedDictionaryStateProviderClearOperation();
break;
case BtreeOperationType.PartialUpdate:
ReplicatedStateProvider.Assert(false, "unexpected operation type");
break;
}
ReplicatedStateProvider.Assert(null != stateProviderOperation, "unexpected null operation");
//
// Notification will be fired when the transaction is completely rollback.
//
rwtx.Context.Add(stateProviderOperation);
}
break;
case OperationTypeEx.EndOfStream:
//
// Nothing to do.
//
break;
}
}
else
{
if (this.allowReadableSecondary)
{
AppTrace.TraceSource.WriteNoise(
"Data.SortedDictionaryStateProvider.ApplyReplicationOperationAsync",
"{0} operation {1} readable",
this.ToString(),
operation.SequenceNumber);
switch (operation.OperationType)
{
case OperationTypeEx.CreateAtomicGroup:
//
// Create transaction for the new atomic group.
//
rwtx = this.txManager.CreateTransaction(operation.AtomicGroupId, out existing);
ReplicatedStateProvider.Assert(!existing, "unexpected existing transaction");
break;
case OperationTypeEx.CommitAtomicGroup:
case OperationTypeEx.RollbackAtomicGroup:
//
// Release transaction locks.
//
this.txManager.RemoveTransaction(operation.AtomicGroupId);
//
// Record operation stable.
//
await this.btreeStore.OnOperationStableAsync(operation.SequenceNumber, CancellationToken.None);
break;
case OperationTypeEx.AbortAtomicGroup:
//
// Release transaction locks.
//
this.txManager.RemoveTransaction(operation.AtomicGroupId);
break;
case OperationTypeEx.Redo:
case OperationTypeEx.Undo:
//
// Decode operation first.
//
btreeOperation = await this.btreeStore.ApplyWithOutputAsync(operation.SequenceNumber, operation.Data, true, CancellationToken.None);
if (null == btreeOperation)
{
ReplicatedStateProvider.Assert(false, "null btree operation");
}
ReplicatedStateProvider.Assert(null != btreeOperation.Key, "null btree operation key");
//
// Acquire appropriate locks. Timeouts on applying replication operations are infinite.
//
rwtx = this.txManager.GetTransaction(operation.AtomicGroupId);
ReplicatedStateProvider.Assert(null != rwtx, "unexpected null transaction");
await this.AcquireWriteLocks(btreeOperation.Key, btreeOperation.OperationType, rwtx, Timeout.Infinite);
//
// Apply redo/undo operation in the btree store.
//
btreeOperation = await this.btreeStore.ApplyWithOutputAsync(operation.SequenceNumber, operation.Data, false, CancellationToken.None);
ReplicatedStateProvider.Assert(
null != btreeOperation && null != btreeOperation.Key && null != btreeOperation.Value,
"unexpected apply with output");
break;
case OperationTypeEx.SingleOperation:
//
// Decode operation first.
//
btreeOperation = await this.btreeStore.ApplyWithOutputAsync(operation.SequenceNumber, operation.Data, true, CancellationToken.None);
if (null == btreeOperation)
{
ReplicatedStateProvider.Assert(false, "null btree operation");
}
ReplicatedStateProvider.Assert(BtreeOperationType.Erase == btreeOperation.OperationType, "unexpected apply with output");
//
// Acquire appropriate locks. Timeouts on applying replication operations are infinite.
//
rwtx = this.txManager.CreateTransaction(FabricReplicatorEx.InvalidAtomicGroupId, out existing);
ReplicatedStateProvider.Assert(!existing, "unexpected existing transaction");
await this.AcquireWriteLocks(btreeOperation.Key, btreeOperation.OperationType, rwtx, Timeout.Infinite);
//
// Apply redo only operation in the btree store.
//
btreeOperation = await this.btreeStore.ApplyWithOutputAsync(operation.SequenceNumber, operation.Data, false, CancellationToken.None);
ReplicatedStateProvider.Assert(
null != btreeOperation && BtreeOperationType.Erase == btreeOperation.OperationType,
"unexpected apply with output");
//
// Release transaction locks.
//
this.txManager.RemoveTransaction(rwtx);
break;
case OperationTypeEx.EndOfStream:
//
// Nothing to do.
//
break;
case OperationTypeEx.RedoPassComplete: /* Work item 1189766. */
//
// Mark the state provider as readable.
//
this.isReadable = true;
AppTrace.TraceSource.WriteNoise(
"Data.SortedDictionaryStateProvider.ApplyReplicationOperationAsync",
"{0} becomes readable",
this.ToString());
break;
}
}
else
{
AppTrace.TraceSource.WriteNoise(
"Data.SortedDictionaryStateProvider.ApplyReplicationOperationAsync",
"{0} operation {1}",
this.ToString(),
operation.SequenceNumber);
switch (operation.OperationType)
{
case OperationTypeEx.CreateAtomicGroup:
case OperationTypeEx.AbortAtomicGroup:
//
// Nothing to do.
//
break;
case OperationTypeEx.CommitAtomicGroup:
case OperationTypeEx.RollbackAtomicGroup:
//
// Record operation stable.
//
await this.btreeStore.OnOperationStableAsync(operation.SequenceNumber, CancellationToken.None);
break;
case OperationTypeEx.Redo:
case OperationTypeEx.Undo:
//
// Apply redo/undo operation.
//
await this.btreeStore.ApplyWithOutputAsync(operation.SequenceNumber, operation.Data, false, CancellationToken.None);
break;
case OperationTypeEx.SingleOperation:
//
// Apply redo only operation.
//
await this.btreeStore.ApplyWithOutputAsync(operation.SequenceNumber, operation.Data, false, CancellationToken.None);
break;
case OperationTypeEx.RedoPassComplete: /* Work item 1189766. */
case OperationTypeEx.EndOfStream:
//
// Nothing to do.
//
break;
}
}
}
}
}