in geode-dunit/src/main/java/org/apache/geode/cache30/MultiVMRegionTestCase.java [4355:4665]
public void testTXSimpleOps() {
  // Commits four single-key transactions from this VM (create via put, update via put,
  // invalidate, destroy). After each commit the callbacks, the region contents, the
  // TransactionListener's last event and the CacheListener counters are checked through
  // invokeInEveryVM.
  assumeThat(supportsTransactions()).isTrue();
  assertThat(getRegionAttributes().getScope().isDistributed()).isTrue();
  CacheTransactionManager txMgr = getCache().getCacheTransactionManager();

  if (getRegionAttributes().getScope().isGlobal()
      || getRegionAttributes().getDataPolicy().withPersistence()) {
    // just make sure transactions are not allowed on global or persistent regions
    Region<String, String> rgn = createRegion(getUniqueName());
    try {
      txMgr.begin();
      try {
        assertThatThrownBy(() -> rgn.put("testTXSimpleOpsKey1", "val"))
            .isInstanceOf(UnsupportedOperationException.class);
      } finally {
        // End the transaction even when the assertion fails, so it is not left open on this
        // thread.
        txMgr.rollback();
      }
    } finally {
      // Likewise, never leave the throw-away region behind.
      rgn.localDestroyRegion();
    }
    return;
  }

  final String rgnName = getUniqueName();

  // Installs a fresh MyTransactionListener, checks its counters start at zero, then creates the
  // region and attaches a CountingDistCacheListener to it.
  SerializableRunnable create = new SerializableRunnable() {
    @Override
    public void run() {
      CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
      MyTransactionListener tl = new MyTransactionListener();
      setTxListener(txMgr2, tl);
      assertThat(tl.lastEvent).isNull();
      assertThat(tl.afterCommitCount).isEqualTo(0);
      assertThat(tl.afterFailedCommitCount).isEqualTo(0);
      assertThat(tl.afterRollbackCount).isEqualTo(0);
      assertThat(tl.closeCount).isEqualTo(0);
      try {
        Region<Object, Object> rgn = createRegion(rgnName);
        CountingDistCacheListener<Object, Object> cacheListener =
            new CountingDistCacheListener<>();
        rgn.getAttributesMutator().addCacheListener(cacheListener);
        cacheListener.assertCount(0, 0, 0, 0);
        getSystem().getLogWriter().info("testTXSimpleOps: Created region");
      } catch (CacheException e) {
        fail("While creating region", e);
      }
    }
  };

  // Creates "key" with a null value in the already-created region and checks that the counting
  // listener's counters are still all zero afterwards.
  SerializableRunnable newKey =
      new SerializableRunnable() {
        @Override
        public void run() {
          try {
            Region<?, ?> root = getRootRegion();
            Region<Object, Object> rgn = root.getSubregion(rgnName);
            rgn.create("key", null);
            CountingDistCacheListener<?, ?> cacheListener =
                (CountingDistCacheListener<?, ?>) rgn.getAttributes().getCacheListeners()[0];
            cacheListener.assertCount(0, 0, 0, 0);
            getSystem().getLogWriter().info("testTXSimpleOps: Created Key");
          } catch (CacheException e) {
            fail("While creating key", e);
          }
        }
      };

  VM vm = VM.getVM(0);
  vm.invoke("testTXSimpleOps: Create Region", create);
  vm.invoke("testTXSimpleOps: Create Region & Create Key", newKey);
  int vmCount = VM.getVMCount();
  for (int i = 1; i < vmCount; i++) {
    vm = VM.getVM(i);
    vm.invoke("testTXSimpleOps: Create Region", create);
    // The remaining VMs create the key themselves only when the region is neither replicated
    // nor preloaded.
    if (!getRegionAttributes().getDataPolicy().withReplication()
        && !getRegionAttributes().getDataPolicy().withPreloaded()) {
      vm.invoke("testTXSimpleOps: Create Region & Create Key", newKey);
    }
  }

  try {
    Region<String, String> rgn = createRegion(rgnName);
    DMStats dmStats = getSystem().getDistributionManager().getStats();
    // Baselines so the first commit can be checked against the stat deltas.
    long cmtMsgs = dmStats.getSentCommitMessages();
    long commitWaits = dmStats.getCommitWaits();

    // Transaction 1: put "value" for "key".
    {
      txMgr.begin();
      rgn.put("key", "value");
      final TransactionId txId = txMgr.getTransactionId();
      txMgr.commit();
      assertThat(dmStats.getSentCommitMessages()).isEqualTo(cmtMsgs + 1);
      // A commit wait is only expected when the region's scope is ack.
      if (rgn.getAttributes().getScope().isAck()) {
        assertThat(dmStats.getCommitWaits()).isEqualTo(commitWaits + 1);
      } else {
        assertThat(dmStats.getCommitWaits()).isEqualTo(commitWaits);
      }
      getSystem().getLogWriter().info("testTXSimpleOps: Create/Put Value");
      invokeInEveryVM(new SerializableRunnable() {
        @Override
        public void run() {
          assertCacheCallbackEvents(rgnName, txId, null, "value");
        }
      });
      invokeInEveryVM("testTXSimpleOps: Verify Received Value", repeatingIfNecessary(
          new CacheSerializableRunnable() {
            @Override
            public void run2() {
              Region<String, String> rgn1 = getRootRegion().getSubregion(rgnName);
              assertThat(rgn1.getEntry("key")).describedAs("Could not find entry for 'key'")
                  .isNotNull();
              assertThat(rgn1.getEntry("key").getValue()).isEqualTo("value");
              CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
              MyTransactionListener tl =
                  firstTransactionListenerFrom(txMgr2);
              tl.checkAfterCommitCount(1);
              assertThat(tl.afterFailedCommitCount).isEqualTo(0);
              assertThat(tl.afterRollbackCount).isEqualTo(0);
              assertThat(tl.closeCount).isEqualTo(0);
              assertThat(tl.lastEvent.getCache()).isEqualTo(rgn1.getRegionService());
              {
                Collection<EntryEvent<?, ?>> events;
                RegionAttributes<String, String> attr = getRegionAttributes();
                // The event is looked up among the put events unless the region is replicated
                // with concurrency checks disabled, in which case it is looked up among the
                // create events.
                if (!attr.getDataPolicy().withReplication()
                    || attr.getConcurrencyChecksEnabled()) {
                  events = TxEventTestUtil.getPutEvents(tl.lastEvent.getEvents());
                } else {
                  events = TxEventTestUtil.getCreateEvents(tl.lastEvent.getEvents());
                }
                assertThat(events.size()).isEqualTo(1);
                EntryEvent<?, ?> ev = events.iterator().next();
                assertThat(ev.getTransactionId()).isEqualTo(tl.expectedTxId);
                assertThat(rgn1).isSameAs(ev.getRegion());
                assertThat(ev.getKey()).isEqualTo("key");
                assertThat(ev.getNewValue()).isEqualTo("value");
                assertThat(ev.getOldValue()).isNull();
                assertThat(ev.getOperation().isLocalLoad()).isFalse();
                assertThat(ev.getOperation().isNetLoad()).isFalse();
                assertThat(ev.getOperation().isLoad()).isFalse();
                assertThat(ev.getOperation().isNetSearch()).isFalse();
                assertThat(ev.getOperation().isExpiration()).isFalse();
                assertThat(ev.getCallbackArgument()).isNull();
                assertThat(ev.isCallbackArgumentAvailable()).isTrue();
                assertThat(ev.isOriginRemote()).isTrue();
                assertThat(ev.getOperation().isDistributed()).isTrue();
              }
              // NOTE(review): the four counters presumably are (create, update, invalidate,
              // destroy) - confirm against CountingDistCacheListener.assertCount.
              CountingDistCacheListener<?, ?> cdcL =
                  (CountingDistCacheListener<?, ?>) rgn1.getAttributes()
                      .getCacheListeners()[0];
              cdcL.assertCount(0, 1, 0, 0);
            }
          }));
    }

    // Transaction 2: overwrite "value" with "value2".
    {
      txMgr.begin();
      rgn.put("key", "value2");
      final TransactionId txId = txMgr.getTransactionId();
      txMgr.commit();
      getSystem().getLogWriter().info("testTXSimpleOps: Put(update) Value2");
      invokeInEveryVM(new SerializableRunnable() {
        @Override
        public void run() {
          assertCacheCallbackEvents(rgnName, txId, "value", "value2");
        }
      });
      invokeInEveryVM("testTXSimpleOps: Verify Received Value", repeatingIfNecessary(
          new CacheSerializableRunnable() {
            @Override
            public void run2() {
              Region<String, String> rgn1 = getRootRegion().getSubregion(rgnName);
              assertThat(rgn1.getEntry("key")).describedAs("Could not find entry for 'key'")
                  .isNotNull();
              assertThat(rgn1.getEntry("key").getValue()).isEqualTo("value2");
              CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
              MyTransactionListener tl = firstTransactionListenerFrom(txMgr2);
              tl.checkAfterCommitCount(2);
              assertThat(tl.lastEvent.getCache()).isEqualTo(rgn1.getRegionService());
              {
                Collection<EntryEvent<?, ?>> events =
                    TxEventTestUtil.getPutEvents(tl.lastEvent.getEvents());
                assertThat(events.size()).isEqualTo(1);
                EntryEvent<?, ?> ev = events.iterator().next();
                assertThat(ev.getTransactionId()).isEqualTo(tl.expectedTxId);
                assertThat(rgn1).isSameAs(ev.getRegion());
                assertThat(ev.getKey()).isEqualTo("key");
                assertThat(ev.getNewValue()).isEqualTo("value2");
                assertThat(ev.getOldValue()).isEqualTo("value");
                assertThat(ev.getOperation().isLocalLoad()).isFalse();
                assertThat(ev.getOperation().isNetLoad()).isFalse();
                assertThat(ev.getOperation().isLoad()).isFalse();
                assertThat(ev.getOperation().isNetSearch()).isFalse();
                assertThat(ev.getOperation().isExpiration()).isFalse();
                assertThat(ev.getCallbackArgument()).isNull();
                assertThat(ev.isCallbackArgumentAvailable()).isTrue();
                assertThat(ev.isOriginRemote()).isTrue();
                assertThat(ev.getOperation().isDistributed()).isTrue();
              }
              CountingDistCacheListener<?, ?> cdcL =
                  (CountingDistCacheListener<?, ?>) rgn1.getAttributes().getCacheListeners()[0];
              cdcL.assertCount(0, 2, 0, 0);
            }
          }));
    }

    // Transaction 3: invalidate "key" (the entry stays, its value goes away).
    {
      txMgr.begin();
      rgn.invalidate("key");
      final TransactionId txId = txMgr.getTransactionId();
      txMgr.commit();
      getSystem().getLogWriter().info("testTXSimpleOps: invalidate key");
      // validate each of the CacheListeners EntryEvents
      invokeInEveryVM(new SerializableRunnable() {
        @Override
        public void run() {
          assertCacheCallbackEvents(rgnName, txId, "value2", null);
        }
      });
      invokeInEveryVM("testTXSimpleOps: Verify Received Value", repeatingIfNecessary(
          new CacheSerializableRunnable() {
            @Override
            public void run2() {
              Region<String, String> rgn1 = getRootRegion().getSubregion(rgnName);
              assertThat(rgn1.getEntry("key")).describedAs("entry for 'key'")
                  .isNotNull();
              assertThat(rgn1.containsKey("key")).isTrue();
              assertThat(rgn1.containsValueForKey("key")).isFalse();
              CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
              MyTransactionListener tl =
                  firstTransactionListenerFrom(txMgr2);
              tl.checkAfterCommitCount(3);
              assertThat(tl.lastEvent.getCache()).isEqualTo(rgn1.getRegionService());
              {
                List<EntryEvent<?, ?>> events =
                    TxEventTestUtil.getInvalidateEvents(tl.lastEvent.getEvents());
                assertThat(events.size()).isEqualTo(1);
                EntryEvent<?, ?> ev = events.get(0);
                assertThat(ev.getTransactionId()).isEqualTo(tl.expectedTxId);
                assertThat(rgn1).isSameAs(ev.getRegion());
                assertThat(ev.getKey()).isEqualTo("key");
                assertThat(ev.getNewValue()).isNull();
                assertThat(ev.getOldValue()).isEqualTo("value2");
                assertThat(ev.getOperation().isLocalLoad()).isFalse();
                assertThat(ev.getOperation().isNetLoad()).isFalse();
                assertThat(ev.getOperation().isLoad()).isFalse();
                assertThat(ev.getOperation().isNetSearch()).isFalse();
                assertThat(ev.getOperation().isExpiration()).isFalse();
                assertThat(ev.getCallbackArgument()).isNull();
                assertThat(ev.isCallbackArgumentAvailable()).isTrue();
                assertThat(ev.isOriginRemote()).isTrue();
                assertThat(ev.getOperation().isDistributed()).isTrue();
              }
              CountingDistCacheListener<?, ?> cdcL =
                  (CountingDistCacheListener<?, ?>) rgn1.getAttributes().getCacheListeners()[0];
              cdcL.assertCount(0, 2, 1, 0);
            }
          }));
    }

    // Transaction 4: destroy "key".
    {
      txMgr.begin();
      rgn.destroy("key");
      final TransactionId txId = txMgr.getTransactionId();
      txMgr.commit();
      getSystem().getLogWriter().info("testTXSimpleOps: destroy key");
      // validate each of the CacheListeners EntryEvents
      invokeInEveryVM(new SerializableRunnable() {
        @Override
        public void run() {
          assertCacheCallbackEvents(rgnName, txId, null, null);
        }
      });
      invokeInEveryVM("testTXSimpleOps: Verify Received Value", repeatingIfNecessary(
          new CacheSerializableRunnable() {
            @Override
            public void run2() {
              Region<String, String> rgn1 = getRootRegion().getSubregion(rgnName);
              assertThat(rgn1.containsKey("key")).isFalse();
              CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
              MyTransactionListener tl = firstTransactionListenerFrom(txMgr2);
              tl.checkAfterCommitCount(4);
              assertThat(tl.lastEvent.getCache()).isEqualTo(rgn1.getRegionService());
              {
                Collection<EntryEvent<?, ?>> events =
                    TxEventTestUtil.getDestroyEvents(tl.lastEvent.getEvents());
                assertThat(events.size()).isEqualTo(1);
                EntryEvent<?, ?> ev = events.iterator().next();
                assertThat(ev.getTransactionId()).isEqualTo(tl.expectedTxId);
                assertThat(rgn1).isSameAs(ev.getRegion());
                assertThat(ev.getKey()).isEqualTo("key");
                assertThat(ev.getNewValue()).isNull();
                assertThat(ev.getOldValue()).isNull();
                assertThat(ev.getOperation().isLocalLoad()).isFalse();
                assertThat(ev.getOperation().isNetLoad()).isFalse();
                assertThat(ev.getOperation().isLoad()).isFalse();
                assertThat(ev.getOperation().isNetSearch()).isFalse();
                assertThat(ev.getOperation().isExpiration()).isFalse();
                assertThat(ev.getCallbackArgument()).isNull();
                assertThat(ev.isCallbackArgumentAvailable()).isTrue();
                assertThat(ev.isOriginRemote()).isTrue();
                assertThat(ev.getOperation().isDistributed()).isTrue();
              }
              CountingDistCacheListener<?, ?> cdcL =
                  (CountingDistCacheListener<?, ?>) rgn1.getAttributes().getCacheListeners()[0];
              cdcL.assertCount(0, 2, 1, 1);
            }
          }));
    }
  } catch (Exception e) {
    // Close the cache before rethrowing. The log text names createRegion, but this handler
    // covers every exception thrown inside the try block above.
    getCache().close();
    getSystem().getLogWriter().fine("testTXSimpleOps: Caused exception in createRegion");
    throw e;
  }
}