public void testRemoteCacheWriter()

in geode-dunit/src/main/java/org/apache/geode/cache30/MultiVMRegionTestCase.java [1607:1925]


  public void testRemoteCacheWriter() {
    assertThat(getRegionAttributes().getScope().isDistributed()).isTrue();

    final String name = getUniqueName();
    final Object key = "KEY";
    final Object oldValue = "OLD_VALUE";
    final Object newValue = "NEW_VALUE";
    final Object arg = "ARG";
    final Object exception = "EXCEPTION";

    final Object key2 = "KEY2";
    final Object value2 = "VALUE2";

    SerializableRunnable create = new CacheSerializableRunnable() {
      @Override
      public void run2() throws CacheException {
        Region<Object, Object> region = createRegion(name);

        // Put key2 in the region before any callbacks are
        // registered, so it can be destroyed later
        region.put(key2, value2);
        assertThat(region.size()).isEqualTo(1);
        if (region.getAttributes().getOffHeap() && !(region instanceof PartitionedRegion)) {
          GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
          MemoryAllocatorImpl ma = (MemoryAllocatorImpl) gfc.getOffHeapStore();
          LocalRegion reRegion;
          reRegion = (LocalRegion) region;
          RegionEntry re = reRegion.getRegionEntry(key2);
          StoredObject so = (StoredObject) re.getValue();
          assertThat(so.getRefCount()).isEqualTo(1);
          assertThat(ma.getStats().getObjects()).isEqualTo(1);
        }
      }
    };

    vm0.invoke("Create Region", create);
    vm1.invoke("Create Region", create);

    //////// Create

    vm1.invoke("Set Writer", () -> {
      final Region<Object, Object> region = getRootRegion().getSubregion(name);
      setWriter(new TestCacheWriter<Object, Object>() {
        @Override
        public void beforeCreate2(EntryEvent<Object, Object> event) throws CacheWriterException {

          if (exception.equals(event.getCallbackArgument())) {
            String s = "Test Exception";
            throw new CacheWriterException(s);
          }

          assertThat(event.getRegion()).isEqualTo(region);
          assertThat(event.getOperation().isCreate()).isTrue();
          assertThat(event.getOperation().isDistributed()).isTrue();
          assertThat(event.getOperation().isExpiration()).isFalse();
          assertThat(event.isOriginRemote()).isTrue();
          assertThat(event.getKey()).isEqualTo(key);
          assertThat(event.getOldValue()).isNull();
          assertThat(event.getNewValue()).isEqualTo(oldValue);
          assertThat(event.getOperation().isLoad()).isFalse();
          assertThat(event.getOperation().isLocalLoad()).isFalse();
          assertThat(event.getOperation().isNetLoad()).isFalse();
          assertThat(event.getOperation().isNetSearch()).isFalse();

        }
      });
      region.getAttributesMutator().setCacheWriter(writer());
      flushIfNecessary(region);
    });

    vm0.invoke("Create with Exception", () -> {
      Region<Object, Object> region = getRootRegion().getSubregion(name);
      try {
        region.put(key, oldValue, exception);
        fail("Should have thrown a CacheWriterException");

      } catch (CacheWriterException ex) {
        assertThat(region.getEntry(key)).isNull();
        assertThat(region.size()).isEqualTo(1);
        if (region.getAttributes().getOffHeap() && !(region instanceof PartitionedRegion)) {
          GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
          MemoryAllocatorImpl ma = (MemoryAllocatorImpl) gfc.getOffHeapStore();
          assertThat(ma.getStats().getObjects()).isEqualTo(1);
        }
      }
    });

    vm1.invoke("Verify callback", () -> {
      assertThat(writer().wasInvoked()).isTrue();
    });

    vm0.invoke("Create with Argument", () -> {
      Region<Object, Object> region = getRootRegion().getSubregion(name);
      region.put(key, oldValue, arg);
      assertThat(region.size()).isEqualTo(2);
      if (region.getAttributes().getOffHeap() && !(region instanceof PartitionedRegion)) {
        GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
        MemoryAllocatorImpl ma = (MemoryAllocatorImpl) gfc.getOffHeapStore();
        assertThat(ma.getStats().getObjects()).isEqualTo(2);
        LocalRegion reRegion;
        reRegion = (LocalRegion) region;
        StoredObject so = (StoredObject) reRegion.getRegionEntry(key).getValue();
        assertThat(so.getRefCount()).isEqualTo(1);
      }
    });
    vm1.invoke("Verify callback", () -> {
      assertThat(writer().wasInvoked()).isTrue();
    });

    //////// Update

    vm1.invoke("Set Writer", () -> {
      final Region<Object, Object> region = getRootRegion().getSubregion(name);
      setWriter(new TestCacheWriter<Object, Object>() {
        @Override
        public void beforeUpdate2(EntryEvent<Object, Object> event) throws CacheWriterException {

          Object argument = event.getCallbackArgument();
          if (exception.equals(argument)) {
            String s = "Test Exception";
            throw new CacheWriterException(s);
          }

          assertThat(argument).isEqualTo(arg);

          assertThat(event.getRegion()).isEqualTo(region);
          assertThat(event.getOperation().isUpdate()).isTrue();
          assertThat(event.getOperation().isDistributed()).isTrue();
          assertThat(event.getOperation().isExpiration()).isFalse();
          assertThat(event.isOriginRemote()).isTrue();
          assertThat(event.getKey()).isEqualTo(key);
          assertThat(event.getOldValue()).isEqualTo(oldValue);
          assertThat(event.getNewValue()).isEqualTo(newValue);
          assertThat(event.getOperation().isLoad()).isFalse();
          assertThat(event.getOperation().isLocalLoad()).isFalse();
          assertThat(event.getOperation().isNetLoad()).isFalse();
          assertThat(event.getOperation().isNetSearch()).isFalse();

        }
      });
      region.getAttributesMutator().setCacheWriter(writer());
    });

    vm0.invoke("Update with Exception", () -> {
      Region<Object, Object> region = getRootRegion().getSubregion(name);
      try {
        region.put(key, newValue, exception);
        fail("Should have thrown a CacheWriterException");

      } catch (CacheWriterException ex) {
        Region.Entry entry = region.getEntry(key);
        assertThat(entry.getValue()).isEqualTo(oldValue);
        assertThat(region.size()).isEqualTo(2);
        if (region.getAttributes().getOffHeap() && !(region instanceof PartitionedRegion)) {
          GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
          MemoryAllocatorImpl ma = (MemoryAllocatorImpl) gfc.getOffHeapStore();
          assertThat(ma.getStats().getObjects()).isEqualTo(2);
          LocalRegion reRegion;
          reRegion = (LocalRegion) region;
          StoredObject so = (StoredObject) reRegion.getRegionEntry(key).getValue();
          assertThat(so.getRefCount()).isEqualTo(1);
        }
      }
    });
    vm1.invoke("Verify callback", () -> {
      assertThat(writer().wasInvoked()).isTrue();
    });

    vm0.invoke("Update with Argument", () -> {
      Region<Object, Object> region = getRootRegion().getSubregion(name);
      region.put(key, newValue, arg);
      assertThat(region.size()).isEqualTo(2);
      if (region.getAttributes().getOffHeap() && !(region instanceof PartitionedRegion)) {
        GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
        MemoryAllocatorImpl ma = (MemoryAllocatorImpl) gfc.getOffHeapStore();
        assertThat(ma.getStats().getObjects()).isEqualTo(2);
      }
    });
    vm1.invoke("Verify callback", () -> {
      assertThat(writer().wasInvoked()).isTrue();
    });

    //////// Destroy

    vm1.invoke("Set Writer", () -> {
      final Region<Object, Object> region = getRootRegion().getSubregion(name);
      setWriter(new TestCacheWriter<Object, Object>() {
        @Override
        public void beforeDestroy2(EntryEvent<Object, Object> event) throws CacheWriterException {

          Object argument = event.getCallbackArgument();
          if (exception.equals(argument)) {
            String s = "Test Exception";
            throw new CacheWriterException(s);
          }

          assertThat(argument).isEqualTo(arg);

          assertThat(event.getRegion()).isEqualTo(region);
          assertThat(event.getOperation().isDestroy()).isTrue();
          assertThat(event.getOperation().isDistributed()).isTrue();
          assertThat(event.getOperation().isExpiration()).isFalse();
          assertThat(event.isOriginRemote()).isTrue();
          assertThat(event.getKey()).isEqualTo(key);
          assertThat(event.getOldValue()).isEqualTo(newValue);
          assertThat(event.getNewValue()).isNull();
          assertThat(event.getOperation().isLoad()).isFalse();
          assertThat(event.getOperation().isLocalLoad()).isFalse();
          assertThat(event.getOperation().isNetLoad()).isFalse();
          assertThat(event.getOperation().isNetSearch()).isFalse();
        }
      });
      region.getAttributesMutator().setCacheWriter(writer());
    });

    vm0.invoke("Destroy with Exception", () -> {
      Region<Object, Object> region = getRootRegion().getSubregion(name);
      try {
        region.destroy(key, exception);
        fail("Should have thrown a CacheWriterException");

      } catch (CacheWriterException ex) {
        assertThat(region.getEntry(key)).isNotNull();
        assertThat(region.size()).isEqualTo(2);
        if (region.getAttributes().getOffHeap() && !(region instanceof PartitionedRegion)) {
          GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
          MemoryAllocatorImpl ma = (MemoryAllocatorImpl) gfc.getOffHeapStore();
          assertThat(ma.getStats().getObjects()).isEqualTo(2);
        }
      }
    });
    vm1.invoke("Verify callback", () -> {
      assertThat(writer().wasInvoked()).isTrue();
    });

    vm0.invoke("Destroy with Argument", () -> {
      Region<Object, Object> region = getRootRegion().getSubregion(name);
      region.destroy(key, arg);
      assertThat(region.size()).isEqualTo(1);
      if (region.getAttributes().getOffHeap() && !(region instanceof PartitionedRegion)) {
        GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
        MemoryAllocatorImpl ma = (MemoryAllocatorImpl) gfc.getOffHeapStore();
        assertThat(ma.getStats().getObjects()).isEqualTo(1);
      }
    });
    vm1.invoke("Verify callback", () -> {
      assertThat(writer().wasInvoked()).isTrue();
    });

    //////// Region Destroy

    vm1.invoke("Set Writer", () -> {
      final Region<Object, Object> region = getRootRegion().getSubregion(name);
      setWriter(new TestCacheWriter<Object, Object>() {
        @Override
        public void beforeRegionDestroy2(RegionEvent<Object, Object> event)
            throws CacheWriterException {

          Object argument = event.getCallbackArgument();
          if (exception.equals(argument)) {
            String s = "Test Exception";
            throw new CacheWriterException(s);
          }

          assertThat(argument).isEqualTo(arg);

          assertThat(event.getRegion()).isEqualTo(region);
          assertThat(event.getOperation().isRegionDestroy()).isTrue();
          assertThat(event.getOperation().isDistributed()).isTrue();
          assertThat(event.getOperation().isExpiration()).isFalse();
          assertThat(event.isOriginRemote()).isTrue();
        }
      });
      region.getAttributesMutator().setCacheWriter(writer());
    });

    vm0.invoke("Destroy with Exception", () -> {
      Region<Object, Object> region = getRootRegion().getSubregion(name);
      try {
        region.destroyRegion(exception);
        fail("Should have thrown a CacheWriterException");

      } catch (CacheWriterException ex) {
        if (region.isDestroyed()) {
          fail("should not have an exception if region is destroyed", ex);
        }
        assertThat(region.size()).isEqualTo(1);
        if (region.getAttributes().getOffHeap() && !(region instanceof PartitionedRegion)) {
          GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
          MemoryAllocatorImpl ma = (MemoryAllocatorImpl) gfc.getOffHeapStore();
          assertThat(ma.getStats().getObjects()).isEqualTo(1);
        }
      }
    });
    vm1.invoke("Verify callback", () -> {
      assertThat(writer().wasInvoked()).isTrue();
    });

    vm0.invoke("Destroy with Argument", () -> {
      Region<Object, Object> region = getRootRegion().getSubregion(name);
      assertThat(region.size()).isEqualTo(1);
      if (region.getAttributes().getOffHeap() && !(region instanceof PartitionedRegion)) {
        GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
        MemoryAllocatorImpl ma = (MemoryAllocatorImpl) gfc.getOffHeapStore();
        assertThat(ma.getStats().getObjects()).isEqualTo(1);
      }
      region.destroyRegion(arg);
      if (region.getAttributes().getOffHeap() && !(region instanceof PartitionedRegion)) {
        GemFireCacheImpl gfc = (GemFireCacheImpl) getCache();
        final MemoryAllocatorImpl ma = (MemoryAllocatorImpl) gfc.getOffHeapStore();
        await("waiting for off-heap object count go to zero")
            .until(() -> ma.getStats().getObjects(), equalTo(0));

      }
    });
    vm1.invoke("Verify callback", () -> {
      assertThat(writer().wasInvoked()).isTrue();
    });
  }