public void testRemoteCommitFailure()

in geode-core/src/distributedTest/java/org/apache/geode/cache30/TXDistributedDUnitTest.java [1353:1556]


  public void testRemoteCommitFailure() throws Exception {
    // Scenario: a transaction started on the origin VM writes to two persistent replicated
    // regions that are created in four VMs. Two of those VMs (the "trouble" VMs) create the
    // regions with a TXTroubleMaker test callable, and the commit is expected to surface a
    // CommitIncompleteException that names both regions for both trouble VMs. Afterwards the
    // trouble VMs are expected to no longer expose the regions, while the origin and the
    // no-trouble VM must hold every committed entry.
    try {
      disconnectAllFromDS();
      final String rgnName1 = getUniqueName() + "_1";
      final String rgnName2 = getUniqueName() + "_2";
      final String diskStoreName = getUniqueName() + "_store";
      Host host = Host.getHost(0);
      VM origin = host.getVM(0);
      VM trouble1 = host.getVM(1);
      VM trouble2 = host.getVM(2);
      VM noTrouble = host.getVM(3);

      // Regions without any injected failure: used by the origin and the no-trouble VM.
      CacheSerializableRunnable initRegions =
          new CacheSerializableRunnable("Initialize no trouble regions") {
            @Override
            public void run2() {
              getCache().createDiskStoreFactory().setDiskDirs(getDiskDirs()).create(diskStoreName);
              // NOTE(review): presumably required so that persistent regions may take part in a
              // transaction; it is reset in every VM by the outer finally block.
              TXManagerImpl.ALLOW_PERSISTENT_TRANSACTIONS = true;
              AttributesFactory af = new AttributesFactory();
              af.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
              af.setScope(Scope.DISTRIBUTED_ACK);
              af.setDiskStoreName(diskStoreName);
              getCache().createRegion(rgnName1, af.create());
              getCache().createRegion(rgnName2, af.create());
            }
          };
      origin.invoke(initRegions);
      noTrouble.invoke(initRegions);

      // Same region configuration, plus a TXTroubleMaker test callable on each region and a
      // ShutdownListener registered on the distributed system.
      SerializableRunnable initTroubleRegions =
          new CacheSerializableRunnable("Initialize regions that cause trouble") {
            @Override
            public void run2() {
              getCache().createDiskStoreFactory().setDiskDirs(getDiskDirs())
                  .create(diskStoreName);
              TXManagerImpl.ALLOW_PERSISTENT_TRANSACTIONS = true;
              InternalRegionFactory factory = getCache().createInternalRegionFactory();
              factory.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE);
              factory.setScope(Scope.DISTRIBUTED_ACK);
              factory.setDiskStoreName(diskStoreName);
              factory.setTestCallable(new TXTroubleMaker());
              factory.create(rgnName1);
              factory.create(rgnName2);
              getCache().getInternalDistributedSystem().addResourceListener(new ShutdownListener());
            }
          };
      trouble1.invoke(initTroubleRegions);
      trouble2.invoke(initTroubleRegions);

      SerializableRunnable doTransaction =
          new CacheSerializableRunnable("Run failing transaction") {
            @Override
            public void run2() {
              Cache c = getCache();
              Region r1 = c.getRegion(rgnName1);
              assertNotNull(r1);
              Region r2 = c.getRegion(rgnName2);
              assertNotNull(r2);
              CacheTransactionManager txmgr = c.getCacheTransactionManager();
              txmgr.begin();
              r1.put("k1", "k1");
              r1.put("k2", "k2");
              r1.put(TROUBLE_KEY, TROUBLE_KEY);
              r2.put("k1", "k1");
              r2.put("k2", "k2");
              r2.put(TROUBLE_KEY, TROUBLE_KEY);
              try {
                txmgr.commit();
                fail("Expected an tx incomplete exception");
              } catch (CommitIncompleteException yay) {
                String msg = yay.getMessage();
                // Each region on a trouble VM should be mentioned (two regions per trouble VM)
                assertEquals(2, countOccurrences(msg, rgnName1));
                assertEquals(2, countOccurrences(msg, rgnName2));
                // DiskAccessExceptions should be mentioned four times
                assertEquals(4, countOccurrences(msg, DiskAccessException.class.getName()));
              }
            }

            /**
             * Counts how many times {@code token} occurs in {@code text}. The search resumes one
             * character after the start of each hit, so overlapping occurrences are counted.
             */
            private int countOccurrences(String text, String token) {
              int count = 0;
              for (int at = text.indexOf(token); at >= 0; at = text.indexOf(token, at + 1)) {
                count++;
              }
              return count;
            }
          };

      // The exceptions named here are expected while the transaction runs; the ignore entry is
      // always removed again, even when the invocation fails.
      final IgnoredException expectedFailures = IgnoredException.addIgnoredException(
          DiskAccessException.class.getName() + "|" + CommitIncompleteException.class.getName()
              + "|" + CommitReplyException.class.getName());
      try {
        origin.invoke(doTransaction);
      } finally {
        expectedFailures.remove();
      }

      // Release every ShutdownListener registered in the trouble VMs.
      // NOTE(review): presumably this lets a cache shutdown that the listener was holding back
      // proceed - confirm against ShutdownListener.
      SerializableCallable allowCacheToShutdown = new SerializableCallable() {
        @Override
        public Object call() throws Exception {
          GemFireCacheImpl cache = (GemFireCacheImpl) getCache();
          List<ResourceEventsListener> listeners =
              cache.getInternalDistributedSystem().getResourceListeners();
          for (ResourceEventsListener l : listeners) {
            if (l instanceof ShutdownListener) {
              ShutdownListener shutListener = (ShutdownListener) l;
              shutListener.unblockShutdown();
            }
          }
          return null;
        }
      };
      trouble1.invoke(allowCacheToShutdown);
      trouble2.invoke(allowCacheToShutdown);

      // Assert proper content on failing VMs: neither region may be reachable any more
      SerializableRunnable assertTroubledContent =
          new CacheSerializableRunnable("Assert partail commit data") {
            @Override
            public void run2() {
              final Cache c = getCache();
              GeodeAwaitility.await().untilAsserted(new WaitCriterion() {
                @Override
                public boolean done() {
                  return c.getRegion(rgnName1) == null;
                }

                @Override
                public String description() {
                  // Non-null so that anything reporting this criterion can say what was awaited.
                  return "waiting for region " + rgnName1
                      + " to no longer be available from the cache";
                }
              });
              Region r2 = c.getRegion(rgnName2);
              assertNull(r2);
            }
          };
      trouble1.invoke(assertTroubledContent);
      trouble2.invoke(assertTroubledContent);

      // Assert proper content on successful VMs
      SerializableRunnable assertSuccessfulContent =
          new CacheSerializableRunnable("Assert complete commit of data on successful VMs") {
            @Override
            public void run2() {
              Cache c = getCache();
              {
                Region r1 = c.getRegion(rgnName1);
                assertNotNull(r1);
                assertEquals("k1", r1.getEntry("k1").getValue());
                assertEquals("k2", r1.getEntry("k2").getValue());
                assertEquals(TROUBLE_KEY, r1.getEntry(TROUBLE_KEY).getValue());
              }
              {
                Region r2 = c.getRegion(rgnName2);
                assertNotNull(r2);
                assertEquals("k1", r2.getEntry("k1").getValue());
                assertEquals("k2", r2.getEntry("k2").getValue());
                assertEquals(TROUBLE_KEY, r2.getEntry(TROUBLE_KEY).getValue());
              }
            }
          };
      noTrouble.invoke(assertSuccessfulContent);

      // Assert that every entry written by the transaction is present on the originating VM
      SerializableRunnable assertOriginContent =
          new CacheSerializableRunnable("Assert data survives on origin VM") {
            @Override
            public void run2() {
              Cache c = getCache();
              {
                Region r1 = c.getRegion(rgnName1);
                assertNotNull(r1);
                assertNotNull(r1.getEntry("k1"));
                assertNotNull(r1.getEntry("k2"));
                assertNotNull(r1.getEntry(TROUBLE_KEY));
              }
              {
                Region r2 = c.getRegion(rgnName2);
                assertNotNull(r2);
                assertNotNull(r2.getEntry("k1"));
                assertNotNull(r2.getEntry("k2"));
                assertNotNull(r2.getEntry(TROUBLE_KEY));
              }
            }
          };
      origin.invoke(assertOriginContent);
    } finally {
      // Restore the static flag in every VM, whether or not the test passed.
      Invoke.invokeInEveryVM(new SerializableCallable() {
        @Override
        public Object call() throws Exception {
          TXManagerImpl.ALLOW_PERSISTENT_TRANSACTIONS = false;
          return null;
        }
      });
    }
  }