in geode-core/src/distributedTest/java/org/apache/geode/cache30/TXDistributedDUnitTest.java [523:1013]
public void testHighAvailabilityFeatures() throws Exception {
IgnoredException.addIgnoredException("DistributedSystemDisconnectedException");
final String rgnName = getUniqueName();
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setEarlyAck(false);
Region rgn = getCache().createRegion(rgnName, factory.create());
Invoke.invokeInEveryVM(
new SerializableRunnable("testHighAvailabilityFeatures: intial region configuration") {
@Override
public void run() {
try {
AttributesFactory factory2 = new AttributesFactory();
factory2.setScope(Scope.DISTRIBUTED_ACK);
factory2.setEarlyAck(false);
factory2.setDataPolicy(DataPolicy.REPLICATE);
getCache().createRegion(rgnName, factory2.create());
} catch (CacheException e) {
Assert.fail("While creating region", e);
}
}
});
// create entries
rgn.put("key0", "val0_0");
rgn.put("key1", "val1_0");
Host host = Host.getHost(0);
// This test assumes that there are at least three VMs; the origin and two recipients
assertTrue(host.getVMCount() >= 3);
final VM originVM = host.getVM(0);
// Test that there is no commit after a partial commit message
// send (only sent to a minority of the recipients)
originVM.invoke(new SerializableRunnable("Flakey DuringIndividualSend Transaction") {
@Override
public void run() {
final Region rgn1 = getCache().getRegion(rgnName);
assertNotNull(rgn1);
try {
final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
final CacheTransactionManager txMgrImpl = txMgr2;
txMgr2.begin();
// 1. setup an internal callback on originVM that will call
// disconnectFromDS() on the 2nd duringIndividualSend
// call.
((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState()).forceLocalBootstrap();
TXState txState = (TXState) ((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState())
.getRealDeal(null, null);
txState.setDuringIndividualSend(new Runnable() {
private int numCalled = 0;
@Override
public synchronized void run() {
++numCalled;
rgn1.getCache().getLogger()
.info("setDuringIndividualSend Runnable called " + numCalled + " times");
if (numCalled > 1) {
MembershipManagerHelper.crashDistributedSystem(getSystem());
}
}
});
rgn1.put("key0", "val0_1");
rgn1.put("key1", "val1_1");
// 2. commit a transaction in originVM, it will disconnect from the DS
txMgr2.commit();
} catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
} catch (Throwable e) {
rgn1.getCache().getLogger().warning("Ignoring Exception", e);
} finally {
// Allow this VM to re-connect to the DS upon getCache() call
closeCache();
}
}
});
// 3. verify on all VMs that the transaction was not committed
final SerializableRunnable noChangeValidator =
new SerializableRunnable("testHighAvailabilityFeatures: validate no change in Region") {
@Override
public void run() {
Region rgn1 = getCache().getRegion(rgnName);
if (rgn1 == null) {
// Expect a null region from originVM
try {
AttributesFactory factory2 = new AttributesFactory();
factory2.setScope(Scope.DISTRIBUTED_ACK);
factory2.setEarlyAck(false);
factory2.setDataPolicy(DataPolicy.REPLICATE);
rgn1 = getCache().createRegion(rgnName, factory2.create());
} catch (CacheException e) {
Assert.fail("While creating region", e);
}
}
Region.Entry re = rgn1.getEntry("key0");
assertNotNull(re);
assertEquals("val0_0", re.getValue());
re = rgn1.getEntry("key1");
assertNotNull(re);
assertEquals("val1_0", re.getValue());
}
};
Invoke.invokeInEveryVM(noChangeValidator);
// Test that there is no commit after sending to all recipients
// but prior to sending the "commit process" message
originVM.invoke(new SerializableRunnable("Flakey AfterIndividualSend Transaction") {
@Override
public void run() {
final Region rgn1 = getCache().getRegion(rgnName);
assertNotNull(rgn1);
try {
final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
final CacheTransactionManager txMgrImpl = txMgr2;
txMgr2.begin();
// 1. setup an internal callback on originVM that will call
// disconnectFromDS() on AfterIndividualSend
((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState()).forceLocalBootstrap();
TXState txState = (TXState) ((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState())
.getRealDeal(null, null);
txState.setAfterIndividualSend(new Runnable() {
@Override
public synchronized void run() {
MembershipManagerHelper.crashDistributedSystem(getSystem());
}
});
rgn1.put("key0", "val0_2");
rgn1.put("key1", "val1_2");
// 2. commit a transaction in originVM, it will disconnect from the DS
txMgr2.commit();
} catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
} catch (Throwable e) {
rgn1.getCache().getLogger().warning("Ignoring Exception", e);
} finally {
// Allow this VM to re-connect to the DS upon getCache() call
closeCache();
}
}
});
// 3. verify on all VMs, including the origin, that the transaction was not committed
Invoke.invokeInEveryVM(noChangeValidator);
// Test commit success upon a single commit process message received.
originVM.invoke(new SerializableRunnable("Flakey DuringIndividualCommitProcess Transaction") {
@Override
public void run() {
final Region rgn1 = getCache().getRegion(rgnName);
assertNotNull(rgn1);
try {
final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
final CacheTransactionManager txMgrImpl = txMgr2;
txMgr2.begin();
((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState()).forceLocalBootstrap();
TXState txState = (TXState) ((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState())
.getRealDeal(null, null);
// 1. setup an internal callback on originVM that will call
// disconnectFromDS() on the 2nd internalDuringIndividualCommitProcess
// call.
txState.setDuringIndividualCommitProcess(new Runnable() {
private int numCalled = 0;
@Override
public synchronized void run() {
++numCalled;
rgn1.getCache().getLogger()
.info("setDuringIndividualCommitProcess Runnable called " + numCalled + " times");
if (numCalled > 1) {
MembershipManagerHelper.crashDistributedSystem(getSystem());
}
}
});
rgn1.put("key0", "val0_3");
rgn1.put("key1", "val1_3");
// 2. commit a transaction in originVM, it will disconnect from the DS
txMgr2.commit();
} catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
} catch (Throwable e) {
rgn1.getCache().getLogger().warning("Ignoring Exception", e);
} finally {
// Allow this VM to re-connect to the DS upon getCache() call
closeCache();
}
}
});
// 3. verify on all VMs that the transaction was committed (including the orgin, due to GII)
SerializableRunnable nonSoloChangeValidator1 = new SerializableRunnable(
"testHighAvailabilityFeatures: validate v1 non-solo Region changes") {
@Override
public void run() {
Region rgn1 = getCache().getRegion(rgnName);
if (rgn1 == null) {
// Expect a null region from originVM
try {
AttributesFactory factory2 = new AttributesFactory();
factory2.setScope(Scope.DISTRIBUTED_ACK);
factory2.setEarlyAck(false);
factory2.setDataPolicy(DataPolicy.REPLICATE);
rgn1 = getCache().createRegion(rgnName, factory2.create());
} catch (CacheException e) {
Assert.fail("While creating region", e);
}
}
long giveUp = System.currentTimeMillis() + 10000;
while (giveUp > System.currentTimeMillis()) {
try {
Region.Entry re = rgn1.getEntry("key0");
assertNotNull(re);
assertEquals("val0_3", re.getValue());
re = rgn1.getEntry("key1");
assertNotNull(re);
assertEquals("val1_3", re.getValue());
break;
} catch (AssertionError e) {
if (giveUp > System.currentTimeMillis()) {
throw e;
}
}
}
}
};
Invoke.invokeInEveryVM(nonSoloChangeValidator1);
// Verify successful solo region commit after duringIndividualSend
// (same as afterIndividualSend).
// Create a region that only exists on the origin and another VM
final String soloRegionName = getUniqueName() + "_solo";
SerializableRunnable createSoloRegion =
new SerializableRunnable("testHighAvailabilityFeatures: solo region configuration") {
@Override
public void run() {
try {
AttributesFactory factory2 = new AttributesFactory();
factory2.setScope(Scope.DISTRIBUTED_ACK);
factory2.setEarlyAck(false);
factory2.setDataPolicy(DataPolicy.REPLICATE);
Region rgn1 = getCache().createRegion(soloRegionName, factory2.create());
rgn1.put("soloKey0", "soloVal0_0");
rgn1.put("soloKey1", "soloVal1_0");
} catch (CacheException e) {
Assert.fail("While creating region", e);
}
}
};
final VM soloRegionVM = host.getVM(1);
originVM.invoke(createSoloRegion);
soloRegionVM.invoke(createSoloRegion);
originVM
.invoke(new SerializableRunnable("Flakey solo region DuringIndividualSend Transaction") {
@Override
public void run() {
final Region soloRgn = getCache().getRegion(soloRegionName);
assertNotNull(soloRgn);
try {
final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
final CacheTransactionManager txMgrImpl = txMgr2;
txMgr2.begin();
// 1. setup an internal callback on originVM that will call
// disconnectFromDS() on the 2nd duringIndividualSend
// call.
((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState()).forceLocalBootstrap();
TXState txState =
(TXState) ((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState())
.getRealDeal(null, null);
txState.setDuringIndividualSend(new Runnable() {
private int numCalled = 0;
@Override
public synchronized void run() {
++numCalled;
soloRgn.getCache().getLogger()
.info("setDuringIndividualSend Runnable called " + numCalled + " times");
if (numCalled > 1) {
MembershipManagerHelper.crashDistributedSystem(getSystem());
}
}
});
soloRgn.put("soloKey0", "soloVal0_1");
soloRgn.put("soloKey1", "soloVal1_1");
// 2. commit a transaction in originVM, it will disconnect from the DS
txMgr2.commit();
} catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
} catch (Throwable e) {
soloRgn.getCache().getLogger().warning("Ignoring Exception", e);
} finally {
// Allow this VM to re-connect to the DS upon getCache() call
closeCache();
}
}
});
// 3. verify on the soloRegionVM that the transaction was committed
final SerializableRunnable soloRegionCommitValidator1 = new SerializableRunnable(
"testHighAvailabilityFeatures: validate successful v1 commit in solo Region") {
@Override
public void run() {
Region soloRgn = getCache().getRegion(soloRegionName);
if (soloRgn == null) {
// Expect a null region from originVM
try {
AttributesFactory factory2 = new AttributesFactory();
factory2.setScope(Scope.DISTRIBUTED_ACK);
factory2.setEarlyAck(false);
factory2.setDataPolicy(DataPolicy.REPLICATE);
soloRgn = getCache().createRegion(soloRegionName, factory2.create());
} catch (CacheException e) {
Assert.fail("While creating region ", e);
}
}
Region.Entry re = soloRgn.getEntry("soloKey0");
assertNotNull(re);
assertEquals("soloVal0_1", re.getValue());
re = soloRgn.getEntry("soloKey1");
assertNotNull(re);
assertEquals("soloVal1_1", re.getValue());
}
};
originVM.invoke(soloRegionCommitValidator1);
soloRegionVM.invoke(soloRegionCommitValidator1);
// verify no change in nonSolo region, re-establish region in originVM
Invoke.invokeInEveryVM(nonSoloChangeValidator1);
// Verify no commit for failed send (afterIndividualSend) for solo
// Region combined with non-solo Region
originVM.invoke(new SerializableRunnable(
"Flakey mixed (solo+non-solo) region DuringIndividualSend Transaction") {
@Override
public void run() {
final Region rgn1 = getCache().getRegion(rgnName);
assertNotNull(rgn1);
final Region soloRgn = getCache().getRegion(soloRegionName);
assertNotNull(soloRgn);
try {
final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
final CacheTransactionManager txMgrImpl = txMgr2;
txMgr2.begin();
// 1. setup an internal callback on originVM that will call
// disconnectFromDS() on the afterIndividualSend
// call.
((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState()).forceLocalBootstrap();
TXState txState = (TXState) ((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState())
.getRealDeal(null, null);
txState.setAfterIndividualSend(new Runnable() {
@Override
public synchronized void run() {
MembershipManagerHelper.crashDistributedSystem(getSystem());
}
});
rgn1.put("key0", "val0_4");
rgn1.put("key1", "val1_4");
soloRgn.put("soloKey0", "soloVal0_2");
soloRgn.put("soloKey1", "soloVal1_2");
// 2. commit a transaction in originVM, it will disconnect from the DS
txMgr2.commit();
} catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
} catch (Throwable e) {
rgn1.getCache().getLogger().warning("Ignoring Exception", e);
} finally {
// Allow this VM to re-connect to the DS upon getCache() call
closeCache();
}
}
});
// Origin and Solo Region VM should be the same as last validation
originVM.invoke(soloRegionCommitValidator1);
soloRegionVM.invoke(soloRegionCommitValidator1);
Invoke.invokeInEveryVM(nonSoloChangeValidator1);
// Verify commit after sending a single
// (duringIndividualCommitProcess) commit process for solo Region
// combined with non-solo Region
originVM.invoke(new SerializableRunnable(
"Flakey mixed (solo+non-solo) region DuringIndividualCommitProcess Transaction") {
@Override
public void run() {
final Region rgn1 = getCache().getRegion(rgnName);
assertNotNull(rgn1);
final Region soloRgn = getCache().getRegion(soloRegionName);
assertNotNull(soloRgn);
try {
final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
final CacheTransactionManager txMgrImpl = txMgr2;
txMgr2.begin();
// 1. setup an internal callback on originVM that will call
// disconnectFromDS() on the afterIndividualSend
// call.
((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState()).forceLocalBootstrap();
TXState txState = (TXState) ((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState())
.getRealDeal(null, null);
txState.setAfterIndividualSend(new Runnable() {
private int numCalled = 0;
@Override
public synchronized void run() {
++numCalled;
rgn1.getCache().getLogger()
.info("setDuringIndividualCommitProcess Runnable called " + numCalled + " times");
if (numCalled > 1) {
MembershipManagerHelper.crashDistributedSystem(getSystem());
}
}
});
rgn1.put("key0", "val0_5");
rgn1.put("key1", "val1_5");
soloRgn.put("soloKey0", "soloVal0_3");
soloRgn.put("soloKey1", "soloVal1_3");
// 2. commit a transaction in originVM, it will disconnect from the DS
txMgr2.commit();
} catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
} catch (Throwable e) {
rgn1.getCache().getLogger().warning("Ignoring Exception", e);
} finally {
// Allow this VM to re-connect to the DS upon getCache() call
closeCache();
}
}
});
final SerializableRunnable soloRegionCommitValidator2 = new SerializableRunnable(
"testHighAvailabilityFeatures: validate successful v2 commit in solo Region") {
@Override
public void run() {
Region soloRgn = getCache().getRegion(soloRegionName);
if (soloRgn == null) {
// Expect a null region from originVM
try {
AttributesFactory factory2 = new AttributesFactory();
factory2.setScope(Scope.DISTRIBUTED_ACK);
factory2.setEarlyAck(false);
factory2.setDataPolicy(DataPolicy.REPLICATE);
soloRgn = getCache().createRegion(soloRegionName, factory2.create());
} catch (CacheException e) {
Assert.fail("While creating region ", e);
}
}
Region.Entry re = soloRgn.getEntry("soloKey0");
assertNotNull(re);
assertEquals("soloVal0_3", re.getValue());
re = soloRgn.getEntry("soloKey1");
assertNotNull(re);
assertEquals("soloVal1_3", re.getValue());
}
};
originVM.invoke(soloRegionCommitValidator2);
soloRegionVM.invoke(soloRegionCommitValidator2);
SerializableRunnable nonSoloChangeValidator2 = new SerializableRunnable(
"testHighAvailabilityFeatures: validate v2 non-solo Region changes") {
@Override
public void run() {
Region rgn1 = getCache().getRegion(rgnName);
if (rgn1 == null) {
// Expect a null region from originVM
try {
AttributesFactory factory2 = new AttributesFactory();
factory2.setScope(Scope.DISTRIBUTED_ACK);
factory2.setEarlyAck(false);
factory2.setDataPolicy(DataPolicy.REPLICATE);
rgn1 = getCache().createRegion(rgnName, factory2.create());
} catch (CacheException e) {
Assert.fail("While creating region", e);
}
}
Region.Entry re = rgn1.getEntry("key0");
assertNotNull(re);
assertEquals("val0_5", re.getValue());
re = rgn1.getEntry("key1");
assertNotNull(re);
assertEquals("val1_5", re.getValue());
}
};
Invoke.invokeInEveryVM(nonSoloChangeValidator2);
}