in geode-core/src/distributedTest/java/org/apache/geode/internal/statistics/StatisticsDistributedTest.java [120:488]
public void testPubAndSubCustomStats() throws Exception {
  /*
   * Scenario: NUM_PUBS publisher VMs and one subscriber VM create the same DISTRIBUTED_ACK
   * region with statistic sampling and archiving enabled. Every pub VM runs NUM_PUB_THREADS
   * threads that each perform MAX_PUTS puts. The test then checks that the in-memory
   * PubSubStats, the per-VM archive files and the combined archives all report the same totals
   * for puts (pubs) and updateEvents (sub).
   */
  String regionName = "region_" + getName();

  VM[] pubs = new VM[NUM_PUBS];
  for (int pubVM = 0; pubVM < NUM_PUBS; pubVM++) {
    pubs[pubVM] = getHost(0).getVM(pubVM);
  }
  VM sub = getHost(0).getVM(NUM_PUBS);

  // reset the static put total in each pub VM; pub-validation adds to it later
  for (VM pub : pubs) {
    pub.invoke(() -> puts.set(0));
  }

  // one archive file per member, all inside the test directory
  String subArchive =
      directory.getAbsolutePath() + File.separator + getName() + "_sub" + ".gfs";
  String[] pubArchives = new String[NUM_PUBS];
  for (int pubVM = 0; pubVM < NUM_PUBS; pubVM++) {
    pubArchives[pubVM] =
        directory.getAbsolutePath() + File.separator + getName() + "_pub-" + pubVM + ".gfs";
  }

  // step 1: every pub connects, verifies its sampler and creates the region with its keys
  for (int i = 0; i < NUM_PUBS; i++) {
    final int pubVM = i;
    pubs[pubVM].invoke("pub-connect-and-create-data-" + pubVM, () -> {
      Properties props = new Properties();
      props.setProperty(STATISTIC_SAMPLING_ENABLED, "true");
      props.setProperty(STATISTIC_SAMPLE_RATE, "1000");
      props.setProperty(STATISTIC_ARCHIVE_FILE, pubArchives[pubVM]);

      InternalDistributedSystem system = getSystem(props);

      // assert that sampler is working as expected
      GemFireStatSampler sampler = system.getStatSampler();
      assertThat(sampler.isSamplingEnabled()).isTrue();
      assertThat(sampler.isAlive()).isTrue();
      assertThat(sampler.getArchiveFileName()).isEqualTo(new File(pubArchives[pubVM]));

      await("awaiting SampleCollector to exist")
          .until(() -> sampler.getSampleCollector() != null);

      SampleCollector sampleCollector = sampler.getSampleCollector();
      assertThat(sampleCollector).isNotNull();

      StatArchiveHandler archiveHandler = sampleCollector.getStatArchiveHandler();
      assertThat(archiveHandler).isNotNull();
      assertThat(archiveHandler.isArchiving()).isTrue();

      // create cache and region
      Cache cache = getCache();
      RegionFactory<String, Number> factory = cache.createRegionFactory();
      factory.setScope(Scope.DISTRIBUTED_ACK);

      // the listener is stashed in a static so the publisher threads can wait on membership
      RegionMembershipListener rml = new RegionMembershipListener();
      rmlRef.set(rml);
      factory.addCacheListener(rml);

      Region<String, Number> region = factory.create(regionName);

      // create the keys
      if (region.getAttributes().getScope() == Scope.DISTRIBUTED_ACK) {
        for (int key = 0; key < NUM_KEYS; key++) {
          region.create("KEY-" + key, null);
        }
      }
    });
  }

  // step 2: the sub connects, registers its stats + UpdateListener and creates the same keys
  DistributedMember subMember = sub.invoke("sub-connect-and-create-keys", () -> {
    Properties props = new Properties();
    props.setProperty(STATISTIC_SAMPLING_ENABLED, "true");
    props.setProperty(STATISTIC_SAMPLE_RATE, "1000");
    props.setProperty(STATISTIC_ARCHIVE_FILE, subArchive);

    InternalDistributedSystem system = getSystem(props);

    PubSubStats statistics = new PubSubStats(system, "sub-1", 1);
    subStatsRef.set(statistics);

    // assert that sampler is working as expected
    GemFireStatSampler sampler = system.getStatSampler();
    assertThat(sampler.isSamplingEnabled()).isTrue();
    assertThat(sampler.isAlive()).isTrue();
    assertThat(sampler.getArchiveFileName()).isEqualTo(new File(subArchive));

    await("awaiting SampleCollector to exist")
        .until(() -> sampler.getSampleCollector() != null);

    SampleCollector sampleCollector = sampler.getSampleCollector();
    assertThat(sampleCollector).isNotNull();

    StatArchiveHandler archiveHandler = sampleCollector.getStatArchiveHandler();
    assertThat(archiveHandler).isNotNull();
    assertThat(archiveHandler.isArchiving()).isTrue();

    // create cache and region with UpdateListener
    Cache cache = getCache();
    RegionFactory<String, Number> factory = cache.createRegionFactory();
    factory.setScope(Scope.DISTRIBUTED_ACK);

    CacheListener<String, Number> cl = new UpdateListener(statistics);
    factory.addCacheListener(cl);

    Region<String, Number> region = factory.create(regionName);

    // create the keys
    if (region.getAttributes().getScope() == Scope.DISTRIBUTED_ACK) {
      for (int key = 0; key < NUM_KEYS; key++) {
        region.create("KEY-" + key, null);
      }
    }

    // no puts have been published yet, so no update events may have been counted
    assertThat(statistics.getUpdateEvents()).isZero();

    return system.getDistributedMember();
  });

  // step 3: one pub VM at a time, run NUM_PUB_THREADS concurrent publishers and wait for them
  for (int i = 0; i < NUM_PUBS; i++) {
    final int pubVM = i;
    AsyncInvocation<?>[] publishers = new AsyncInvocation[NUM_PUB_THREADS];
    for (int j = 0; j < NUM_PUB_THREADS; j++) {
      final int pubThread = j;
      publishers[pubThread] = pubs[pubVM]
          .invokeAsync("pub-connect-and-put-data-" + pubVM + "-thread-" + pubThread, () -> {
            PubSubStats statistics = new PubSubStats(basicGetSystem(), "pub-" + pubThread, pubVM);
            pubStatsRef.set(pubThread, statistics);

            RegionMembershipListener rml = rmlRef.get();
            Region<String, Number> region = getCache().getRegion(regionName);

            // assert that sub is in rml membership
            assertThat(rml).isNotNull();

            await("awaiting Membership to contain subMember")
                .until(() -> rml.contains(subMember) && rml.size() == NUM_PUBS);

            // publish lots of puts cycling through the NUM_KEYS
            assertThat(statistics.getPuts()).isZero();

            if (RANDOMIZE_PUTS) {
              // cycle through the keys randomly
              Random randomGenerator = new Random();
              for (int idx = 0; idx < MAX_PUTS; idx++) {
                long start = statistics.startPut();
                int key = randomGenerator.nextInt(NUM_KEYS);
                region.put("KEY-" + key, idx);
                statistics.endPut(start);
              }
            } else {
              // cycle through the keys in order and wrapping back around
              int key = 0;
              for (int idx = 0; idx < MAX_PUTS; idx++) {
                long start = statistics.startPut();
                region.put("KEY-" + key, idx);
                key++; // cycle through the keys...
                if (key >= NUM_KEYS) {
                  key = 0;
                }
                statistics.endPut(start);
              }
            }
            assertThat(statistics.getPuts()).isEqualTo(MAX_PUTS);

            // wait for 2 samples to ensure all stats have been archived
            StatisticsType statSamplerType = getSystem().findType("StatSampler");
            Statistics[] statsArray = getSystem().findStatisticsByType(statSamplerType);
            assertThat(statsArray.length).isOne();

            Statistics statSamplerStats = statsArray[0];
            int initialSampleCount = statSamplerStats.getInt(StatSamplerStats.SAMPLE_COUNT);

            await("awaiting sampleCount >= 2").until(() -> statSamplerStats
                .getInt(StatSamplerStats.SAMPLE_COUNT) >= initialSampleCount + 2);
          });
    }

    for (final AsyncInvocation<?> publisher : publishers) {
      publisher.join();
      assertThat(publisher.exceptionOccurred()).isFalse();
    }
  }

  // step 4: let the sub archive its final values, then publish its total to a static
  sub.invoke("sub-wait-for-samples", () -> {
    // wait for 2 samples to ensure all stats have been archived
    StatisticsType statSamplerType = getSystem().findType("StatSampler");
    Statistics[] statsArray = getSystem().findStatisticsByType(statSamplerType);
    assertThat(statsArray.length).isOne();

    Statistics statSamplerStats = statsArray[0];
    int initialSampleCount = statSamplerStats.getInt(StatSamplerStats.SAMPLE_COUNT);

    await("awaiting sampleCount >= 2").until(
        () -> statSamplerStats.getInt(StatSamplerStats.SAMPLE_COUNT) >= initialSampleCount + 2);

    // now post total updateEvents to static
    PubSubStats statistics = subStatsRef.get();
    assertThat(statistics).isNotNull();
    updateEvents.set(statistics.getUpdateEvents());
  });

  // fetch the sub's total updateEvents (presumably the static posted just above); it is
  // compared against the pubs' put totals further down
  int totalUpdateEvents = sub.invoke(StatisticsDistributedTest::getUpdateEvents);

  // validate pub values against pub statistics against pub archive
  for (int i = 0; i < NUM_PUBS; i++) {
    final int pubIdx = i;
    pubs[pubIdx].invoke("pub-validation", () -> {
      // add up all the puts
      assertThat(pubStatsRef.length()).isEqualTo(NUM_PUB_THREADS);
      int totalPuts = 0;
      for (int pubThreadIdx = 0; pubThreadIdx < NUM_PUB_THREADS; pubThreadIdx++) {
        PubSubStats statistics = pubStatsRef.get(pubThreadIdx);
        assertThat(statistics).isNotNull();
        totalPuts += statistics.getPuts();
      }

      // assert that total puts adds up to max puts times num threads
      assertThat(totalPuts).isEqualTo(MAX_PUTS * NUM_PUB_THREADS);

      // assert that archive file contains same values as statistics
      File archive = new File(pubArchives[pubIdx]);
      assertThat(archive.exists()).isTrue();

      double combinedPuts = 0;

      // close the reader once the values have been read so the archive file is not left open
      // in this VM
      try (StatArchiveReader reader =
          new StatArchiveReader(new File[] {archive}, null, false)) {
        List<?> resources = reader.getResourceInstList();
        assertThat(resources).isNotNull();
        assertThat(resources.isEmpty()).isFalse();

        for (ResourceInst ri : (Iterable<ResourceInst>) resources) {
          // only the custom PubSubStats instances are of interest
          if (!ri.getType().getName().equals(PubSubStats.TYPE_NAME)) {
            continue;
          }

          StatValue[] statValues = ri.getStatValues();
          for (int idx = 0; idx < statValues.length; idx++) {
            String statName = ri.getType().getStats()[idx].getName();
            assertThat(statName).isNotNull();

            if (statName.equals(PubSubStats.PUTS)) {
              StatValue sv = statValues[idx];
              sv.setFilter(StatValue.FILTER_NONE);

              double mostRecent = sv.getSnapshotsMostRecent();
              double max = sv.getSnapshotsMaximum();
              double mean = sv.getSnapshotsAverage();

              // the last archived sample must also be the largest one
              assertThat(mostRecent).isEqualTo(max);

              // recompute the mean from the raw snapshots and compare with the reported one
              double summation = 0;
              double[] rawSnapshots = sv.getRawSnapshots();
              for (final double rawSnapshot : rawSnapshots) {
                summation += rawSnapshot;
              }
              assertThat(summation / sv.getSnapshotsSize()).isEqualTo(mean);

              combinedPuts += mostRecent;
            }
          }
        }
      }

      // assert that sum of mostRecent values for all puts equals totalPuts
      assertThat(combinedPuts).isEqualTo(totalPuts);
      puts.getAndAdd(totalPuts);
    });
  }

  // validate pub values against sub values
  int totalCombinedPuts = 0;
  for (int i = 0; i < NUM_PUBS; i++) {
    int pubIdx = i;
    int totalPuts = pubs[pubIdx].invoke(StatisticsDistributedTest::getPuts);
    assertThat(totalPuts).isEqualTo(MAX_PUTS * NUM_PUB_THREADS);
    totalCombinedPuts += totalPuts;
  }
  assertThat(totalUpdateEvents).isEqualTo(totalCombinedPuts);
  assertThat(totalCombinedPuts).isEqualTo(MAX_PUTS * NUM_PUB_THREADS * NUM_PUBS);

  // validate sub values against sub statistics against sub archive
  final int totalPuts = totalCombinedPuts;
  sub.invoke("sub-validation", () -> {
    PubSubStats statistics = subStatsRef.get();
    assertThat(statistics).isNotNull();

    int updateEvents = statistics.getUpdateEvents();
    assertThat(updateEvents).isEqualTo(totalPuts);
    assertThat(updateEvents).isEqualTo(totalUpdateEvents);
    assertThat(updateEvents).isEqualTo(MAX_PUTS * NUM_PUB_THREADS * NUM_PUBS);

    // assert that archive file contains same values as statistics
    File archive = new File(subArchive);
    assertThat(archive.exists()).isTrue();

    double combinedUpdateEvents = 0;

    // close the reader once the values have been read so the archive file is not left open
    // in this VM
    try (StatArchiveReader reader =
        new StatArchiveReader(new File[] {archive}, null, false)) {
      List<?> resources = reader.getResourceInstList();

      for (ResourceInst ri : (Iterable<ResourceInst>) resources) {
        // only the custom PubSubStats instances are of interest
        if (!ri.getType().getName().equals(PubSubStats.TYPE_NAME)) {
          continue;
        }

        StatValue[] statValues = ri.getStatValues();
        for (int i = 0; i < statValues.length; i++) {
          String statName = ri.getType().getStats()[i].getName();
          assertThat(statName).isNotNull();

          if (statName.equals(PubSubStats.UPDATE_EVENTS)) {
            StatValue sv = statValues[i];
            sv.setFilter(StatValue.FILTER_NONE);

            double mostRecent = sv.getSnapshotsMostRecent();
            double max = sv.getSnapshotsMaximum();
            double mean = sv.getSnapshotsAverage();

            // the last archived sample must also be the largest one
            assertThat(max).isEqualTo(mostRecent);

            // recompute the mean from the raw snapshots and compare with the reported one
            double summation = 0;
            double[] rawSnapshots = sv.getRawSnapshots();
            for (final double rawSnapshot : rawSnapshots) {
              summation += rawSnapshot;
            }
            assertThat(summation / sv.getSnapshotsSize()).isEqualTo(mean);

            combinedUpdateEvents += mostRecent;
          }
        }
      }
    }

    assertThat(combinedUpdateEvents).isEqualTo(totalUpdateEvents);
  });

  // cross-check: read the same stats back through the readIntStat helper, per archive
  int updateEvents =
      sub.invoke(() -> readIntStat(new File(subArchive), "PubSubStats", "updateEvents"));
  assertThat(updateEvents).isGreaterThan(0);
  assertThat(updateEvents).isEqualTo(MAX_PUTS * NUM_PUB_THREADS * NUM_PUBS);

  int puts = 0;
  for (int pubVM = 0; pubVM < NUM_PUBS; pubVM++) {
    int currentPubVM = pubVM;
    int vmPuts = pubs[pubVM]
        .invoke(() -> readIntStat(new File(pubArchives[currentPubVM]), "PubSubStats", "puts"));
    assertThat(vmPuts).isGreaterThan(0);
    assertThat(vmPuts).isEqualTo(MAX_PUTS * NUM_PUB_THREADS);
    puts += vmPuts;
  }
  assertThat(puts).isGreaterThan(0);
  assertThat(puts).isEqualTo(MAX_PUTS * NUM_PUB_THREADS * NUM_PUBS);

  // finally read every archive of this test method at once;
  // use regex "testPubAndSubCustomStats"
  MultipleArchiveReader reader =
      new MultipleArchiveReader(directory, ".*" + getTestMethodName() + ".*\\.gfs");

  int combinedUpdateEvents = reader.readIntStat(PubSubStats.TYPE_NAME, PubSubStats.UPDATE_EVENTS);
  assertThat(combinedUpdateEvents)
      .as("Failed to read updateEvents stat values").isGreaterThan(0);

  int combinedPuts = reader.readIntStat(PubSubStats.TYPE_NAME, PubSubStats.PUTS);
  assertThat(combinedPuts).as("Failed to read puts stat values").isGreaterThan(0);

  assertThat(combinedUpdateEvents)
      .as("updateEvents is " + combinedUpdateEvents + " but puts is " + combinedPuts)
      .isEqualTo(combinedPuts);
}