public void testTXNonblockingGetInitialImage()

in geode-dunit/src/main/java/org/apache/geode/cache30/MultiVMRegionTestCase.java [3427:3692]


  /**
   * Exercises transactional operations that run in one VM while another VM is still performing
   * its getInitialImage (GII) for the same region.
   *
   * <p>
   * Flow of the test:
   * <ol>
   * <li>vm0 creates the region and puts {@code NB1_NUM_ENTRIES} entries into it.</li>
   * <li>vm0 asynchronously waits until another member's profile shows up in the region's
   * distribution advisor and then, each in its own transaction, updates, invalidates or destroys
   * every odd key and creates 200 additional keys.</li>
   * <li>vm2 concurrently creates the region with
   * {@code InitialImageOperation.slowImageProcessing} raised, so that its GII overlaps with the
   * operations from vm0.</li>
   * <li>vm0 locally destroys its region so that vm2 cannot satisfy reads via netSearch, and vm2
   * then verifies the entry count and that at least 30 entries were counted by
   * {@code verifyEntryUpdateCounts} as concurrent with the GII.</li>
   * </ol>
   *
   * <p>
   * The test is skipped when the region under test does not support replication or
   * transactions, is globally scoped, or is persistent.
   */
  public void testTXNonblockingGetInitialImage() {
    assumeThat(supportsReplication()).isTrue();
    assumeThat(supportsTransactions()).isTrue();
    // don't run this test if global scope since its too difficult to predict
    // how many concurrent operations will occur
    assumeThat(getRegionAttributes().getScope().isGlobal()).isFalse();
    assumeThat(getRegionAttributes().getDataPolicy().withPersistence()).isFalse();

    final String name = getUniqueName();
    final byte[][] values = new byte[NB1_NUM_ENTRIES][];
    // number of brand-new keys created by vm0 while the GII is in progress
    final int numNewKeys = 200;

    for (int i = 0; i < NB1_NUM_ENTRIES; i++) {
      values[i] = new byte[NB1_VALUE_SIZE];
      Arrays.fill(values[i], (byte) 0x42);
    }

    SerializableRunnable create = new CacheSerializableRunnable("Create Mirrored Region") {
      @Override
      public void run2() throws CacheException {
        beginCacheXml();
        { // root region must be DACK because its used to sync up async subregions
          AttributesFactory<?, ?> factory = new AttributesFactory<>();
          factory.setScope(Scope.DISTRIBUTED_ACK);
          factory.setDataPolicy(DataPolicy.NORMAL);
          factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL));
          createRootRegion(factory.create());
        }
        {
          AttributesFactory<?, ?> factory = new AttributesFactory<>(getRegionAttributes());
          // a NORMAL region is switched to PRELOADED for the member doing the GII
          if (getRegionAttributes().getDataPolicy() == DataPolicy.NORMAL) {
            factory.setDataPolicy(DataPolicy.PRELOADED);
          }
          factory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL));
          createRegion(name, factory.create());
        }
        finishCacheXml(name);
        // reset slow
        org.apache.geode.internal.cache.InitialImageOperation.slowImageProcessing = 0;
      }
    };

    vm0.invoke("Create Nonmirrored Region", new CacheSerializableRunnable() {
      @Override
      public void run2() throws CacheException {
        { // root region must be DACK because its used to sync up async subregions
          AttributesFactory<?, ?> factory = new AttributesFactory<>();
          factory.setScope(Scope.DISTRIBUTED_ACK);
          factory.setDataPolicy(DataPolicy.EMPTY);
          createRootRegion(factory.create());
        }
        {
          AttributesFactory<?, ?> factory = new AttributesFactory<>(getRegionAttributes());
          createRegion(name, factory.create());
        }
        // reset slow
        org.apache.geode.internal.cache.InitialImageOperation.slowImageProcessing = 0;
      }
    });

    vm0.invoke("Put initial data", new CacheSerializableRunnable() {
      @Override
      public void run2() throws CacheException {
        Region<Object, Object> region = getRootRegion().getSubregion(name);
        for (int i = 0; i < NB1_NUM_ENTRIES; i++) {
          region.put(i, values[i]);
        }
        assertThat(region.keySet().size()).isEqualTo(NB1_NUM_ENTRIES);
      }
    });

    // start asynchronous process that does updates to the data
    AsyncInvocation<Void> async =
        vm0.invokeAsync("Do Nonblocking Operations", new CacheSerializableRunnable() {
          @Override
          public void run2() throws CacheException {
            Region<Object, Object> region = getRootRegion().getSubregion(name);

            // wait for profile of getInitialImage cache to show up
            final org.apache.geode.internal.cache.CacheDistributionAdvisor adv =
                ((org.apache.geode.internal.cache.DistributedRegion) region)
                    .getCacheDistributionAdvisor();
            final int expectedProfiles = 1;

            await("replicate count never reached " + expectedProfiles)
                .until(() -> {
                  DataPolicy currentPolicy = getRegionAttributes().getDataPolicy();
                  if (currentPolicy == DataPolicy.PRELOADED) {
                    // count preloaded members as well as replicates
                    return (adv.advisePreloadeds().size()
                        + adv.adviseReplicates().size()) >= expectedProfiles;
                  } else {
                    return adv.adviseReplicates().size() >= expectedProfiles;
                  }
                });
            // operate on every odd entry with different value, alternating between
            // updates, invalidates, and destroys. These operations are likely
            // to be nonblocking if a sufficient number of updates get through
            // before the get initial image is complete.
            CacheTransactionManager txMgr = getCache().getCacheTransactionManager();
            for (int i = 1; i < NB1_NUM_ENTRIES; i += 2) {
              Object key = i;
              switch (i % 6) {
                case 1: // UPDATE
                  // use the current timestamp so we know when it happened
                  // we could have used last modification timestamps, but
                  // this works without enabling statistics
                  Object value = System.currentTimeMillis();
                  txMgr.begin();
                  region.put(key, value);
                  txMgr.commit();
                  // no read-back check here: it is no longer safe since get is not
                  // allowed to member doing GII
                  break;
                case 3: // INVALIDATE
                  txMgr.begin();
                  region.invalidate(key);
                  txMgr.commit();
                  if (getRegionAttributes().getScope().isDistributedAck()) {
                    // do a nonblocking netSearch
                    assertThat(region.get(key)).isNull();
                  }
                  break;
                case 5: // DESTROY
                  txMgr.begin();
                  region.destroy(key);
                  txMgr.commit();
                  if (getRegionAttributes().getScope().isDistributedAck()) {
                    // do a nonblocking netSearch
                    assertThat(region.get(key)).isNull();
                  }
                  break;
                default:
                  // odd i can only yield 1, 3 or 5; anything else is a bug in the loop
                  fail("unexpected modulus result: " + i);
                  break;
              }
            }
            // add some new keys
            for (int i = NB1_NUM_ENTRIES; i < NB1_NUM_ENTRIES + numNewKeys; i++) {
              txMgr.begin();
              region.create(i, System.currentTimeMillis());
              txMgr.commit();
            }
            // now do a put and our DACK root region which will not complete
            // until processed on otherside which means everything done before this
            // point has been processed
            getRootRegion().put("DONE", "FLUSH_OPS");
          }
        });

    // in the meantime, do the get initial image in vm2
    // slow down image processing to make it more likely to get async updates
    // (global scope was excluded by the assumption at the top of the test, so no scope check
    // is needed here; the same delay value is used for every remaining scope)
    vm2.invoke("Set slow image processing", new SerializableRunnable() {
      @Override
      public void run() {
        org.apache.geode.internal.cache.InitialImageOperation.slowImageProcessing = 200;
      }
    });

    AsyncInvocation<Void> asyncGII = vm2.invokeAsync(create);

    // wait for nonblocking operations to complete
    ThreadUtils.join(async, 30 * 1000);

    vm2.invoke("Set fast image processing", new SerializableRunnable() {
      @Override
      public void run() {
        org.apache.geode.internal.cache.InitialImageOperation.slowImageProcessing = 0;
      }
    });
    logger.info("after async nonblocking ops complete");

    // wait for GII to complete
    ThreadUtils.join(asyncGII, 30 * 1000);
    final long iiComplete = System.currentTimeMillis();
    // log the very timestamp that the verification below compares against
    logger.info("Complete GetInitialImage at: " + iiComplete);

    if (async.exceptionOccurred()) {
      fail("async failed", async.getException());
    }
    if (asyncGII.exceptionOccurred()) {
      fail("asyncGII failed", asyncGII.getException());
    }

    // Locally destroy the region in vm0 so we know that they are not found by
    // a netSearch
    vm0.invoke("Locally destroy region", new CacheSerializableRunnable() {
      @Override
      public void run2() throws CacheException {
        Region<Object, Object> region = getRootRegion().getSubregion(name);
        region.localDestroyRegion();
      }
    });

    // invoke repeating so noack regions wait for all updates to get processed
    vm2.invoke(repeatingIfNecessary(5000,
        new CacheSerializableRunnable("Verify entryCount") {
          // ensures the diagnostic dump is written at most once across repeated runs
          boolean entriesDumped = false;

          @Override
          public void run2() throws CacheException {
            Region<Object, Object> region = getRootRegion().getSubregion(name);
            // expected entry count (subtract entries destroyed)
            int entryCount = NB1_NUM_ENTRIES + numNewKeys - NB1_NUM_ENTRIES / 6;
            int actualCount = region.entrySet(false).size();
            if (actualCount == NB1_NUM_ENTRIES + numNewKeys) {
              // entries not destroyed, dump entries that were supposed to have been destroyed
              dumpDestroyedEntries(region);
            }
            assertThat(actualCount).isEqualTo(entryCount);
          }

          /**
           * Logs the in-VM value of every key that the DESTROY case should have removed
           * (keys 5, 11, 17, ...). Does nothing after the first call.
           */
          private void dumpDestroyedEntries(Region<?, ?> region) throws EntryNotFoundException {
            if (entriesDumped) {
              return;
            }
            entriesDumped = true;

            logger.info("DUMPING Entries with values in VM that should have been destroyed:");
            for (int i = 5; i < NB1_NUM_ENTRIES; i += 6) {
              logger.info(i + "-->" + ((org.apache.geode.internal.cache.LocalRegion) region)
                  .getValueInVM(i));
            }
          }
        }));

    vm2.invoke("Verify keys/values & Nonblocking", new CacheSerializableRunnable() {
      @Override
      public void run2() throws CacheException {
        Region<Object, Object> region = getRootRegion().getSubregion(name);
        // expected entry count (subtract entries destroyed)
        int entryCount = NB1_NUM_ENTRIES + numNewKeys - NB1_NUM_ENTRIES / 6;
        assertThat(region.entrySet(false).size()).isEqualTo(entryCount);
        // determine how many entries were updated before getInitialImage
        // was complete
        int numConcurrent = verifyEntryUpdateCounts(region, 0, values, iiComplete);

        logger.info(name + ": " + numConcurrent
            + " entries out of " + entryCount + " were updated concurrently with getInitialImage");
        // make sure at least some of them were concurrent
        {
          int min = 30;
          String description =
              "Not enough updates concurrent with getInitialImage occurred to my liking. "
                  + numConcurrent + " entries out of " + entryCount
                  + " were updated concurrently with getInitialImage, and I'd expect at least "
                  + min
                  + " or so";
          // assert on the number itself so a failure reports the actual value
          assertThat(numConcurrent).describedAs(description).isGreaterThanOrEqualTo(min);
        }
      }
    });
  }