in geode-dunit/src/main/java/org/apache/geode/cache30/MultiVMRegionTestCase.java [3427:3692]
/**
 * Verifies that transactional operations performed in one member (vm0) while another member
 * (vm2) is performing getInitialImage (GII) do not block, and that the image-receiving region
 * ends up with a consistent view of all committed updates, invalidates, destroys, and creates.
 *
 * Outline:
 * <ol>
 *   <li>vm0 creates a non-mirrored region and seeds {@code NB1_NUM_ENTRIES} entries.</li>
 *   <li>vm0 asynchronously performs transactional ops on every odd key (update/invalidate/destroy
 *       by {@code i % 6}) plus 200 new creates, then flushes via a DACK root-region put.</li>
 *   <li>vm2 concurrently creates a mirrored region, with GII artificially slowed so the
 *       transactional ops overlap the image transfer.</li>
 *   <li>After both async tasks join, vm0 locally destroys its region (so verification cannot be
 *       satisfied by netSearch), and vm2 verifies the entry count and that "enough" entries were
 *       updated concurrently with GII.</li>
 * </ol>
 *
 * Skipped unless the region supports replication and transactions; also skipped for global scope
 * (concurrency is unpredictable) and for persistent data policies.
 */
public void testTXNonblockingGetInitialImage() {
  assumeThat(supportsReplication()).isTrue();
  assumeThat(supportsTransactions()).isTrue();
  // don't run this test if global scope since its too difficult to predict
  // how many concurrent operations will occur
  assumeThat(getRegionAttributes().getScope().isGlobal()).isFalse();
  assumeThat(getRegionAttributes().getDataPolicy().withPersistence()).isFalse();

  final String name = getUniqueName();
  // Seed data: NB1_NUM_ENTRIES byte arrays, each NB1_VALUE_SIZE bytes of 0x42.
  final byte[][] values = new byte[NB1_NUM_ENTRIES][];
  for (int i = 0; i < NB1_NUM_ENTRIES; i++) {
    values[i] = new byte[NB1_VALUE_SIZE];
    Arrays.fill(values[i], (byte) 0x42);
  }

  // Region-creation runnable used later by vm2 to trigger the GII under test.
  // Uses cache.xml (beginCacheXml/finishCacheXml) so region creation happens during
  // cache initialization, which is when GII runs.
  SerializableRunnable create = new CacheSerializableRunnable("Create Mirrored Region") {
    @Override
    public void run2() throws CacheException {
      beginCacheXml();
      { // root region must be DACK because its used to sync up async subregions
        AttributesFactory<?, ?> factory = new AttributesFactory<>();
        factory.setScope(Scope.DISTRIBUTED_ACK);
        factory.setDataPolicy(DataPolicy.NORMAL);
        factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL));
        createRootRegion(factory.create());
      }
      {
        // Upgrade NORMAL to PRELOADED so this member actually requests a full image.
        AttributesFactory<?, ?> factory = new AttributesFactory<>(getRegionAttributes());
        if (getRegionAttributes().getDataPolicy() == DataPolicy.NORMAL) {
          factory.setDataPolicy(DataPolicy.PRELOADED);
        }
        factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL));
        createRegion(name, factory.create());
      }
      finishCacheXml(name);
      // reset slow image processing in case a prior invocation left it set
      org.apache.geode.internal.cache.InitialImageOperation.slowImageProcessing = 0;
    }
  };

  // vm0 hosts the data provider: a non-mirrored region (root is EMPTY DACK, used only
  // as a flush/sync mechanism via the "DONE" put below).
  vm0.invoke("Create Nonmirrored Region", new CacheSerializableRunnable() {
    @Override
    public void run2() throws CacheException {
      { // root region must be DACK because its used to sync up async subregions
        AttributesFactory<?, ?> factory = new AttributesFactory<>();
        factory.setScope(Scope.DISTRIBUTED_ACK);
        factory.setDataPolicy(DataPolicy.EMPTY);
        createRootRegion(factory.create());
      }
      {
        AttributesFactory<?, ?> factory = new AttributesFactory<>(getRegionAttributes());
        createRegion(name, factory.create());
      }
      // reset slow image processing in case a prior invocation left it set
      org.apache.geode.internal.cache.InitialImageOperation.slowImageProcessing = 0;
    }
  });

  // Populate the region with the seed values so the GII has something to transfer.
  vm0.invoke("Put initial data", new CacheSerializableRunnable() {
    @Override
    public void run2() throws CacheException {
      Region<Object, Object> region = getRootRegion().getSubregion(name);
      for (int i = 0; i < NB1_NUM_ENTRIES; i++) {
        region.put(i, values[i]);
      }
      assertThat(region.keySet().size()).isEqualTo(NB1_NUM_ENTRIES);
    }
  });

  // start asynchronous process that does updates to the data
  AsyncInvocation<Void> async =
      vm0.invokeAsync("Do Nonblocking Operations", new CacheSerializableRunnable() {
        @Override
        public void run2() throws CacheException {
          Region<Object, Object> region = getRootRegion().getSubregion(name);

          // wait for profile of getInitialImage cache to show up, i.e. until vm2's
          // region-creation (GII) has registered itself in the distribution advisor,
          // so our ops genuinely overlap the image transfer
          final org.apache.geode.internal.cache.CacheDistributionAdvisor adv =
              ((org.apache.geode.internal.cache.DistributedRegion) region)
                  .getCacheDistributionAdvisor();
          final int expectedProfiles = 1;
          await("replicate count never reached " + expectedProfiles)
              .until(() -> {
                DataPolicy currentPolicy = getRegionAttributes().getDataPolicy();
                if (currentPolicy == DataPolicy.PRELOADED) {
                  // PRELOADED members show up as "preloaded" profiles, not replicates
                  return (adv.advisePreloadeds().size()
                      + adv.adviseReplicates().size()) >= expectedProfiles;
                } else {
                  return adv.adviseReplicates().size() >= expectedProfiles;
                }
              });

          // operate on every odd entry with different value, alternating between
          // updates, invalidates, and destroys. These operations are likely
          // to be nonblocking if a sufficient number of updates get through
          // before the get initial image is complete.
          // Each op is wrapped in its own transaction since this is the TX variant
          // of the nonblocking-GII test.
          CacheTransactionManager txMgr = getCache().getCacheTransactionManager();
          for (int i = 1; i < NB1_NUM_ENTRIES; i += 2) {
            Object key = i;
            // odd i mod 6 is always 1, 3, or 5
            switch (i % 6) {
              case 1: // UPDATE
                // use the current timestamp so we know when it happened
                // we could have used last modification timestamps, but
                // this works without enabling statistics
                Object value = System.currentTimeMillis();
                txMgr.begin();
                region.put(key, value);
                txMgr.commit();
                // no longer safe since get is not allowed to member doing GII
                // if (getRegionAttributes().getScope().isDistributedAck()) {
                // // do a nonblocking netSearch
                // region.localInvalidate(key);
                // assertIndexDetailsEquals(value, region.get(key));
                // }
                break;
              case 3: // INVALIDATE
                txMgr.begin();
                region.invalidate(key);
                txMgr.commit();
                if (getRegionAttributes().getScope().isDistributedAck()) {
                  // do a nonblocking netSearch
                  assertThat(region.get(key)).isNull();
                }
                break;
              case 5: // DESTROY
                txMgr.begin();
                region.destroy(key);
                txMgr.commit();
                if (getRegionAttributes().getScope().isDistributedAck()) {
                  // do a nonblocking netSearch
                  assertThat(region.get(key)).isNull();
                }
                break;
              default:
                // unreachable for odd i, kept as a guard
                fail("unexpected modulus result: " + i);
                break;
            }
          }

          // add some new keys
          for (int i = NB1_NUM_ENTRIES; i < NB1_NUM_ENTRIES + 200; i++) {
            txMgr.begin();
            region.create(i, System.currentTimeMillis());
            txMgr.commit();
          }

          // now do a put and our DACK root region which will not complete
          // until processed on otherside which means everything done before this
          // point has been processed
          getRootRegion().put("DONE", "FLUSH_OPS");
        }
      });

  // in the meantime, do the get initial image in vm2
  // slow down image processing to make it more likely to get async updates
  // NOTE(review): the isGlobal() checks below look dead — global scope is excluded by
  // the assumeThat at the top of this method, so these branches always take the
  // non-global path; likely copied from the non-TX variant of this test. Confirm
  // before simplifying.
  if (!getRegionAttributes().getScope().isGlobal()) {
    vm2.invoke("Set slow image processing", new SerializableRunnable() {
      @Override
      public void run() {
        // if this is a no_ack test, then we need to slow down more because of the
        // pauses in the nonblocking operations
        org.apache.geode.internal.cache.InitialImageOperation.slowImageProcessing = 200;
      }
    });
  }

  // Kick off the GII in vm2 concurrently with the transactional ops in vm0.
  AsyncInvocation<Void> asyncGII = vm2.invokeAsync(create);

  if (!getRegionAttributes().getScope().isGlobal()) {
    // wait for nonblocking operations to complete, then let the GII finish at
    // full speed
    ThreadUtils.join(async, 30 * 1000);
    vm2.invoke("Set fast image processing", new SerializableRunnable() {
      @Override
      public void run() {
        org.apache.geode.internal.cache.InitialImageOperation.slowImageProcessing = 0;
      }
    });
    logger.info("after async nonblocking ops complete");
  }

  // wait for GII to complete; iiComplete timestamps the end of GII so the final
  // verification can classify which entries were updated concurrently with it
  ThreadUtils.join(asyncGII, 30 * 1000);
  final long iiComplete = System.currentTimeMillis();
  logger.info("Complete GetInitialImage at: " + System.currentTimeMillis());
  if (getRegionAttributes().getScope().isGlobal()) {
    // wait for nonblocking operations to complete
    ThreadUtils.join(async, 30 * 1000);
  }
  // surface any exception thrown inside either async invocation
  if (async.exceptionOccurred()) {
    fail("async failed", async.getException());
  }
  if (asyncGII.exceptionOccurred()) {
    fail("asyncGII failed", asyncGII.getException());
  }

  // Locally destroy the region in vm0 so we know that they are not found by
  // a netSearch
  vm0.invoke("Locally destroy region", new CacheSerializableRunnable() {
    @Override
    public void run2() throws CacheException {
      Region<Object, Object> region = getRootRegion().getSubregion(name);
      region.localDestroyRegion();
    }
  });

  // invoke repeating so noack regions wait for all updates to get processed
  vm2.invoke(repeatingIfNecessary(5000,
      new CacheSerializableRunnable("Verify entryCount") {
        // guards dumpDestroyedEntries so the diagnostic dump happens at most once
        // across repeated invocations
        boolean entriesDumped = false;

        @Override
        public void run2() throws CacheException {
          Region<Object, Object> region = getRootRegion().getSubregion(name);
          // expected entry count (subtract entries destroyed: every key with
          // i % 6 == 5, i.e. roughly NB1_NUM_ENTRIES / 6 of them)
          int entryCount = NB1_NUM_ENTRIES + 200 - NB1_NUM_ENTRIES / 6;
          int actualCount = region.entrySet(false).size();
          if (actualCount == NB1_NUM_ENTRIES + 200) {
            // entries not destroyed, dump entries that were supposed to have been destroyed
            dumpDestroyedEntries(region);
          }
          assertThat(actualCount).isEqualTo(entryCount);
        }

        // Diagnostic helper: logs the in-VM value of every key that the DESTROY
        // branch (i % 6 == 5) should have removed. No-op after the first call.
        private void dumpDestroyedEntries(Region region) throws EntryNotFoundException {
          if (entriesDumped) {
            return;
          }
          entriesDumped = true;
          logger.info("DUMPING Entries with values in VM that should have been destroyed:");
          for (int i = 5; i < NB1_NUM_ENTRIES; i += 6) {
            logger.info(i + "-->" + ((org.apache.geode.internal.cache.LocalRegion) region)
                .getValueInVM(i));
          }
        }
      }));

  // Final check: the count is right AND a meaningful number of updates landed
  // while GII was still in progress (otherwise the test proved nothing about
  // nonblocking behavior).
  vm2.invoke("Verify keys/values & Nonblocking", new CacheSerializableRunnable() {
    @Override
    public void run2() throws CacheException {
      Region<Object, Object> region = getRootRegion().getSubregion(name);
      // expected entry count (subtract entries destroyed)
      int entryCount = NB1_NUM_ENTRIES + 200 - NB1_NUM_ENTRIES / 6;
      assertThat(region.entrySet(false).size()).isEqualTo(entryCount);

      // determine how many entries were updated before getInitialImage
      // was complete (timestamps earlier than iiComplete)
      int numConcurrent = 0;
      numConcurrent = verifyEntryUpdateCounts(region, numConcurrent, values, iiComplete);
      logger.info(name + ": " + numConcurrent
          + " entries out of " + entryCount + " were updated concurrently with getInitialImage");

      // make sure at least some of them were concurrent
      {
        // NOTE(review): 30 is a heuristic lower bound, not a derived value; a very
        // fast GII on a loaded host could make this flaky
        int min = 30;
        String description =
            "Not enough updates concurrent with getInitialImage occurred to my liking. "
                + numConcurrent + " entries out of " + entryCount
                + " were updated concurrently with getInitialImage, and I'd expect at least "
                + min
                + " or so";
        assertThat(numConcurrent >= min).describedAs(description).isTrue();
      }
    }
  });
}