public void testTXUpdateLoadNoConflict()

in geode-dunit/src/main/java/org/apache/geode/cache30/MultiVMRegionTestCase.java [4682:4906]


  public void testTXUpdateLoadNoConflict() {
    // Checks that values arriving from another VM (cache loader push, net load, net search) do
    // not conflict with a transaction that is open in this VM on the same key.
    //
    // The loader installed in vm0 returns "LV <n>" where n counts its invocations, so the
    // expected "LV 1" .. "LV 6" values below depend on the exact order of the get() calls.
    // Do not reorder them.
    assumeThat(supportsTransactions()).isTrue();
    assumeThat(getRegionAttributes().getScope().isGlobal()).isFalse();
    assumeThat(getRegionAttributes().getDataPolicy().withPersistence()).isFalse();

    assertThat(getRegionAttributes().getScope().isDistributed()).isTrue();
    CacheTransactionManager txMgr = getCache().getCacheTransactionManager();

    final String rgnName = getUniqueName();

    try {
      MyTransactionListener tl = new MyTransactionListener();
      setTxListener(txMgr, tl);
      RegionFactory<String, String> factory = getCache().createRegionFactory(getRegionAttributes());
      factory.setDataPolicy(DataPolicy.REPLICATE);
      Region<String, String> rgn = createRegion(rgnName, factory);

      // ---- Phase 1: a value loaded in vm0 while our tx has created the same key ----
      txMgr.begin();
      TransactionId myTXId = txMgr.getTransactionId();

      rgn.create("key", "txValue");

      vm0.invoke("testTXUpdateLoadNoConflict: Create Region & Load value", () -> {
        CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
        MyTransactionListener tl1 = new MyTransactionListener();
        setTxListener(txMgr2, tl1);
        try {
          Region<String, String> rgn1 = createRegion(rgnName);
          AttributesMutator<String, String> mutator = rgn1.getAttributesMutator();
          mutator.setCacheLoader(new CacheLoader<String, String>() {
            // number of load() invocations so far; makes every loaded value distinguishable
            int count = 0;

            @Override
            public String load(LoaderHelper<String, String> helper)
                throws CacheLoaderException {
              count++;
              return "LV " + count;
            }

            @Override
            public void close() {}
          });
          assertThat(rgn1.get("key")).isEqualTo("LV 1");
          getSystem().getLogWriter().info("testTXUpdateLoadNoConflict: loaded Key");
          flushIfNecessary(rgn1);
        } catch (CacheException e) {
          fail("While creating region", e);
        }
      });

      // Outside of the transaction the committed state must show the value loaded in vm0.
      // The finally guarantees the transaction is resumed even if an assertion fails.
      TXStateProxy pausedForLoadCheck = ((TXManagerImpl) txMgr).pauseTransaction();
      try {
        assertThat(rgn.containsKey("key")).isTrue();
        assertThat(rgn.getEntry("key").getValue()).isEqualTo("LV 1");
      } finally {
        ((TXManagerImpl) txMgr).unpauseTransaction(pausedForLoadCheck);
      }
      // make sure transactional view is still correct
      assertThat(rgn.getEntry("key").getValue()).isEqualTo("txValue");

      txMgr.commit();
      getSystem().getLogWriter().info("testTXUpdateLoadNoConflict: did commit");
      assertThat(rgn.getEntry("key").getValue()).isEqualTo("txValue");
      Collection<EntryEvent<?, ?>> keyCreateEvents =
          TxEventTestUtil.getCreateEvents(tl.lastEvent.getEvents());
      assertSingleLocallyOriginatedTxEvent(keyCreateEvents, myTXId, rgn, "key", "txValue");

      // ---- Phase 2: net load outside of an open tx on the same key ----
      // Recreate the region in the controller with DataPolicy.NORMAL
      // so the test can do local destroys/invalidates.
      rgn.localDestroyRegion();
      factory.setDataPolicy(DataPolicy.NORMAL);
      rgn = createRegion(rgnName, factory);

      // now see if net loader is working
      assertThat(rgn.get("key2")).isEqualTo("LV 2");

      // now confirm that netload does not cause a conflict
      txMgr.begin();
      myTXId = txMgr.getTransactionId();

      rgn.create("key3", "txValue3");

      TXStateProxy pausedForNetLoad = ((TXManagerImpl) txMgr).pauseTransaction();
      try {
        // do a get outside of the transaction to force a net load
        assertThat(rgn.get("key3")).isEqualTo("LV 3");
      } finally {
        ((TXManagerImpl) txMgr).unpauseTransaction(pausedForNetLoad);
      }
      // make sure transactional view is still correct
      assertThat(rgn.getEntry("key3").getValue()).isEqualTo("txValue3");

      txMgr.commit();
      getSystem().getLogWriter().info("testTXUpdateLoadNoConflict: did commit");
      assertThat(rgn.getEntry("key3").getValue()).isEqualTo("txValue3");
      Collection<EntryEvent<?, ?>> key3CreateEvents =
          TxEventTestUtil.getCreateEvents(tl.lastEvent.getEvents());
      assertSingleLocallyOriginatedTxEvent(key3CreateEvents, myTXId, rgn, "key3", "txValue3");

      // ---- Phase 3: net load performed inside a tx ----
      // now see if tx net loader is working
      txMgr.begin();
      assertThat(rgn.get("key4")).isEqualTo("LV 4");
      // repeated reads inside the tx must return the same loaded value without loading again
      assertThat(rgn.get("key4")).isEqualTo("LV 4");
      assertThat(rgn.getEntry("key4").getValue()).isEqualTo("LV 4");
      txMgr.rollback();
      // confirm that netLoad is transactional: after the rollback the value is loaded again
      assertThat(rgn.get("key4")).isEqualTo("LV 5");
      assertThat(rgn.getEntry("key4").getValue()).isEqualTo("LV 5");

      // ---- Phase 4: net search outside of an open tx on the same key ----
      // make sure non-tx netsearch works
      assertThat(rgn.get("key")).isEqualTo("txValue");
      assertThat(rgn.getEntry("key").getValue()).isEqualTo("txValue");

      // make sure net-search result does not conflict with commit
      rgn.localInvalidate("key");
      txMgr.begin();
      myTXId = txMgr.getTransactionId();

      rgn.put("key", "new txValue");

      TXStateProxy pausedForNetSearch = ((TXManagerImpl) txMgr).pauseTransaction();
      try {
        // do a get outside of the transaction to force a netsearch
        assertThat(rgn.get("key")).isEqualTo("txValue");
        assertThat(rgn.getEntry("key").getValue()).isEqualTo("txValue");
      } finally {
        ((TXManagerImpl) txMgr).unpauseTransaction(pausedForNetSearch);
      }
      // make sure transactional view is still correct
      assertThat(rgn.getEntry("key").getValue()).isEqualTo("new txValue");

      txMgr.commit();
      flushIfNecessary(rgn); // give other side chance to process commit
      getSystem().getLogWriter().info("testTXUpdateLoadNoConflict: did commit");
      assertThat(rgn.getEntry("key").getValue()).isEqualTo("new txValue");
      Collection<EntryEvent<?, ?>> keyPutEvents =
          TxEventTestUtil.getPutEvents(tl.lastEvent.getEvents());
      assertSingleLocallyOriginatedTxEvent(keyPutEvents, myTXId, rgn, "key", "new txValue");

      // ---- Phase 5: invalidates performed inside a tx ----
      // make sure tx local invalidate allows netsearch
      Object localCmtValue = rgn.getEntry("key").getValue();
      txMgr.begin();
      assertThat(rgn.getEntry("key").getValue()).isSameAs(localCmtValue);
      rgn.localInvalidate("key");
      assertThat(rgn.getEntry("key").getValue()).isNull();
      // now make sure a get will do a netsearch and find the value
      // in the other vm instead of the one in local cmt state
      Object txValue = rgn.get("key");
      assertThat(txValue).isNotSameAs(localCmtValue);
      assertThat(rgn.get("key")).isSameAs(txValue);
      assertThat(rgn.getEntry("key").getValue()).isNotSameAs(localCmtValue);
      // make sure we did a search and not a load
      assertThat(rgn.getEntry("key").getValue()).isEqualTo(localCmtValue);
      // now make sure that if we do a tx distributed invalidate
      // that we will do a load and not a search
      rgn.invalidate("key");
      assertThat(rgn.getEntry("key").getValue()).isNull();
      txValue = rgn.get("key");
      assertThat(txValue).isEqualTo("LV 6");
      assertThat(rgn.get("key")).isSameAs(txValue);
      assertThat(rgn.getEntry("key").getValue()).isEqualTo("LV 6");
      // now make sure after rollback that local cmt state has not changed
      txMgr.rollback();
      assertThat(rgn.getEntry("key").getValue()).isSameAs(localCmtValue);
    } catch (Exception e) {
      // Only exceptions are handled here; assertion failures are Errors and pass straight through.
      getCache().close();
      getSystem().getLogWriter()
          .fine("testTXUpdateLoadNoConflict: Caused exception in createRegion");
      throw e;
    }
  }

  /**
   * Asserts that {@code events} contains exactly one entry event and that this event describes a
   * transactional write that originated in this VM: it carries the expected transaction id,
   * region, key and new value, has no old value and no callback argument, and its operation is
   * distributed but is not a load, net search or expiration.
   *
   * @param events the events reported to the transaction listener for the last commit, already
   *        filtered to the kind of operation under test
   * @param expectedTxId id of the transaction that performed the write
   * @param expectedRegion region instance the event must refer to (compared by identity)
   * @param expectedKey key the event must carry
   * @param expectedNewValue new value the event must carry
   */
  private static void assertSingleLocallyOriginatedTxEvent(Collection<EntryEvent<?, ?>> events,
      TransactionId expectedTxId, Region<String, String> expectedRegion, String expectedKey,
      String expectedNewValue) {
    assertThat(events).hasSize(1);
    EntryEvent<?, ?> ev = events.iterator().next();
    assertThat(ev.getTransactionId()).isEqualTo(expectedTxId);
    assertThat(ev.getRegion()).isSameAs(expectedRegion);
    assertThat(ev.getKey()).isEqualTo(expectedKey);
    assertThat(ev.getNewValue()).isEqualTo(expectedNewValue);
    assertThat(ev.getOldValue()).isNull();
    assertThat(ev.getOperation().isLocalLoad()).isFalse();
    assertThat(ev.getOperation().isNetLoad()).isFalse();
    assertThat(ev.getOperation().isLoad()).isFalse();
    assertThat(ev.getOperation().isNetSearch()).isFalse();
    assertThat(ev.getOperation().isExpiration()).isFalse();
    assertThat(ev.getCallbackArgument()).isNull();
    assertThat(ev.isCallbackArgumentAvailable()).isTrue();
    assertThat(ev.isOriginRemote()).isFalse();
    assertThat(ev.getOperation().isDistributed()).isTrue();
  }