in geode-dunit/src/main/java/org/apache/geode/cache30/MultiVMRegionTestCase.java [1607:1925]
public void testRemoteCacheWriter() {
  // Scenario: vm1 installs a CacheWriter, vm0 performs every operation. Each phase
  // (create, update, destroy, region destroy) is run twice from vm0:
  // 1. with the "EXCEPTION" callback argument, for which the writer in vm1 throws a
  // CacheWriterException; vm0 must see that exception and the region must be unchanged;
  // 2. with the "ARG" callback argument, for which the writer asserts the event details
  // and lets the operation proceed.
  // After every operation vm1 confirms that its writer was actually invoked.
  assertThat(getRegionAttributes().getScope().isDistributed()).isTrue();

  final String name = getUniqueName();
  final Object key = "KEY";
  final Object oldValue = "OLD_VALUE";
  final Object newValue = "NEW_VALUE";
  final Object arg = "ARG";
  final Object exception = "EXCEPTION";
  final Object key2 = "KEY2";
  final Object value2 = "VALUE2";

  SerializableRunnable create = new CacheSerializableRunnable() {
    @Override
    public void run2() throws CacheException {
      Region<Object, Object> region = createRegion(name);
      // Seed the region with key2 before any writer is registered. It is never touched
      // again by the entry operations below, so it is the baseline entry behind the
      // size/object-count expectations of 1.
      region.put(key2, value2);
      assertThat(region.size()).isEqualTo(1);
      assertOffHeapRefCount(region, key2, 1);
      assertOffHeapObjectCount(region, 1);
    }
  };

  vm0.invoke("Create Region", create);
  vm1.invoke("Create Region", create);

  //////// Create

  vm1.invoke("Set Writer", () -> {
    final Region<Object, Object> region = getRootRegion().getSubregion(name);
    setWriter(new TestCacheWriter<Object, Object>() {
      @Override
      public void beforeCreate2(EntryEvent<Object, Object> event) throws CacheWriterException {
        // Veto the operation when the caller asks for it via the callback argument.
        if (exception.equals(event.getCallbackArgument())) {
          String s = "Test Exception";
          throw new CacheWriterException(s);
        }

        assertThat(event.getRegion()).isEqualTo(region);
        assertThat(event.getOperation().isCreate()).isTrue();
        assertThat(event.getOperation().isDistributed()).isTrue();
        assertThat(event.getOperation().isExpiration()).isFalse();
        assertThat(event.isOriginRemote()).isTrue();
        assertThat(event.getKey()).isEqualTo(key);
        assertThat(event.getOldValue()).isNull();
        assertThat(event.getNewValue()).isEqualTo(oldValue);
        assertThat(event.getOperation().isLoad()).isFalse();
        assertThat(event.getOperation().isLocalLoad()).isFalse();
        assertThat(event.getOperation().isNetLoad()).isFalse();
        assertThat(event.getOperation().isNetSearch()).isFalse();
      }
    });
    region.getAttributesMutator().setCacheWriter(writer());
    flushIfNecessary(region);
  });

  vm0.invoke("Create with Exception", () -> {
    Region<Object, Object> region = getRootRegion().getSubregion(name);
    try {
      region.put(key, oldValue, exception);
      fail("Should have thrown a CacheWriterException");
    } catch (CacheWriterException ex) {
      // The vetoed create must leave only the seeded key2 entry behind.
      assertThat(region.getEntry(key)).isNull();
      assertThat(region.size()).isEqualTo(1);
      assertOffHeapObjectCount(region, 1);
    }
  });
  assertWriterInvokedInVm1();

  vm0.invoke("Create with Argument", () -> {
    Region<Object, Object> region = getRootRegion().getSubregion(name);
    region.put(key, oldValue, arg);
    assertThat(region.size()).isEqualTo(2);
    assertOffHeapObjectCount(region, 2);
    assertOffHeapRefCount(region, key, 1);
  });
  assertWriterInvokedInVm1();

  //////// Update

  vm1.invoke("Set Writer", () -> {
    final Region<Object, Object> region = getRootRegion().getSubregion(name);
    setWriter(new TestCacheWriter<Object, Object>() {
      @Override
      public void beforeUpdate2(EntryEvent<Object, Object> event) throws CacheWriterException {
        Object argument = event.getCallbackArgument();
        if (exception.equals(argument)) {
          String s = "Test Exception";
          throw new CacheWriterException(s);
        }

        assertThat(argument).isEqualTo(arg);
        assertThat(event.getRegion()).isEqualTo(region);
        assertThat(event.getOperation().isUpdate()).isTrue();
        assertThat(event.getOperation().isDistributed()).isTrue();
        assertThat(event.getOperation().isExpiration()).isFalse();
        assertThat(event.isOriginRemote()).isTrue();
        assertThat(event.getKey()).isEqualTo(key);
        assertThat(event.getOldValue()).isEqualTo(oldValue);
        assertThat(event.getNewValue()).isEqualTo(newValue);
        assertThat(event.getOperation().isLoad()).isFalse();
        assertThat(event.getOperation().isLocalLoad()).isFalse();
        assertThat(event.getOperation().isNetLoad()).isFalse();
        assertThat(event.getOperation().isNetSearch()).isFalse();
      }
    });
    region.getAttributesMutator().setCacheWriter(writer());
  });

  vm0.invoke("Update with Exception", () -> {
    Region<Object, Object> region = getRootRegion().getSubregion(name);
    try {
      region.put(key, newValue, exception);
      fail("Should have thrown a CacheWriterException");
    } catch (CacheWriterException ex) {
      // The vetoed update must leave the previously created value in place.
      Region.Entry<Object, Object> entry = region.getEntry(key);
      assertThat(entry.getValue()).isEqualTo(oldValue);
      assertThat(region.size()).isEqualTo(2);
      assertOffHeapObjectCount(region, 2);
      assertOffHeapRefCount(region, key, 1);
    }
  });
  assertWriterInvokedInVm1();

  vm0.invoke("Update with Argument", () -> {
    Region<Object, Object> region = getRootRegion().getSubregion(name);
    region.put(key, newValue, arg);
    assertThat(region.size()).isEqualTo(2);
    assertOffHeapObjectCount(region, 2);
  });
  assertWriterInvokedInVm1();

  //////// Destroy

  vm1.invoke("Set Writer", () -> {
    final Region<Object, Object> region = getRootRegion().getSubregion(name);
    setWriter(new TestCacheWriter<Object, Object>() {
      @Override
      public void beforeDestroy2(EntryEvent<Object, Object> event) throws CacheWriterException {
        Object argument = event.getCallbackArgument();
        if (exception.equals(argument)) {
          String s = "Test Exception";
          throw new CacheWriterException(s);
        }

        assertThat(argument).isEqualTo(arg);
        assertThat(event.getRegion()).isEqualTo(region);
        assertThat(event.getOperation().isDestroy()).isTrue();
        assertThat(event.getOperation().isDistributed()).isTrue();
        assertThat(event.getOperation().isExpiration()).isFalse();
        assertThat(event.isOriginRemote()).isTrue();
        assertThat(event.getKey()).isEqualTo(key);
        assertThat(event.getOldValue()).isEqualTo(newValue);
        assertThat(event.getNewValue()).isNull();
        assertThat(event.getOperation().isLoad()).isFalse();
        assertThat(event.getOperation().isLocalLoad()).isFalse();
        assertThat(event.getOperation().isNetLoad()).isFalse();
        assertThat(event.getOperation().isNetSearch()).isFalse();
      }
    });
    region.getAttributesMutator().setCacheWriter(writer());
  });

  vm0.invoke("Destroy with Exception", () -> {
    Region<Object, Object> region = getRootRegion().getSubregion(name);
    try {
      region.destroy(key, exception);
      fail("Should have thrown a CacheWriterException");
    } catch (CacheWriterException ex) {
      // The vetoed destroy must leave the entry in the region.
      assertThat(region.getEntry(key)).isNotNull();
      assertThat(region.size()).isEqualTo(2);
      assertOffHeapObjectCount(region, 2);
    }
  });
  assertWriterInvokedInVm1();

  vm0.invoke("Destroy with Argument", () -> {
    Region<Object, Object> region = getRootRegion().getSubregion(name);
    region.destroy(key, arg);
    assertThat(region.size()).isEqualTo(1);
    assertOffHeapObjectCount(region, 1);
  });
  assertWriterInvokedInVm1();

  //////// Region Destroy

  vm1.invoke("Set Writer", () -> {
    final Region<Object, Object> region = getRootRegion().getSubregion(name);
    setWriter(new TestCacheWriter<Object, Object>() {
      @Override
      public void beforeRegionDestroy2(RegionEvent<Object, Object> event)
          throws CacheWriterException {
        Object argument = event.getCallbackArgument();
        if (exception.equals(argument)) {
          String s = "Test Exception";
          throw new CacheWriterException(s);
        }

        assertThat(argument).isEqualTo(arg);
        assertThat(event.getRegion()).isEqualTo(region);
        assertThat(event.getOperation().isRegionDestroy()).isTrue();
        assertThat(event.getOperation().isDistributed()).isTrue();
        assertThat(event.getOperation().isExpiration()).isFalse();
        assertThat(event.isOriginRemote()).isTrue();
      }
    });
    region.getAttributesMutator().setCacheWriter(writer());
  });

  vm0.invoke("Destroy with Exception", () -> {
    Region<Object, Object> region = getRootRegion().getSubregion(name);
    try {
      region.destroyRegion(exception);
      fail("Should have thrown a CacheWriterException");
    } catch (CacheWriterException ex) {
      // A veto that nevertheless destroyed the region is a failure in its own right.
      if (region.isDestroyed()) {
        fail("should not have an exception if region is destroyed", ex);
      }
      assertThat(region.size()).isEqualTo(1);
      assertOffHeapObjectCount(region, 1);
    }
  });
  assertWriterInvokedInVm1();

  vm0.invoke("Destroy with Argument", () -> {
    Region<Object, Object> region = getRootRegion().getSubregion(name);
    assertThat(region.size()).isEqualTo(1);
    assertOffHeapObjectCount(region, 1);
    region.destroyRegion(arg);
    if (isNonPartitionedOffHeapRegion(region)) {
      final MemoryAllocatorImpl ma = offHeapAllocator();
      // The count is polled rather than asserted once after the region destroy.
      await("waiting for off-heap object count go to zero")
          .until(() -> ma.getStats().getObjects(), equalTo(0));
    }
  });
  assertWriterInvokedInVm1();
}

/**
 * Asserts, in vm1, that the currently installed test writer reports having been invoked.
 */
private void assertWriterInvokedInVm1() {
  // Block lambda on purpose: an expression lambda would return the AssertJ assert object
  // and could match a value-returning invoke overload as well.
  vm1.invoke("Verify callback", () -> {
    assertThat(writer().wasInvoked()).isTrue();
  });
}

/**
 * Tells whether the off-heap checks of this test apply to {@code region}: its attributes
 * report off-heap storage and it is not a {@link PartitionedRegion}.
 *
 * @param region the region to inspect
 * @return {@code true} if the off-heap assertions should run for {@code region}
 */
private static boolean isNonPartitionedOffHeapRegion(Region<?, ?> region) {
  return region.getAttributes().getOffHeap() && !(region instanceof PartitionedRegion);
}

/**
 * Returns the off-heap store of the cache in the current VM, cast to its implementation type
 * so that its statistics can be read.
 */
private MemoryAllocatorImpl offHeapAllocator() {
  GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
  return (MemoryAllocatorImpl) gfc.getOffHeapStore();
}

/**
 * Asserts the number of objects reported by the off-heap allocator statistics. Does nothing
 * unless {@link #isNonPartitionedOffHeapRegion(Region)} holds for {@code region}.
 *
 * @param region the region under test
 * @param expectedObjects the object count the allocator statistics must report
 */
private void assertOffHeapObjectCount(Region<?, ?> region, int expectedObjects) {
  if (isNonPartitionedOffHeapRegion(region)) {
    assertThat(offHeapAllocator().getStats().getObjects()).isEqualTo(expectedObjects);
  }
}

/**
 * Asserts the reference count of the stored value held by the region entry for {@code key}.
 * Does nothing unless {@link #isNonPartitionedOffHeapRegion(Region)} holds for
 * {@code region}.
 *
 * @param region the region under test; cast to {@link LocalRegion} to reach the raw entry
 * @param key the key whose region entry is inspected
 * @param expectedRefCount the reference count the stored value must report
 */
private static void assertOffHeapRefCount(Region<?, ?> region, Object key,
    int expectedRefCount) {
  if (isNonPartitionedOffHeapRegion(region)) {
    RegionEntry regionEntry = ((LocalRegion) region).getRegionEntry(key);
    StoredObject stored = (StoredObject) regionEntry.getValue();
    assertThat(stored.getRefCount()).isEqualTo(expectedRefCount);
  }
}