in brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java [392:516]
protected void connectSensors() {
    // Wires up this node's sensors:
    //  1. publishes DATASTORE_URL from the HOSTNAME and THRIFT_PORT attributes;
    //  2. delegates to super.connectSensors();
    //  3. when the driver reports JMX as enabled, builds a JmxFeed (3000 ms default period)
    //     for service-up, tokens, datacenter/rack, peer and live-node counts and the
    //     read/write stage metrics, then connects the MXBean sensors;
    //  4. when USE_THRIFT_MONITORING is TRUE, builds a FunctionFeed for THRIFT_PORT_LATENCY;
    //  5. finally calls connectServiceUpIsRunning().
    // The read/write stage polls and the thrift latency poll are only enabled when
    // RETRIEVE_USAGE_METRICS is set.

    // "cassandra" isn't really a protocol, but okay for now
    sensors().set(DATASTORE_URL, "cassandra://"+getAttribute(HOSTNAME)+":"+getAttribute(THRIFT_PORT));

    super.connectSensors();

    jmxHelper = new JmxHelper(this);
    boolean retrieveUsageMetrics = getConfig(RETRIEVE_USAGE_METRICS);

    if (getDriver().isJmxEnabled()) {
        jmxFeed = JmxFeed.builder()
                .entity(this)
                .period(3000, TimeUnit.MILLISECONDS)
                .helper(jmxHelper)
                // Any non-null "Initialized" value maps to true; a failed poll maps to false.
                .pollAttribute(new JmxAttributePollConfig<Boolean>(SERVICE_UP_JMX)
                        .objectName(storageServiceMBean)
                        .attributeName("Initialized")
                        .onSuccess(Functions.forPredicate(Predicates.notNull()))
                        .onException(Functions.constant(false))
                        .suppressDuplicates(true))
                // Tokens whose endpoint (map value) is one of this node's own addresses.
                .pollAttribute(new JmxAttributePollConfig<Set<BigInteger>>(TOKENS)
                        .objectName(storageServiceMBean)
                        .attributeName("TokenToEndpointMap")
                        .onSuccess(new Function<Object, Set<BigInteger>>() {
                            @Override
                            public Set<BigInteger> apply(@Nullable Object arg) {
                                // Keys are parsed as BigInteger tokens and values are compared with this
                                // node's addresses below; presumably the MBean supplies a
                                // Map<String,String> -- verify against the Cassandra version in use.
                                @SuppressWarnings("unchecked")
                                Map<String, String> endpointsByToken = (Map<String, String>) arg;
                                if (endpointsByToken == null || endpointsByToken.isEmpty()) return null;

                                // Gather the addresses this node is known by. Any of these sensors may
                                // still be unset (null); a null-hostile collection such as ImmutableList
                                // would throw here, so use a null-tolerant set and discard null.
                                Set<String> ownAddresses = Sets.newLinkedHashSet();
                                ownAddresses.add(getAttribute(HOSTNAME));
                                ownAddresses.add(getAttribute(ADDRESS));
                                ownAddresses.add(getAttribute(SUBNET_ADDRESS));
                                ownAddresses.add(getAttribute(SUBNET_HOSTNAME));
                                ownAddresses.remove(null);

                                // FIXME does not work on aws-ec2, uses RFC1918 address
                                Predicate<String> self = Predicates.in(ownAddresses);
                                Set<String> tokens = Maps.filterValues(endpointsByToken, self).keySet();
                                Set<BigInteger> result = Sets.newLinkedHashSet();
                                for (String token : tokens) {
                                    result.add(new BigInteger(token));
                                }
                                return result;
                            }})
                        .onException(Functions.<Set<BigInteger>>constant(null))
                        .suppressDuplicates(true))
                // Datacenter and rack are polled every 60 seconds via snitch operations that
                // take the broadcast address as their single parameter.
                .pollOperation(new JmxOperationPollConfig<String>(DATACENTER_NAME)
                        .period(60, TimeUnit.SECONDS)
                        .objectName(snitchMBean)
                        .operationName("getDatacenter")
                        .operationParams(ImmutableList.of(getBroadcastAddress()))
                        .onException(Functions.<String>constant(null))
                        .suppressDuplicates(true))
                .pollOperation(new JmxOperationPollConfig<String>(RACK_NAME)
                        .period(60, TimeUnit.SECONDS)
                        .objectName(snitchMBean)
                        .operationName("getRack")
                        .operationParams(ImmutableList.of(getBroadcastAddress()))
                        .onException(Functions.<String>constant(null))
                        .suppressDuplicates(true))
                // PEERS is the number of entries in the token-to-endpoint map (0 if absent, -1 on error).
                .pollAttribute(new JmxAttributePollConfig<Integer>(PEERS)
                        .objectName(storageServiceMBean)
                        .attributeName("TokenToEndpointMap")
                        .onSuccess(new Function<Object, Integer>() {
                            @Override
                            public Integer apply(@Nullable Object arg) {
                                Map<?, ?> input = (Map<?, ?>) arg;
                                return (input == null) ? 0 : input.size();
                            }
                        })
                        .onException(Functions.constant(-1)))
                // LIVE_NODE_COUNT is the length of the "LiveNodes" list (0 if absent, -1 on error).
                .pollAttribute(new JmxAttributePollConfig<Integer>(LIVE_NODE_COUNT)
                        .objectName(storageServiceMBean)
                        .attributeName("LiveNodes")
                        .onSuccess(new Function<Object, Integer>() {
                            @Override
                            public Integer apply(@Nullable Object arg) {
                                List<?> input = (List<?>) arg;
                                return (input == null) ? 0 : input.size();
                            }
                        })
                        .onException(Functions.constant(-1)))
                // Read stage usage metrics; null on error, only polled when usage metrics are enabled.
                .pollAttribute(new JmxAttributePollConfig<Integer>(READ_ACTIVE)
                        .objectName(readStageMBean)
                        .attributeName("ActiveCount")
                        .onException(Functions.constant((Integer)null))
                        .enabled(retrieveUsageMetrics))
                .pollAttribute(new JmxAttributePollConfig<Long>(READ_PENDING)
                        .objectName(readStageMBean)
                        .attributeName("PendingTasks")
                        .onException(Functions.constant((Long)null))
                        .enabled(retrieveUsageMetrics))
                .pollAttribute(new JmxAttributePollConfig<Long>(READ_COMPLETED)
                        .objectName(readStageMBean)
                        .attributeName("CompletedTasks")
                        .onException(Functions.constant((Long)null))
                        .enabled(retrieveUsageMetrics))
                // Mutation (write) stage usage metrics, handled the same way as the read stage.
                .pollAttribute(new JmxAttributePollConfig<Integer>(WRITE_ACTIVE)
                        .objectName(mutationStageMBean)
                        .attributeName("ActiveCount")
                        .onException(Functions.constant((Integer)null))
                        .enabled(retrieveUsageMetrics))
                .pollAttribute(new JmxAttributePollConfig<Long>(WRITE_PENDING)
                        .objectName(mutationStageMBean)
                        .attributeName("PendingTasks")
                        .onException(Functions.constant((Long)null))
                        .enabled(retrieveUsageMetrics))
                .pollAttribute(new JmxAttributePollConfig<Long>(WRITE_COMPLETED)
                        .objectName(mutationStageMBean)
                        .attributeName("CompletedTasks")
                        .onException(Functions.constant((Long)null))
                        .enabled(retrieveUsageMetrics))
                .build();

        jmxMxBeanFeed = JavaAppUtils.connectMXBeanSensors(this);
    }

    if (Boolean.TRUE.equals(getConfig(USE_THRIFT_MONITORING))) {
        // Thrift latency is measured by ThriftLatencyChecker; -1 is published when the check fails.
        functionFeed = FunctionFeed.builder()
                .entity(this)
                .period(3000, TimeUnit.MILLISECONDS)
                .poll(new FunctionPollConfig<Long, Long>(THRIFT_PORT_LATENCY)
                        .onException(Functions.constant(-1L))
                        .callable(new ThriftLatencyChecker(CassandraNodeImpl.this))
                        .enabled(retrieveUsageMetrics))
                .build();
    }

    connectServiceUpIsRunning();
}