public void testPubAndSubCustomStats()

in geode-core/src/distributedTest/java/org/apache/geode/internal/statistics/StatisticsDistributedTest.java [120:488]


  public void testPubAndSubCustomStats() throws Exception {
    String regionName = "region_" + getName();
    VM[] pubs = new VM[NUM_PUBS];
    for (int pubVM = 0; pubVM < NUM_PUBS; pubVM++) {
      pubs[pubVM] = getHost(0).getVM(pubVM);
    }
    VM sub = getHost(0).getVM(NUM_PUBS);

    for (VM pub : pubs) {
      pub.invoke(() -> puts.set(0));
    }

    String subArchive =
        directory.getAbsolutePath() + File.separator + getName() + "_sub" + ".gfs";
    String[] pubArchives = new String[NUM_PUBS];
    for (int pubVM = 0; pubVM < NUM_PUBS; pubVM++) {
      pubArchives[pubVM] =
          directory.getAbsolutePath() + File.separator + getName() + "_pub-" + pubVM + ".gfs";
    }

    for (int i = 0; i < NUM_PUBS; i++) {
      final int pubVM = i;
      pubs[pubVM].invoke("pub-connect-and-create-data-" + pubVM, () -> {
        Properties props = new Properties();
        props.setProperty(STATISTIC_SAMPLING_ENABLED, "true");
        props.setProperty(STATISTIC_SAMPLE_RATE, "1000");
        props.setProperty(STATISTIC_ARCHIVE_FILE, pubArchives[pubVM]);

        InternalDistributedSystem system = getSystem(props);

        // assert that sampler is working as expected
        GemFireStatSampler sampler = system.getStatSampler();
        assertThat(sampler.isSamplingEnabled()).isTrue();
        assertThat(sampler.isAlive()).isTrue();
        assertThat(sampler.getArchiveFileName()).isEqualTo(new File(pubArchives[pubVM]));

        await("awaiting SampleCollector to exist")
            .until(() -> sampler.getSampleCollector() != null);

        SampleCollector sampleCollector = sampler.getSampleCollector();
        assertThat(sampleCollector).isNotNull();

        StatArchiveHandler archiveHandler = sampleCollector.getStatArchiveHandler();
        assertThat(archiveHandler).isNotNull();
        assertThat(archiveHandler.isArchiving()).isTrue();

        // create cache and region
        Cache cache = getCache();
        RegionFactory<String, Number> factory = cache.createRegionFactory();
        factory.setScope(Scope.DISTRIBUTED_ACK);

        RegionMembershipListener rml = new RegionMembershipListener();
        rmlRef.set(rml);
        factory.addCacheListener(rml);
        Region<String, Number> region = factory.create(regionName);

        // create the keys
        if (region.getAttributes().getScope() == Scope.DISTRIBUTED_ACK) {
          for (int key = 0; key < NUM_KEYS; key++) {
            region.create("KEY-" + key, null);
          }
        }
      });
    }

    DistributedMember subMember = sub.invoke("sub-connect-and-create-keys", () -> {
      Properties props = new Properties();
      props.setProperty(STATISTIC_SAMPLING_ENABLED, "true");
      props.setProperty(STATISTIC_SAMPLE_RATE, "1000");
      props.setProperty(STATISTIC_ARCHIVE_FILE, subArchive);

      InternalDistributedSystem system = getSystem(props);

      PubSubStats statistics = new PubSubStats(system, "sub-1", 1);
      subStatsRef.set(statistics);

      // assert that sampler is working as expected
      GemFireStatSampler sampler = system.getStatSampler();
      assertThat(sampler.isSamplingEnabled()).isTrue();
      assertThat(sampler.isAlive()).isTrue();
      assertThat(sampler.getArchiveFileName()).isEqualTo(new File(subArchive));

      await("awaiting SampleCollector to exist")
          .until(() -> sampler.getSampleCollector() != null);

      SampleCollector sampleCollector = sampler.getSampleCollector();
      assertThat(sampleCollector).isNotNull();

      StatArchiveHandler archiveHandler = sampleCollector.getStatArchiveHandler();
      assertThat(archiveHandler).isNotNull();
      assertThat(archiveHandler.isArchiving()).isTrue();

      // create cache and region with UpdateListener
      Cache cache = getCache();
      RegionFactory<String, Number> factory = cache.createRegionFactory();
      factory.setScope(Scope.DISTRIBUTED_ACK);

      CacheListener<String, Number> cl = new UpdateListener(statistics);
      factory.addCacheListener(cl);
      Region<String, Number> region = factory.create(regionName);

      // create the keys
      if (region.getAttributes().getScope() == Scope.DISTRIBUTED_ACK) {
        for (int key = 0; key < NUM_KEYS; key++) {
          region.create("KEY-" + key, null);
        }
      }

      assertThat(statistics.getUpdateEvents()).isZero();
      return system.getDistributedMember();
    });

    for (int i = 0; i < NUM_PUBS; i++) {
      final int pubVM = i;
      AsyncInvocation<?>[] publishers = new AsyncInvocation[NUM_PUB_THREADS];
      for (int j = 0; j < NUM_PUB_THREADS; j++) {
        final int pubThread = j;
        publishers[pubThread] = pubs[pubVM]
            .invokeAsync("pub-connect-and-put-data-" + pubVM + "-thread-" + pubThread, () -> {
              PubSubStats statistics = new PubSubStats(basicGetSystem(), "pub-" + pubThread, pubVM);
              pubStatsRef.set(pubThread, statistics);

              RegionMembershipListener rml = rmlRef.get();
              Region<String, Number> region = getCache().getRegion(regionName);

              // assert that sub is in rml membership
              assertThat(rml).isNotNull();

              await("awaiting Membership to contain subMember")
                  .until(() -> rml.contains(subMember) && rml.size() == NUM_PUBS);

              // publish lots of puts cycling through the NUM_KEYS
              assertThat(statistics.getPuts()).isZero();

              // cycle through the keys randomly
              if (RANDOMIZE_PUTS) {
                Random randomGenerator = new Random();
                int key = 0;
                for (int idx = 0; idx < MAX_PUTS; idx++) {
                  long start = statistics.startPut();
                  key = randomGenerator.nextInt(NUM_KEYS);
                  region.put("KEY-" + key, idx);
                  statistics.endPut(start);
                }

                // cycle through the keys in order and wrapping back around
              } else {
                int key = 0;
                for (int idx = 0; idx < MAX_PUTS; idx++) {
                  long start = statistics.startPut();
                  region.put("KEY-" + key, idx);
                  key++; // cycle through the keys...
                  if (key >= NUM_KEYS) {
                    key = 0;
                  }
                  statistics.endPut(start);
                }
              }
              assertThat(statistics.getPuts()).isEqualTo(MAX_PUTS);

              // wait for 2 samples to ensure all stats have been archived
              StatisticsType statSamplerType = getSystem().findType("StatSampler");
              Statistics[] statsArray = getSystem().findStatisticsByType(statSamplerType);
              assertThat(statsArray.length).isOne();

              Statistics statSamplerStats = statsArray[0];
              int initialSampleCount = statSamplerStats.getInt(StatSamplerStats.SAMPLE_COUNT);

              await("awaiting sampleCount >= 2").until(() -> statSamplerStats
                  .getInt(StatSamplerStats.SAMPLE_COUNT) >= initialSampleCount + 2);
            });
      }

      for (final AsyncInvocation<?> publisher : publishers) {
        publisher.join();
        assertThat(publisher.exceptionOccurred()).isFalse();
      }
    }

    sub.invoke("sub-wait-for-samples", () -> {
      // wait for 2 samples to ensure all stats have been archived
      StatisticsType statSamplerType = getSystem().findType("StatSampler");
      Statistics[] statsArray = getSystem().findStatisticsByType(statSamplerType);
      assertThat(statsArray.length).isOne();

      Statistics statSamplerStats = statsArray[0];
      int initialSampleCount = statSamplerStats.getInt(StatSamplerStats.SAMPLE_COUNT);

      await("awaiting sampleCount >= 2").until(
          () -> statSamplerStats.getInt(StatSamplerStats.SAMPLE_COUNT) >= initialSampleCount + 2);

      // now post total updateEvents to static
      PubSubStats statistics = subStatsRef.get();
      assertThat(statistics).isNotNull();
      updateEvents.set(statistics.getUpdateEvents());
    });

    // validate pub values against sub values
    int totalUpdateEvents = sub.invoke(StatisticsDistributedTest::getUpdateEvents);

    // validate pub values against pub statistics against pub archive
    for (int i = 0; i < NUM_PUBS; i++) {
      final int pubIdx = i;
      pubs[pubIdx].invoke("pub-validation", () -> {
        // add up all the puts
        assertThat(pubStatsRef.length()).isEqualTo(NUM_PUB_THREADS);
        int totalPuts = 0;
        for (int pubThreadIdx = 0; pubThreadIdx < NUM_PUB_THREADS; pubThreadIdx++) {
          PubSubStats statistics = pubStatsRef.get(pubThreadIdx);
          assertThat(statistics).isNotNull();
          totalPuts += statistics.getPuts();
        }

        // assert that total puts adds up to max puts times num threads
        assertThat(totalPuts).isEqualTo(MAX_PUTS * NUM_PUB_THREADS);

        // assert that archive file contains same values as statistics
        File archive = new File(pubArchives[pubIdx]);
        assertThat(archive.exists()).isTrue();

        StatArchiveReader reader = new StatArchiveReader(new File[] {archive}, null, false);

        double combinedPuts = 0;

        List resources = reader.getResourceInstList();
        assertThat(resources).isNotNull();
        assertThat(resources.isEmpty()).isFalse();

        for (ResourceInst ri : (Iterable<ResourceInst>) resources) {
          if (!ri.getType().getName().equals(PubSubStats.TYPE_NAME)) {
            continue;
          }

          StatValue[] statValues = ri.getStatValues();
          for (int idx = 0; idx < statValues.length; idx++) {
            String statName = ri.getType().getStats()[idx].getName();
            assertThat(statName).isNotNull();

            if (statName.equals(PubSubStats.PUTS)) {
              StatValue sv = statValues[idx];
              sv.setFilter(StatValue.FILTER_NONE);

              double mostRecent = sv.getSnapshotsMostRecent();
              double min = sv.getSnapshotsMinimum();
              double max = sv.getSnapshotsMaximum();
              double maxMinusMin = sv.getSnapshotsMaximum() - sv.getSnapshotsMinimum();
              double mean = sv.getSnapshotsAverage();
              double stdDev = sv.getSnapshotsStandardDeviation();

              assertThat(mostRecent).isEqualTo(max);

              double summation = 0;
              double[] rawSnapshots = sv.getRawSnapshots();
              for (final double rawSnapshot : rawSnapshots) {
                summation += rawSnapshot;
              }
              assertThat(summation / sv.getSnapshotsSize()).isEqualTo(mean);

              combinedPuts += mostRecent;
            }
          }
        }

        // assert that sum of mostRecent values for all puts equals totalPuts
        assertThat(combinedPuts).isEqualTo(totalPuts);
        puts.getAndAdd(totalPuts);
      });
    }

    // validate pub values against sub values
    int totalCombinedPuts = 0;
    for (int i = 0; i < NUM_PUBS; i++) {
      int pubIdx = i;
      int totalPuts = pubs[pubIdx].invoke(StatisticsDistributedTest::getPuts);
      assertThat(totalPuts).isEqualTo(MAX_PUTS * NUM_PUB_THREADS);
      totalCombinedPuts += totalPuts;
    }
    assertThat(totalUpdateEvents).isEqualTo(totalCombinedPuts);
    assertThat(totalCombinedPuts).isEqualTo(MAX_PUTS * NUM_PUB_THREADS * NUM_PUBS);

    // validate sub values against sub statistics against sub archive
    final int totalPuts = totalCombinedPuts;
    sub.invoke("sub-validation", () -> {
      PubSubStats statistics = subStatsRef.get();
      assertThat(statistics).isNotNull();
      int updateEvents = statistics.getUpdateEvents();
      assertThat(updateEvents).isEqualTo(totalPuts);
      assertThat(updateEvents).isEqualTo(totalUpdateEvents);
      assertThat(updateEvents).isEqualTo(MAX_PUTS * NUM_PUB_THREADS * NUM_PUBS);

      // assert that archive file contains same values as statistics
      File archive = new File(subArchive);
      assertThat(archive.exists()).isTrue();

      StatArchiveReader reader = new StatArchiveReader(new File[] {archive}, null, false);

      double combinedUpdateEvents = 0;

      List resources = reader.getResourceInstList();
      for (ResourceInst ri : (Iterable<ResourceInst>) resources) {
        if (!ri.getType().getName().equals(PubSubStats.TYPE_NAME)) {
          continue;
        }

        StatValue[] statValues = ri.getStatValues();
        for (int i = 0; i < statValues.length; i++) {
          String statName = ri.getType().getStats()[i].getName();
          assertThat(statName).isNotNull();

          if (statName.equals(PubSubStats.UPDATE_EVENTS)) {
            StatValue sv = statValues[i];
            sv.setFilter(StatValue.FILTER_NONE);

            double mostRecent = sv.getSnapshotsMostRecent();
            double min = sv.getSnapshotsMinimum();
            double max = sv.getSnapshotsMaximum();
            double maxMinusMin = sv.getSnapshotsMaximum() - sv.getSnapshotsMinimum();
            double mean = sv.getSnapshotsAverage();
            double stdDev = sv.getSnapshotsStandardDeviation();

            assertThat(max).isEqualTo(mostRecent);

            double summation = 0;
            double[] rawSnapshots = sv.getRawSnapshots();
            for (final double rawSnapshot : rawSnapshots) {
              summation += rawSnapshot;
            }
            assertThat(summation / sv.getSnapshotsSize()).isEqualTo(mean);

            combinedUpdateEvents += mostRecent;
          }
        }
      }
      assertThat(combinedUpdateEvents).isEqualTo(totalUpdateEvents);
    });

    int updateEvents =
        sub.invoke(() -> readIntStat(new File(subArchive), "PubSubStats", "updateEvents"));
    assertThat(updateEvents).isGreaterThan(0);
    assertThat(updateEvents).isEqualTo(MAX_PUTS * NUM_PUB_THREADS * NUM_PUBS);

    int puts = 0;
    for (int pubVM = 0; pubVM < NUM_PUBS; pubVM++) {
      int currentPubVM = pubVM;
      int vmPuts = pubs[pubVM]
          .invoke(() -> readIntStat(new File(pubArchives[currentPubVM]), "PubSubStats", "puts"));
      assertThat(vmPuts).isGreaterThan(0);
      assertThat(vmPuts).isEqualTo(MAX_PUTS * NUM_PUB_THREADS);
      puts += vmPuts;
    }
    assertThat(puts).isGreaterThan(0);
    assertThat(puts).isEqualTo(MAX_PUTS * NUM_PUB_THREADS * NUM_PUBS);

    // use regex "testPubAndSubCustomStats"

    MultipleArchiveReader reader =
        new MultipleArchiveReader(directory, ".*" + getTestMethodName() + ".*\\.gfs");

    int combinedUpdateEvents = reader.readIntStat(PubSubStats.TYPE_NAME, PubSubStats.UPDATE_EVENTS);
    assertThat(combinedUpdateEvents)
        .as("Failed to read updateEvents stat values").isGreaterThan(0);

    int combinedPuts = reader.readIntStat(PubSubStats.TYPE_NAME, PubSubStats.PUTS);
    assertThat(combinedPuts).as("Failed to read puts stat values").isGreaterThan(0);

    assertThat(combinedUpdateEvents)
        .as("updateEvents is " + combinedUpdateEvents + " but puts is " + combinedPuts)
        .isEqualTo(combinedPuts);
  }