in geode-dunit/src/main/java/org/apache/geode/cache30/MultiVMRegionTestCase.java [4909:5586]
public void testTXMultiRegion() {
  // Commits one transaction that puts "key" into three regions (MR1, MR2, MR3) from this VM while
  // vm0..vm3 host different subsets of those regions. After each commit the test checks this VM's
  // commit-message/commit-wait statistics and the TransactionListener callbacks seen remotely.
  assumeThat(supportsTransactions()).isTrue();
  assumeThat(getRegionAttributes().getScope().isGlobal()).isFalse();
  assumeThat(getRegionAttributes().getDataPolicy().withPersistence()).isFalse();
  assertThat(getRegionAttributes().getScope().isDistributed()).isTrue();

  final CacheTransactionManager txMgr = getCache().getCacheTransactionManager();
  final String rgnName1 = getUniqueName() + "MR1";
  final String rgnName2 = getUniqueName() + "MR2";
  final String rgnName3 = getUniqueName() + "MR3";

  // Remote setup steps: install a fresh transaction listener and create one region.
  final SerializableRunnable create1 =
      txMultiRegionCreateRegion(rgnName1, "testTXMultiRegion: Created region1");
  final SerializableRunnable create2 =
      txMultiRegionCreateRegion(rgnName2, "testTXMultiRegion: Created region2");
  final SerializableRunnable create3 =
      txMultiRegionCreateRegion(rgnName3, "testTXMultiRegion: Created Region");

  // Remote setup steps: create "key" with a null value in one region.
  final SerializableRunnable newKey1 =
      txMultiRegionCreateKey(rgnName1, "testTXMultiRegion: Created key");
  final SerializableRunnable newKey2 =
      txMultiRegionCreateKey(rgnName2, "testTXMultiRegion: Created Key");
  final SerializableRunnable newKey3 =
      txMultiRegionCreateKey(rgnName3, "testTXMultiRegion: Created Key");

  // Remote verification steps for VMs hosting two regions (names given in full-path sort order).
  final SerializableRunnable check1_3 =
      txMultiRegionCheckPair(rgnName1, "value1", rgnName3, "value3");
  final SerializableRunnable check2_3 =
      txMultiRegionCheckPair(rgnName2, "value2", rgnName3, "value3");

  // Remote verification steps for VMs hosting a single region.
  final SerializableRunnable check1 = txMultiRegionCheckSingle(rgnName1, "value1");
  final SerializableRunnable check2 = txMultiRegionCheckSingle(rgnName2, "value2");
  final SerializableRunnable check3 = txMultiRegionCheckSingle(rgnName3, "value3");

  try {
    DMStats dmStats = getSystem().getDistributionManager().getStats();

    // When the region is neither replicated nor preloaded, every VM creates "key" itself.
    // NOTE(review): presumably replicated/preloaded regions obtain the entry from the VM that
    // already created it, so the explicit create is skipped there - confirm.
    final boolean createKeyInEveryVm = !getRegionAttributes().getDataPolicy().withReplication()
        && !getRegionAttributes().getDataPolicy().withPreloaded();

    // NOTE(review): in every scenario below the expected number of sent commit messages equals
    // the number of distinct region subsets hosted by the remote VMs - presumably one message
    // per distinct recipient group; confirm against the commit-message implementation.

    // vm0->R1,R3 vm1->R1,R3 vm2->R2 vm3->R2,R3
    vm0.invoke("testTXMultiRegion: Create Region", create1);
    vm0.invoke("testTXMultiRegion: Create Key", newKey1);
    vm0.invoke("testTXMultiRegion: Create Region", create3);
    vm0.invoke("testTXMultiRegion: Create Key", newKey3);
    vm1.invoke("testTXMultiRegion: Create Region", create1);
    vm1.invoke(create3);
    if (createKeyInEveryVm) {
      vm1.invoke("testTXMultiRegion: Create Key", newKey1);
      vm1.invoke("testTXMultiRegion: Create Key", newKey3);
    }
    vm2.invoke("testTXMultiRegion: Create Region", create2);
    vm2.invoke("testTXMultiRegion: Create Key", newKey2);
    vm3.invoke("testTXMultiRegion: Create Region", create2);
    vm3.invoke("testTXMultiRegion: Create Region", create3);
    if (createKeyInEveryVm) {
      vm3.invoke("testTXMultiRegion: Create Key", newKey2);
      vm3.invoke("testTXMultiRegion: Create Key", newKey3);
    }
    // [bruce] pause changed from 200 to 2000 for mcast testing
    txMultiRegionCommitAndVerify(txMgr, dmStats, rgnName1, rgnName2, rgnName3,
        "testTXMultiRegion: vm0->R1,R3 vm1->R1,R3 vm2->R2 vm3->R2,R3", 3, 2000, () -> {
          vm0.invoke(check1_3);
          vm1.invoke("testTXMultiRegion: check", check1_3);
          vm2.invoke("testTXMultiRegion: check", check2);
          vm3.invoke("testTXMultiRegion: check", check2_3);
        });

    // vm0->R1,R3 vm1->R1,R3 vm2->R2,R3 vm3->R2,R3
    vm0.invoke("testTXMultiRegion: Create Region", create1);
    vm0.invoke("testTXMultiRegion: Create Key", newKey1);
    vm0.invoke("testTXMultiRegion: Create Region", create3);
    vm0.invoke("testTXMultiRegion: Create Key", newKey3);
    vm1.invoke("testTXMultiRegion: Create Region", create1);
    vm1.invoke("testTXMultiRegion: Create Region", create3);
    if (createKeyInEveryVm) {
      vm1.invoke("testTXMultiRegion: Create Key", newKey1);
      vm1.invoke("testTXMultiRegion: Create Key", newKey3);
    }
    vm2.invoke("testTXMultiRegion: Create Region", create2);
    vm2.invoke("testTXMultiRegion: Create Key", newKey2);
    vm2.invoke("testTXMultiRegion: Create Region", create3);
    if (createKeyInEveryVm) {
      vm2.invoke("testTXMultiRegion: Create Key", newKey3);
    }
    vm3.invoke("testTXMultiRegion: Create Region", create2);
    vm3.invoke("testTXMultiRegion: Create Region", create3);
    if (createKeyInEveryVm) {
      vm3.invoke("testTXMultiRegion: Create Key", newKey2);
      vm3.invoke("testTXMultiRegion: Create Key", newKey3);
    }
    txMultiRegionCommitAndVerify(txMgr, dmStats, rgnName1, rgnName2, rgnName3,
        "testTXMultiRegion: vm0->R1,R3 vm1->R1,R3 vm2->R2,R3 vm3->R2,R3", 2, 200, () -> {
          vm0.invoke("testTXMultiRegion: check", check1_3);
          vm1.invoke("testTXMultiRegion: check", check1_3);
          vm2.invoke("testTXMultiRegion: check", check2_3);
          vm3.invoke("testTXMultiRegion: check", check2_3);
        });

    // vm0->R1,R3 vm1->R1,R3 vm2->R2 vm3->R1,R3
    vm0.invoke("testTXMultiRegion: Create Region", create1);
    vm0.invoke("testTXMultiRegion: Create Key", newKey1);
    vm0.invoke(create3);
    vm0.invoke("testTXMultiRegion: Create Key", newKey3);
    vm1.invoke("testTXMultiRegion: Create Region", create1);
    vm1.invoke(create3);
    if (createKeyInEveryVm) {
      vm1.invoke("testTXMultiRegion: Create Key", newKey1);
      vm1.invoke("testTXMultiRegion: Create Key", newKey3);
    }
    vm2.invoke("testTXMultiRegion: Create Region", create2);
    vm2.invoke("testTXMultiRegion: Create Key", newKey2);
    vm3.invoke("testTXMultiRegion: Create Region", create1);
    vm3.invoke(create3);
    if (createKeyInEveryVm) {
      vm3.invoke("testTXMultiRegion: Create Key", newKey1);
      vm3.invoke("testTXMultiRegion: Create Key", newKey3);
    }
    txMultiRegionCommitAndVerify(txMgr, dmStats, rgnName1, rgnName2, rgnName3,
        "testTXMultiRegion: vm0->R1,R3 vm1->R1,R3 vm2->R2 vm3->R1,R3", 2, 200, () -> {
          vm0.invoke("testTXMultiRegion: check", check1_3);
          vm1.invoke("testTXMultiRegion: check", check1_3);
          vm2.invoke("testTXMultiRegion: check", check2);
          vm3.invoke("testTXMultiRegion: check", check1_3);
        });

    // vm0->R1 vm1->R1 vm2->R2 vm3->R3
    vm0.invoke("testTXMultiRegion: Create Region", create1);
    vm0.invoke("testTXMultiRegion: Create Key", newKey1);
    vm1.invoke("testTXMultiRegion: Create Region", create1);
    if (createKeyInEveryVm) {
      vm1.invoke("testTXMultiRegion: Create Key", newKey1);
    }
    vm2.invoke("testTXMultiRegion: Create Region", create2);
    vm2.invoke("testTXMultiRegion: Create Key", newKey2);
    vm3.invoke("testTXMultiRegion: Create Region", create3);
    vm3.invoke("testTXMultiRegion: Create Key", newKey3);
    txMultiRegionCommitAndVerify(txMgr, dmStats, rgnName1, rgnName2, rgnName3,
        "testTXMultiRegion: vm0->R1 vm1->R1 vm2->R2 vm3->R3", 3, 200, () -> {
          vm0.invoke(check1);
          vm1.invoke(check1);
          vm2.invoke("testTXMultiRegion: check", check2);
          vm3.invoke(check3);
        });

    // vm0->R1,R3 vm1->R2,R3 vm2->R2 vm3->R3
    vm0.invoke("testTXMultiRegion: Create Region", create1);
    vm0.invoke("testTXMultiRegion: Create Key", newKey1);
    vm0.invoke("testTXMultiRegion: Create Region", create3);
    vm0.invoke("testTXMultiRegion: Create Key", newKey3);
    vm1.invoke("testTXMultiRegion: Create Region", create2);
    vm1.invoke("testTXMultiRegion: Create Key", newKey2);
    vm1.invoke("testTXMultiRegion: Create Region", create3);
    if (createKeyInEveryVm) {
      vm1.invoke("testTXMultiRegion: Create Key", newKey3);
    }
    vm2.invoke("testTXMultiRegion: Create Region", create2);
    if (createKeyInEveryVm) {
      vm2.invoke("testTXMultiRegion: Create Key", newKey2);
    }
    vm3.invoke("testTXMultiRegion: Create Region", create3);
    if (createKeyInEveryVm) {
      vm3.invoke("testTXMultiRegion: Create Key", newKey3);
    }
    txMultiRegionCommitAndVerify(txMgr, dmStats, rgnName1, rgnName2, rgnName3,
        "testTXMultiRegion: vm0->R1,R3 vm1->R2,R3 vm2->R2 vm3->R3", 4, 200, () -> {
          vm0.invoke("testTXMultiRegion: check", check1_3);
          vm1.invoke("testTXMultiRegion: check", check2_3);
          vm2.invoke("testTXMultiRegion: check", check2);
          vm3.invoke(check3);
        });

    // vm0->R1,R3 vm1->R1,R3 vm2->R1,R3 vm3->R1,R3
    vm0.invoke("testTXMultiRegion: Create Region", create1);
    vm0.invoke("testTXMultiRegion: Create Key", newKey1);
    vm0.invoke("testTXMultiRegion: Create Region", create3);
    vm0.invoke("testTXMultiRegion: Create Key", newKey3);
    vm1.invoke("testTXMultiRegion: Create Region", create1);
    if (createKeyInEveryVm) {
      vm1.invoke("testTXMultiRegion: Create Key", newKey1);
    }
    vm1.invoke("testTXMultiRegion: Create Region", create3);
    if (createKeyInEveryVm) {
      vm1.invoke("testTXMultiRegion: Create Key", newKey3);
    }
    vm2.invoke("testTXMultiRegion: Create Region", create1);
    vm2.invoke("testTXMultiRegion: Create Region", create3);
    if (createKeyInEveryVm) {
      vm2.invoke("testTXMultiRegion: Create Key", newKey1);
      vm2.invoke("testTXMultiRegion: Create Key", newKey3);
    }
    vm3.invoke("testTXMultiRegion: Create Region", create1);
    vm3.invoke("testTXMultiRegion: Create Region", create3);
    if (createKeyInEveryVm) {
      vm3.invoke("testTXMultiRegion: Create Key", newKey1);
      vm3.invoke("testTXMultiRegion: Create Key", newKey3);
    }
    txMultiRegionCommitAndVerify(txMgr, dmStats, rgnName1, rgnName2, rgnName3,
        "testTXMultiRegion: vm0->R1,R3 vm1->R1,R3 vm2->R1,R3 vm3->R1,R3", 1, 200, () -> {
          vm0.invoke("testTXMultiRegion: check", check1_3);
          vm1.invoke("testTXMultiRegion: check", check1_3);
          vm2.invoke("testTXMultiRegion: check", check1_3);
          vm3.invoke("testTXMultiRegion: check", check1_3);
        });
  } catch (Exception e) {
    // Close the cache so later tests do not inherit half-built regions, then rethrow. Only
    // Exception subclasses land here; an AssertionError (an Error) propagates without closing.
    getCache().close();
    getSystem().getLogWriter().fine("testTXMultiRegion: Caused exception in createRegion");
    throw e;
  }
}

/**
 * Builds a remote step that installs a new {@code MyTransactionListener} on the VM's transaction
 * manager and then creates the named region, logging {@code logMessage} on success.
 *
 * @param regionName name of the region to create in the remote VM
 * @param logMessage message written to the remote log after the region has been created
 * @return the runnable to pass to {@code VM.invoke}
 */
private SerializableRunnable txMultiRegionCreateRegion(final String regionName,
    final String logMessage) {
  return new SerializableRunnable() {
    @Override
    public void run() {
      CacheTransactionManager remoteTxMgr = getCache().getCacheTransactionManager();
      MyTransactionListener listener = new MyTransactionListener();
      setTxListener(remoteTxMgr, listener);
      try {
        createRegion(regionName);
        getSystem().getLogWriter().info(logMessage);
      } catch (CacheException e) {
        fail("While creating region", e);
      }
    }
  };
}

/**
 * Builds a remote step that creates the entry {@code "key"} with a {@code null} value in the
 * named subregion of {@code "root"}, logging {@code logMessage} on success.
 *
 * @param regionName name of the subregion that receives the entry
 * @param logMessage message written to the remote log after the entry has been created
 * @return the runnable to pass to {@code VM.invoke}
 */
private SerializableRunnable txMultiRegionCreateKey(final String regionName,
    final String logMessage) {
  return new SerializableRunnable() {
    @Override
    public void run() {
      try {
        Region<String, ?> rgn = getRootRegion("root").getSubregion(regionName);
        rgn.create("key", null);
        getSystem().getLogWriter().info(logMessage);
      } catch (CacheException e) {
        // The failing operation here is the entry creation, not the region creation.
        fail("While creating key", e);
      }
    }
  };
}

/**
 * Asserts that the first transaction listener of this VM saw exactly one successful commit (no
 * failed commits, rollbacks or closes) and that the last event's cache is the region's service.
 *
 * @param region a region hosted by this VM whose region service is compared with the event
 * @return the verified listener, for further inspection of its last event
 */
private MyTransactionListener txMultiRegionVerifySingleCommit(Region<?, ?> region) {
  CacheTransactionManager remoteTxMgr = getCache().getCacheTransactionManager();
  MyTransactionListener listener = firstTransactionListenerFrom(remoteTxMgr);
  listener.checkAfterCommitCount(1);
  assertThat(listener.afterFailedCommitCount).isEqualTo(0);
  assertThat(listener.afterRollbackCount).isEqualTo(0);
  assertThat(listener.closeCount).isEqualTo(0);
  assertThat(listener.lastEvent.getCache()).isEqualTo(region.getRegionService());
  return listener;
}

/**
 * Extracts the entry events of the listener's last transaction: the put events when the region
 * is not replicated or has concurrency checks enabled, otherwise the create events.
 *
 * @param listener listener whose {@code lastEvent} is inspected
 * @return the put or create events of the last committed transaction
 */
private Collection<EntryEvent<?, ?>> txMultiRegionCommittedEvents(
    MyTransactionListener listener) {
  RegionAttributes<String, String> attr = getRegionAttributes();
  if (!attr.getDataPolicy().withReplication() || attr.getConcurrencyChecksEnabled()) {
    return TxEventTestUtil.getPutEvents(listener.lastEvent.getEvents());
  }
  return TxEventTestUtil.getCreateEvents(listener.lastEvent.getEvents());
}

/**
 * Builds a remote check for a VM that hosts two of the regions touched by the transaction. The
 * events are sorted by region full path plus key before comparison, so
 * {@code firstRegionName} must be the region that sorts first.
 *
 * @param firstRegionName name of the region expected first in sort order
 * @param firstValue value expected for {@code "key"} in the first region
 * @param secondRegionName name of the region expected second in sort order
 * @param secondValue value expected for {@code "key"} in the second region
 * @return the runnable to pass to {@code VM.invoke}
 */
private SerializableRunnable txMultiRegionCheckPair(final String firstRegionName,
    final String firstValue, final String secondRegionName, final String secondValue) {
  return new SerializableRunnable() {
    @Override
    public void run() {
      Region<String, String> first = getRootRegion().getSubregion(firstRegionName);
      assertThat(first.getEntry("key")).describedAs("Could not find entry for 'key'")
          .isNotNull();
      assertThat(first.getEntry("key").getValue()).isEqualTo(firstValue);

      Region<String, String> second = getRootRegion().getSubregion(secondRegionName);
      assertThat(second.getEntry("key")).describedAs("Could not find entry for 'key'")
          .isNotNull();
      assertThat(second.getEntry("key").getValue()).isEqualTo(secondValue);

      MyTransactionListener listener = txMultiRegionVerifySingleCommit(first);
      Collection<EntryEvent<?, ?>> events = txMultiRegionCommittedEvents(listener);
      assertThat(events.size()).isEqualTo(2);

      // Sort so the comparison below does not depend on the order the events were delivered in.
      List<EntryEvent<?, ?>> eventList = new ArrayList<>(events);
      eventList.sort((o1, o2) -> {
        String s1 = o1.getRegion().getFullPath() + o1.getKey();
        String s2 = o2.getRegion().getFullPath() + o2.getKey();
        return s1.compareTo(s2);
      });
      verifyMirrorRegionEventsMatch(eventList.get(0), first, firstValue);
      verifyMirrorRegionEventsMatch(eventList.get(1), second, secondValue);
    }
  };
}

/**
 * Builds a remote check for a VM that hosts exactly one of the regions touched by the
 * transaction: the entry holds {@code expectedValue} and the listener saw a single remote,
 * distributed, non-load, non-expiration event for {@code "key"} with no old value.
 *
 * @param regionName name of the region hosted by the remote VM
 * @param expectedValue value expected for {@code "key"} after the commit
 * @return the runnable to pass to {@code VM.invoke}
 */
private SerializableRunnable txMultiRegionCheckSingle(final String regionName,
    final String expectedValue) {
  return new SerializableRunnable() {
    @Override
    public void run() {
      Region<String, String> rgn = getRootRegion().getSubregion(regionName);
      assertThat(rgn.getEntry("key")).describedAs("Could not find entry for 'key'").isNotNull();
      assertThat(rgn.getEntry("key").getValue()).isEqualTo(expectedValue);

      MyTransactionListener listener = txMultiRegionVerifySingleCommit(rgn);
      Collection<EntryEvent<?, ?>> events = txMultiRegionCommittedEvents(listener);
      assertThat(events.size()).isEqualTo(1);

      EntryEvent<?, ?> ev = events.iterator().next();
      assertThat(rgn).isSameAs(ev.getRegion());
      assertThat(ev.getKey()).isEqualTo("key");
      assertThat(ev.getNewValue()).isEqualTo(expectedValue);
      assertThat(ev.getOldValue()).isNull();
      assertThat(ev.getOperation().isLocalLoad()).isFalse();
      assertThat(ev.getOperation().isNetLoad()).isFalse();
      assertThat(ev.getOperation().isLoad()).isFalse();
      assertThat(ev.getOperation().isNetSearch()).isFalse();
      assertThat(ev.getOperation().isExpiration()).isFalse();
      assertThat(ev.getCallbackArgument()).isNull();
      assertThat(ev.isCallbackArgumentAvailable()).isTrue();
      assertThat(ev.isOriginRemote()).isTrue();
      assertThat(ev.getOperation().isDistributed()).isTrue();
    }
  };
}

/**
 * Creates the three regions in this VM, commits one transaction that puts {@code "key"} into
 * each of them, verifies the commit statistics, runs the remote checks and finally destroys the
 * three regions again.
 *
 * @param txMgr transaction manager of this VM's cache
 * @param dmStats distribution statistics of this VM, sampled before and after the commit
 * @param rgnName1 name of the region that receives {@code "value1"}
 * @param rgnName2 name of the region that receives {@code "value2"}
 * @param rgnName3 name of the region that receives {@code "value3"}
 * @param topologyLogMessage message logged just before the commit
 * @param expectedCommitMessages expected increase of the sent-commit-messages statistic
 * @param noAckPauseMillis how long to sleep after the commit when none of the regions uses an
 *        ack scope, to give the commit message a chance to be processed remotely
 * @param remoteChecks verification to run in the remote VMs before the regions are destroyed
 */
private void txMultiRegionCommitAndVerify(CacheTransactionManager txMgr, DMStats dmStats,
    String rgnName1, String rgnName2, String rgnName3, String topologyLogMessage,
    int expectedCommitMessages, long noAckPauseMillis, Runnable remoteChecks) {
  Region<String, String> rgn1 = createRegion(rgnName1);
  Region<String, String> rgn2 = createRegion(rgnName2);
  Region<String, String> rgn3 = createRegion(rgnName3);

  // Sample the statistics only after the regions exist so that just the commit is measured.
  long cmtMsgs = dmStats.getSentCommitMessages();
  long commitWaits = dmStats.getCommitWaits();

  txMgr.begin();
  rgn1.put("key", "value1");
  rgn2.put("key", "value2");
  rgn3.put("key", "value3");
  getSystem().getLogWriter().info(topologyLogMessage);
  txMgr.commit();

  assertThat(dmStats.getSentCommitMessages()).isEqualTo(cmtMsgs + expectedCommitMessages);
  if (rgn1.getAttributes().getScope().isAck() || rgn2.getAttributes().getScope().isAck()
      || rgn3.getAttributes().getScope().isAck()) {
    assertThat(dmStats.getCommitWaits()).isEqualTo(commitWaits + 1);
  } else {
    assertThat(dmStats.getCommitWaits()).isEqualTo(commitWaits);
    // pause to give cmt message a chance to be processed
    try {
      Thread.sleep(noAckPauseMillis);
    } catch (InterruptedException e) {
      // Restore the interrupt status so the test framework can still observe it.
      Thread.currentThread().interrupt();
      fail("interrupted", e);
    }
  }

  remoteChecks.run();

  rgn1.destroyRegion();
  rgn2.destroyRegion();
  rgn3.destroyRegion();
}