in geode-core/src/distributedTest/java/org/apache/geode/cache30/TXDistributedDUnitTest.java [1353:1556]
public void testRemoteCommitFailure() throws Exception {
  // Scenario exercised by this test:
  // - four VMs host the same two persistent, replicated, DISTRIBUTED_ACK regions;
  // - two of them ("trouble" VMs) install a TXTroubleMaker test callable on their regions
  // (presumably it makes applying TROUBLE_KEY fail with a DiskAccessException -- confirm
  // against TXTroubleMaker);
  // - the origin VM commits a transaction touching both regions and must see a
  // CommitIncompleteException that names every failing region/member;
  // - afterwards the trouble VMs must have lost both regions while the healthy VM and the
  // origin still hold all committed entries.
  try {
    disconnectAllFromDS();
    final String rgnName1 = getUniqueName() + "_1";
    final String rgnName2 = getUniqueName() + "_2";
    final String diskStoreName = getUniqueName() + "_store";
    Host host = Host.getHost(0);
    VM origin = host.getVM(0);
    VM trouble1 = host.getVM(1);
    VM trouble2 = host.getVM(2);
    VM noTrouble = host.getVM(3);

    // Regions on the origin and on the healthy VM: plain persistent replicates.
    CacheSerializableRunnable initRegions =
        new CacheSerializableRunnable("Initialize no trouble regions") {
          @Override
          public void run2() {
            getCache().createDiskStoreFactory().setDiskDirs(getDiskDirs()).create(diskStoreName);
            // Transactions on persistent regions are only permitted while this flag is set;
            // it is reset in the outer finally block.
            TXManagerImpl.ALLOW_PERSISTENT_TRANSACTIONS = true;
            AttributesFactory af = new AttributesFactory();
            af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
            af.setScope(Scope.DISTRIBUTED_ACK);
            af.setDiskStoreName(diskStoreName);
            getCache().createRegion(rgnName1, af.create());
            getCache().createRegion(rgnName2, af.create());
          }
        };
    origin.invoke(initRegions);
    noTrouble.invoke(initRegions);

    // Regions on the trouble VMs: same attributes plus the failure-injecting test callable.
    SerializableRunnable initTroubleRegions =
        new CacheSerializableRunnable("Initialize regions that cause trouble") {
          @Override
          public void run2() {
            getCache().createDiskStoreFactory().setDiskDirs(getDiskDirs())
                .create(diskStoreName);
            TXManagerImpl.ALLOW_PERSISTENT_TRANSACTIONS = true;
            InternalRegionFactory factory = getCache().createInternalRegionFactory();
            factory.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
            factory.setScope(Scope.DISTRIBUTED_ACK);
            factory.setDiskStoreName(diskStoreName);
            factory.setTestCallable(new TXTroubleMaker());
            factory.create(rgnName1);
            factory.create(rgnName2);
            // The listener is released later via unblockShutdown(); presumably it holds back
            // cache shutdown until then -- confirm against ShutdownListener.
            getCache().getInternalDistributedSystem().addResourceListener(new ShutdownListener());
          }
        };
    trouble1.invoke(initTroubleRegions);
    trouble2.invoke(initTroubleRegions);

    SerializableRunnable doTransaction =
        new CacheSerializableRunnable("Run failing transaction") {
          @Override
          public void run2() {
            Cache c = getCache();
            Region r1 = c.getRegion(rgnName1);
            assertNotNull(r1);
            Region r2 = c.getRegion(rgnName2);
            assertNotNull(r2);
            CacheTransactionManager txmgr = c.getCacheTransactionManager();
            txmgr.begin();
            r1.put("k1", "k1");
            r1.put("k2", "k2");
            r1.put(TROUBLE_KEY, TROUBLE_KEY);
            r2.put("k1", "k1");
            r2.put("k2", "k2");
            r2.put(TROUBLE_KEY, TROUBLE_KEY);
            try {
              txmgr.commit();
              fail("Expected an tx incomplete exception");
            } catch (CommitIncompleteException expected) {
              String msg = expected.getMessage();
              // Each region name is expected exactly twice in the message: once for each of
              // the two trouble VMs.
              assertEquals(2, countOccurrences(msg, rgnName1));
              assertEquals(2, countOccurrences(msg, rgnName2));
              // DiskAccessException should be mentioned four times
              // (two regions on each of the two trouble VMs).
              assertEquals(4, countOccurrences(msg, DiskAccessException.class.getName()));
            }
          }

          /**
           * Counts how many times {@code token} occurs in {@code text}. The search resumes one
           * character after the start of each hit, so overlapping occurrences are counted.
           */
          private int countOccurrences(String text, String token) {
            int count = 0;
            int from = 0;
            while ((from = text.indexOf(token, from)) >= 0) {
              from++;
              count++;
            }
            return count;
          }
        };

    // The commit is expected to log these exceptions; suppress them only for its duration.
    IgnoredException ee = null;
    try {
      ee = IgnoredException.addIgnoredException(
          DiskAccessException.class.getName() + "|" + CommitIncompleteException.class.getName()
              + "|" + CommitReplyException.class.getName());
      origin.invoke(doTransaction);
    } finally {
      if (ee != null) {
        ee.remove();
      }
    }

    // Release every ShutdownListener registered on the trouble VMs.
    SerializableCallable allowCacheToShutdown = new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
        List<ResourceEventsListener> listeners =
            cache.getInternalDistributedSystem().getResourceListeners();
        for (ResourceEventsListener l : listeners) {
          if (l instanceof ShutdownListener) {
            ShutdownListener shutListener = (ShutdownListener) l;
            shutListener.unblockShutdown();
          }
        }
        return null;
      }
    };
    trouble1.invoke(allowCacheToShutdown);
    trouble2.invoke(allowCacheToShutdown);

    // Assert that both regions are gone on the failing VMs
    SerializableRunnable assertTroubledContent =
        new CacheSerializableRunnable("Assert partail commit data") {
          @Override
          public void run2() {
            final Cache c = getCache();
            // The first region may disappear asynchronously, so poll for it.
            GeodeAwaitility.await().untilAsserted(new WaitCriterion() {
              @Override
              public boolean done() {
                return c.getRegion(rgnName1) == null;
              }

              @Override
              public String description() {
                // Non-null so that a timed-out wait can be identified.
                return "region " + rgnName1 + " to no longer be present in the cache";
              }
            });
            Region r2 = c.getRegion(rgnName2);
            assertNull(r2);
          }
        };
    trouble1.invoke(assertTroubledContent);
    trouble2.invoke(assertTroubledContent);

    // Assert proper content on successful VMs
    SerializableRunnable assertSuccessfulContent =
        new CacheSerializableRunnable("Assert complete commit of data on successful VMs") {
          @Override
          public void run2() {
            Cache c = getCache();
            {
              Region r1 = c.getRegion(rgnName1);
              assertNotNull(r1);
              assertEquals("k1", r1.getEntry("k1").getValue());
              assertEquals("k2", r1.getEntry("k2").getValue());
              assertEquals(TROUBLE_KEY, r1.getEntry(TROUBLE_KEY).getValue());
            }
            {
              Region r2 = c.getRegion(rgnName2);
              assertNotNull(r2);
              assertEquals("k1", r2.getEntry("k1").getValue());
              assertEquals("k2", r2.getEntry("k2").getValue());
              assertEquals(TROUBLE_KEY, r2.getEntry(TROUBLE_KEY).getValue());
            }
          }
        };
    noTrouble.invoke(assertSuccessfulContent);

    // Assert that every transactional entry is still present on the originating VM
    SerializableRunnable assertOriginContent =
        new CacheSerializableRunnable("Assert data survives on origin VM") {
          @Override
          public void run2() {
            Cache c = getCache();
            {
              Region r1 = c.getRegion(rgnName1);
              assertNotNull(r1);
              assertNotNull(r1.getEntry("k1"));
              assertNotNull(r1.getEntry("k2"));
              assertNotNull(r1.getEntry(TROUBLE_KEY));
            }
            {
              Region r2 = c.getRegion(rgnName2);
              assertNotNull(r2);
              assertNotNull(r2.getEntry("k1"));
              assertNotNull(r2.getEntry("k2"));
              assertNotNull(r2.getEntry(TROUBLE_KEY));
            }
          }
        };
    origin.invoke(assertOriginContent);
  } finally {
    // Always restore the default so later tests in the same VMs are unaffected.
    Invoke.invokeInEveryVM(new SerializableCallable() {
      @Override
      public Object call() throws Exception {
        TXManagerImpl.ALLOW_PERSISTENT_TRANSACTIONS = false;
        return null;
      }
    });
  }
}