public void testHighAvailabilityFeatures()

in geode-core/src/distributedTest/java/org/apache/geode/cache30/TXDistributedDUnitTest.java [523:1013]


  /**
   * Exercises transaction commit behavior when the originating member crashes at different points
   * of the commit protocol.
   *
   * <p>
   * A replicated, DISTRIBUTED_ACK region is created in the controller and in every DUnit VM. VM 0
   * (the origin) then runs a series of transactions in which an internal {@link TXState} test hook
   * crashes the origin's distributed system part-way through the commit:
   * <ol>
   * <li>crash on the 2nd duringIndividualSend call: the commit must not be applied anywhere;</li>
   * <li>crash on afterIndividualSend: the commit must not be applied anywhere;</li>
   * <li>crash on the 2nd duringIndividualCommitProcess call: the commit must be visible on every
   * VM;</li>
   * <li>region hosted only by the origin and VM 1 ("solo" region), crash on the 2nd
   * duringIndividualSend call: the commit must be visible in the solo region;</li>
   * <li>solo + non-solo regions, crash on afterIndividualSend: neither region may change;</li>
   * <li>solo + non-solo regions, crash on the 2nd duringIndividualCommitProcess call: the commit
   * must be visible in both regions.</li>
   * </ol>
   * After each step the origin closes its cache so that the next {@code getCache()} call
   * reconnects it; the validators re-create the region on the origin when it is missing.
   *
   * @throws Exception if region creation or a remote invocation fails
   */
  public void testHighAvailabilityFeatures() throws Exception {
    IgnoredException.addIgnoredException("DistributedSystemDisconnectedException");
    final String rgnName = getUniqueName();
    AttributesFactory factory = new AttributesFactory();
    factory.setScope(Scope.DISTRIBUTED_ACK);
    factory.setEarlyAck(false);
    Region rgn = getCache().createRegion(rgnName, factory.create());
    Invoke.invokeInEveryVM(
        new SerializableRunnable("testHighAvailabilityFeatures: intial region configuration") {
          @Override
          public void run() {
            try {
              AttributesFactory factory2 = new AttributesFactory();
              factory2.setScope(Scope.DISTRIBUTED_ACK);
              factory2.setEarlyAck(false);
              factory2.setDataPolicy(DataPolicy.REPLICATE);
              getCache().createRegion(rgnName, factory2.create());
            } catch (CacheException e) {
              Assert.fail("While creating region", e);
            }
          }
        });

    // create the baseline entries that the "no change" validator checks for
    rgn.put("key0", "val0_0");
    rgn.put("key1", "val1_0");

    Host host = Host.getHost(0);
    // This test assumes that there are at least three VMs; the origin and two recipients
    assertTrue(host.getVMCount() >= 3);
    final VM originVM = host.getVM(0);

    // Test that there is no commit after a partial commit message
    // send (only sent to a minority of the recipients)
    originVM.invoke(new SerializableRunnable("Flakey DuringIndividualSend Transaction") {
      @Override
      public void run() {
        final Region rgn1 = getCache().getRegion(rgnName);
        assertNotNull(rgn1);
        try {
          final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
          final CacheTransactionManager txMgrImpl = txMgr2;

          txMgr2.begin();
          // 1. setup an internal callback on originVM that will crash the
          // distributed system on the 2nd duringIndividualSend call.
          ((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState()).forceLocalBootstrap();
          TXState txState = (TXState) ((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState())
              .getRealDeal(null, null);
          txState.setDuringIndividualSend(new Runnable() {
            private int numCalled = 0;

            @Override
            public synchronized void run() {
              ++numCalled;
              rgn1.getCache().getLogger()
                  .info("setDuringIndividualSend Runnable called " + numCalled + " times");
              if (numCalled > 1) {
                MembershipManagerHelper.crashDistributedSystem(getSystem());
              }
            }
          });
          rgn1.put("key0", "val0_1");
          rgn1.put("key1", "val1_1");
          // 2. commit a transaction in originVM, it will disconnect from the DS
          txMgr2.commit();
        } catch (VirtualMachineError e) {
          SystemFailure.initiateFailure(e);
          throw e;
        } catch (Throwable e) {
          // The induced crash is expected to make the commit fail; log and carry on.
          rgn1.getCache().getLogger().warning("Ignoring Exception", e);
        } finally {
          // Allow this VM to re-connect to the DS upon getCache() call
          closeCache();
        }
      }
    });
    // 3. verify on all VMs that the transaction was not committed
    final SerializableRunnable noChangeValidator =
        new SerializableRunnable("testHighAvailabilityFeatures: validate no change in Region") {
          @Override
          public void run() {
            Region rgn1 = getCache().getRegion(rgnName);
            if (rgn1 == null) {
              // Expect a null region from originVM (its cache was closed); re-create it
              try {
                AttributesFactory factory2 = new AttributesFactory();
                factory2.setScope(Scope.DISTRIBUTED_ACK);
                factory2.setEarlyAck(false);
                factory2.setDataPolicy(DataPolicy.REPLICATE);
                rgn1 = getCache().createRegion(rgnName, factory2.create());
              } catch (CacheException e) {
                Assert.fail("While creating region", e);
              }
            }
            Region.Entry re = rgn1.getEntry("key0");
            assertNotNull(re);
            assertEquals("val0_0", re.getValue());
            re = rgn1.getEntry("key1");
            assertNotNull(re);
            assertEquals("val1_0", re.getValue());
          }
        };
    Invoke.invokeInEveryVM(noChangeValidator);

    // Test that there is no commit after sending to all recipients
    // but prior to sending the "commit process" message
    originVM.invoke(new SerializableRunnable("Flakey AfterIndividualSend Transaction") {
      @Override
      public void run() {
        final Region rgn1 = getCache().getRegion(rgnName);
        assertNotNull(rgn1);
        try {
          final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
          final CacheTransactionManager txMgrImpl = txMgr2;

          txMgr2.begin();
          // 1. setup an internal callback on originVM that will crash the
          // distributed system on afterIndividualSend
          ((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState()).forceLocalBootstrap();
          TXState txState = (TXState) ((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState())
              .getRealDeal(null, null);
          txState.setAfterIndividualSend(new Runnable() {
            @Override
            public synchronized void run() {
              MembershipManagerHelper.crashDistributedSystem(getSystem());
            }
          });
          rgn1.put("key0", "val0_2");
          rgn1.put("key1", "val1_2");
          // 2. commit a transaction in originVM, it will disconnect from the DS
          txMgr2.commit();
        } catch (VirtualMachineError e) {
          SystemFailure.initiateFailure(e);
          throw e;
        } catch (Throwable e) {
          // The induced crash is expected to make the commit fail; log and carry on.
          rgn1.getCache().getLogger().warning("Ignoring Exception", e);
        } finally {
          // Allow this VM to re-connect to the DS upon getCache() call
          closeCache();
        }
      }
    });
    // 3. verify on all VMs, including the origin, that the transaction was not committed
    Invoke.invokeInEveryVM(noChangeValidator);

    // Test commit success upon a single commit process message received.
    originVM.invoke(new SerializableRunnable("Flakey DuringIndividualCommitProcess Transaction") {
      @Override
      public void run() {
        final Region rgn1 = getCache().getRegion(rgnName);
        assertNotNull(rgn1);
        try {
          final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
          final CacheTransactionManager txMgrImpl = txMgr2;

          txMgr2.begin();

          ((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState()).forceLocalBootstrap();
          TXState txState = (TXState) ((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState())
              .getRealDeal(null, null);
          // 1. setup an internal callback on originVM that will crash the
          // distributed system on the 2nd duringIndividualCommitProcess call.
          txState.setDuringIndividualCommitProcess(new Runnable() {
            private int numCalled = 0;

            @Override
            public synchronized void run() {
              ++numCalled;
              rgn1.getCache().getLogger()
                  .info("setDuringIndividualCommitProcess Runnable called " + numCalled + " times");
              if (numCalled > 1) {
                MembershipManagerHelper.crashDistributedSystem(getSystem());
              }
            }
          });
          rgn1.put("key0", "val0_3");
          rgn1.put("key1", "val1_3");
          // 2. commit a transaction in originVM, it will disconnect from the DS
          txMgr2.commit();
        } catch (VirtualMachineError e) {
          SystemFailure.initiateFailure(e);
          throw e;
        } catch (Throwable e) {
          // The induced crash is expected to surface as an exception; log and carry on.
          rgn1.getCache().getLogger().warning("Ignoring Exception", e);
        } finally {
          // Allow this VM to re-connect to the DS upon getCache() call
          closeCache();
        }
      }
    });
    // 3. verify on all VMs that the transaction was committed (including the origin, due to GII)
    SerializableRunnable nonSoloChangeValidator1 = new SerializableRunnable(
        "testHighAvailabilityFeatures: validate v1 non-solo Region changes") {
      @Override
      public void run() {
        Region rgn1 = getCache().getRegion(rgnName);
        if (rgn1 == null) {
          // Expect a null region from originVM (its cache was closed); re-create it
          try {
            AttributesFactory factory2 = new AttributesFactory();
            factory2.setScope(Scope.DISTRIBUTED_ACK);
            factory2.setEarlyAck(false);
            factory2.setDataPolicy(DataPolicy.REPLICATE);
            rgn1 = getCache().createRegion(rgnName, factory2.create());
          } catch (CacheException e) {
            Assert.fail("While creating region", e);
          }
        }
        // The committed values may not be visible immediately (presumably the remaining members
        // finish the commit asynchronously), so poll for up to 10 seconds. Always try at least
        // once, keep retrying while time remains, and surface the last assertion failure once
        // the deadline has passed.
        final long giveUp = System.currentTimeMillis() + 10000;
        while (true) {
          try {
            Region.Entry re = rgn1.getEntry("key0");
            assertNotNull(re);
            assertEquals("val0_3", re.getValue());
            re = rgn1.getEntry("key1");
            assertNotNull(re);
            assertEquals("val1_3", re.getValue());
            break;
          } catch (AssertionError e) {
            if (System.currentTimeMillis() >= giveUp) {
              throw e;
            }
          }
        }
      }
    };
    Invoke.invokeInEveryVM(nonSoloChangeValidator1);

    // Verify successful solo region commit after duringIndividualSend
    // (same as afterIndividualSend).
    // Create a region that only exists on the origin and another VM
    final String soloRegionName = getUniqueName() + "_solo";
    SerializableRunnable createSoloRegion =
        new SerializableRunnable("testHighAvailabilityFeatures: solo region configuration") {
          @Override
          public void run() {
            try {
              AttributesFactory factory2 = new AttributesFactory();
              factory2.setScope(Scope.DISTRIBUTED_ACK);
              factory2.setEarlyAck(false);
              factory2.setDataPolicy(DataPolicy.REPLICATE);
              Region rgn1 = getCache().createRegion(soloRegionName, factory2.create());
              rgn1.put("soloKey0", "soloVal0_0");
              rgn1.put("soloKey1", "soloVal1_0");
            } catch (CacheException e) {
              Assert.fail("While creating region", e);
            }
          }
        };
    final VM soloRegionVM = host.getVM(1);
    originVM.invoke(createSoloRegion);
    soloRegionVM.invoke(createSoloRegion);
    originVM
        .invoke(new SerializableRunnable("Flakey solo region DuringIndividualSend Transaction") {
          @Override
          public void run() {
            final Region soloRgn = getCache().getRegion(soloRegionName);
            assertNotNull(soloRgn);
            try {
              final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
              final CacheTransactionManager txMgrImpl = txMgr2;

              txMgr2.begin();
              // 1. setup an internal callback on originVM that will crash the
              // distributed system on the 2nd duringIndividualSend call.
              ((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState()).forceLocalBootstrap();
              TXState txState =
                  (TXState) ((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState())
                      .getRealDeal(null, null);
              txState.setDuringIndividualSend(new Runnable() {
                private int numCalled = 0;

                @Override
                public synchronized void run() {
                  ++numCalled;
                  soloRgn.getCache().getLogger()
                      .info("setDuringIndividualSend Runnable called " + numCalled + " times");
                  if (numCalled > 1) {
                    MembershipManagerHelper.crashDistributedSystem(getSystem());
                  }
                }
              });
              soloRgn.put("soloKey0", "soloVal0_1");
              soloRgn.put("soloKey1", "soloVal1_1");
              // 2. commit a transaction in originVM, it will disconnect from the DS
              txMgr2.commit();
            } catch (VirtualMachineError e) {
              SystemFailure.initiateFailure(e);
              throw e;
            } catch (Throwable e) {
              // Any failure caused by the induced crash is logged and otherwise ignored.
              soloRgn.getCache().getLogger().warning("Ignoring Exception", e);
            } finally {
              // Allow this VM to re-connect to the DS upon getCache() call
              closeCache();
            }
          }
        });
    // 3. verify on the origin and the soloRegionVM that the transaction was committed
    final SerializableRunnable soloRegionCommitValidator1 = new SerializableRunnable(
        "testHighAvailabilityFeatures: validate successful v1 commit in solo Region") {
      @Override
      public void run() {
        Region soloRgn = getCache().getRegion(soloRegionName);
        if (soloRgn == null) {
          // Expect a null region from originVM (its cache was closed); re-create it
          try {
            AttributesFactory factory2 = new AttributesFactory();
            factory2.setScope(Scope.DISTRIBUTED_ACK);
            factory2.setEarlyAck(false);
            factory2.setDataPolicy(DataPolicy.REPLICATE);
            soloRgn = getCache().createRegion(soloRegionName, factory2.create());
          } catch (CacheException e) {
            Assert.fail("While creating region ", e);
          }
        }
        Region.Entry re = soloRgn.getEntry("soloKey0");
        assertNotNull(re);
        assertEquals("soloVal0_1", re.getValue());
        re = soloRgn.getEntry("soloKey1");
        assertNotNull(re);
        assertEquals("soloVal1_1", re.getValue());
      }
    };
    originVM.invoke(soloRegionCommitValidator1);
    soloRegionVM.invoke(soloRegionCommitValidator1);
    // verify no change in nonSolo region, re-establish region in originVM
    Invoke.invokeInEveryVM(nonSoloChangeValidator1);

    // Verify no commit for failed send (afterIndividualSend) for solo
    // Region combined with non-solo Region.
    // NOTE: the runnable's name says "DuringIndividualSend", but the hook installed below is
    // afterIndividualSend, matching the description above.
    originVM.invoke(new SerializableRunnable(
        "Flakey mixed (solo+non-solo) region DuringIndividualSend Transaction") {
      @Override
      public void run() {
        final Region rgn1 = getCache().getRegion(rgnName);
        assertNotNull(rgn1);
        final Region soloRgn = getCache().getRegion(soloRegionName);
        assertNotNull(soloRgn);
        try {
          final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
          final CacheTransactionManager txMgrImpl = txMgr2;

          txMgr2.begin();
          // 1. setup an internal callback on originVM that will crash the
          // distributed system on the afterIndividualSend call.
          ((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState()).forceLocalBootstrap();
          TXState txState = (TXState) ((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState())
              .getRealDeal(null, null);
          txState.setAfterIndividualSend(new Runnable() {
            @Override
            public synchronized void run() {
              MembershipManagerHelper.crashDistributedSystem(getSystem());
            }
          });

          rgn1.put("key0", "val0_4");
          rgn1.put("key1", "val1_4");
          soloRgn.put("soloKey0", "soloVal0_2");
          soloRgn.put("soloKey1", "soloVal1_2");

          // 2. commit a transaction in originVM, it will disconnect from the DS
          txMgr2.commit();
        } catch (VirtualMachineError e) {
          SystemFailure.initiateFailure(e);
          throw e;
        } catch (Throwable e) {
          // The induced crash is expected to make the commit fail; log and carry on.
          rgn1.getCache().getLogger().warning("Ignoring Exception", e);
        } finally {
          // Allow this VM to re-connect to the DS upon getCache() call
          closeCache();
        }
      }
    });
    // Origin and Solo Region VM should be the same as last validation
    originVM.invoke(soloRegionCommitValidator1);
    soloRegionVM.invoke(soloRegionCommitValidator1);
    Invoke.invokeInEveryVM(nonSoloChangeValidator1);

    // Verify commit after sending a single
    // (duringIndividualCommitProcess) commit process for solo Region
    // combined with non-solo Region
    originVM.invoke(new SerializableRunnable(
        "Flakey mixed (solo+non-solo) region DuringIndividualCommitProcess Transaction") {
      @Override
      public void run() {
        final Region rgn1 = getCache().getRegion(rgnName);
        assertNotNull(rgn1);
        final Region soloRgn = getCache().getRegion(soloRegionName);
        assertNotNull(soloRgn);
        try {
          final CacheTransactionManager txMgr2 = getCache().getCacheTransactionManager();
          final CacheTransactionManager txMgrImpl = txMgr2;

          txMgr2.begin();
          // 1. setup an internal callback on originVM that will crash the
          // distributed system on the 2nd duringIndividualCommitProcess call.
          ((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState()).forceLocalBootstrap();
          TXState txState = (TXState) ((TXStateProxyImpl) ((TXManagerImpl) txMgrImpl).getTXState())
              .getRealDeal(null, null);
          // Install the callback on the commit-process hook, which is what this scenario's
          // name, description and log message refer to, and which mirrors the non-mixed
          // DuringIndividualCommitProcess scenario earlier in this test.
          txState.setDuringIndividualCommitProcess(new Runnable() {
            private int numCalled = 0;

            @Override
            public synchronized void run() {
              ++numCalled;
              rgn1.getCache().getLogger()
                  .info("setDuringIndividualCommitProcess Runnable called " + numCalled + " times");
              if (numCalled > 1) {
                MembershipManagerHelper.crashDistributedSystem(getSystem());
              }
            }
          });

          rgn1.put("key0", "val0_5");
          rgn1.put("key1", "val1_5");
          soloRgn.put("soloKey0", "soloVal0_3");
          soloRgn.put("soloKey1", "soloVal1_3");

          // 2. commit a transaction in originVM, it will disconnect from the DS
          txMgr2.commit();
        } catch (VirtualMachineError e) {
          SystemFailure.initiateFailure(e);
          throw e;
        } catch (Throwable e) {
          // The induced crash is expected to surface as an exception; log and carry on.
          rgn1.getCache().getLogger().warning("Ignoring Exception", e);
        } finally {
          // Allow this VM to re-connect to the DS upon getCache() call
          closeCache();
        }
      }
    });
    // 3. verify on the origin and the soloRegionVM that the solo region sees the commit
    final SerializableRunnable soloRegionCommitValidator2 = new SerializableRunnable(
        "testHighAvailabilityFeatures: validate successful v2 commit in solo Region") {
      @Override
      public void run() {
        Region soloRgn = getCache().getRegion(soloRegionName);
        if (soloRgn == null) {
          // Expect a null region from originVM (its cache was closed); re-create it
          try {
            AttributesFactory factory2 = new AttributesFactory();
            factory2.setScope(Scope.DISTRIBUTED_ACK);
            factory2.setEarlyAck(false);
            factory2.setDataPolicy(DataPolicy.REPLICATE);
            soloRgn = getCache().createRegion(soloRegionName, factory2.create());
          } catch (CacheException e) {
            Assert.fail("While creating region ", e);
          }
        }
        Region.Entry re = soloRgn.getEntry("soloKey0");
        assertNotNull(re);
        assertEquals("soloVal0_3", re.getValue());
        re = soloRgn.getEntry("soloKey1");
        assertNotNull(re);
        assertEquals("soloVal1_3", re.getValue());
      }
    };
    originVM.invoke(soloRegionCommitValidator2);
    soloRegionVM.invoke(soloRegionCommitValidator2);
    // 4. verify on all VMs that the non-solo region sees the commit as well
    SerializableRunnable nonSoloChangeValidator2 = new SerializableRunnable(
        "testHighAvailabilityFeatures: validate v2 non-solo Region changes") {
      @Override
      public void run() {
        Region rgn1 = getCache().getRegion(rgnName);
        if (rgn1 == null) {
          // Expect a null region from originVM (its cache was closed); re-create it
          try {
            AttributesFactory factory2 = new AttributesFactory();
            factory2.setScope(Scope.DISTRIBUTED_ACK);
            factory2.setEarlyAck(false);
            factory2.setDataPolicy(DataPolicy.REPLICATE);
            rgn1 = getCache().createRegion(rgnName, factory2.create());
          } catch (CacheException e) {
            Assert.fail("While creating region", e);
          }
        }
        Region.Entry re = rgn1.getEntry("key0");
        assertNotNull(re);
        assertEquals("val0_5", re.getValue());
        re = rgn1.getEntry("key1");
        assertNotNull(re);
        assertEquals("val1_5", re.getValue());
      }
    };
    Invoke.invokeInEveryVM(nonSoloChangeValidator2);
  }