protected void connectSensors()

in software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeImpl.java [397:521]


    protected void connectSensors() {
        // "cassandra" isn't really a protocol, but okay for now
        sensors().set(DATASTORE_URL, "cassandra://"+getAttribute(HOSTNAME)+":"+getAttribute(THRIFT_PORT));
        
        super.connectSensors();

        jmxHelper = new JmxHelper(this);
        boolean retrieveUsageMetrics = getConfig(RETRIEVE_USAGE_METRICS);
        
        if (getDriver().isJmxEnabled()) {
            jmxFeed = JmxFeed.builder()
                    .entity(this)
                    .period(3000, TimeUnit.MILLISECONDS)
                    .helper(jmxHelper)
                    .pollAttribute(new JmxAttributePollConfig<Boolean>(SERVICE_UP_JMX)
                            .objectName(storageServiceMBean)
                            .attributeName("Initialized")
                            .onSuccess(Functions.forPredicate(Predicates.notNull()))
                            .onException(Functions.constant(false))
                            .suppressDuplicates(true))
                    .pollAttribute(new JmxAttributePollConfig<Set<BigInteger>>(TOKENS)
                            .objectName(storageServiceMBean)
                            .attributeName("TokenToEndpointMap")
                            .onSuccess(new Function<Object, Set<BigInteger>>() {
                                @Override
                                public Set<BigInteger> apply(@Nullable Object arg) {
                                    Map input = (Map)arg;
                                    if (input == null || input.isEmpty()) return null;
                                    // FIXME does not work on aws-ec2, uses RFC1918 address
                                    Predicate<String> self = Predicates.in(ImmutableList.of(getAttribute(HOSTNAME), getAttribute(ADDRESS), getAttribute(SUBNET_ADDRESS), getAttribute(SUBNET_HOSTNAME)));
                                    Set<String> tokens = Maps.filterValues(input, self).keySet();
                                    Set<BigInteger> result = Sets.newLinkedHashSet();
                                    for (String token : tokens) {
                                        result.add(new BigInteger(token));
                                    }
                                    return result;
                                }})
                            .onException(Functions.<Set<BigInteger>>constant(null))
                            .suppressDuplicates(true))
                    .pollOperation(new JmxOperationPollConfig<String>(DATACENTER_NAME)
                            .period(60, TimeUnit.SECONDS)
                            .objectName(snitchMBean)
                            .operationName("getDatacenter")
                            .operationParams(ImmutableList.of(getBroadcastAddress()))
                            .onException(Functions.<String>constant(null))
                            .suppressDuplicates(true))
                    .pollOperation(new JmxOperationPollConfig<String>(RACK_NAME)
                            .period(60, TimeUnit.SECONDS)
                            .objectName(snitchMBean)
                            .operationName("getRack")
                            .operationParams(ImmutableList.of(getBroadcastAddress()))
                            .onException(Functions.<String>constant(null))
                            .suppressDuplicates(true))
                    .pollAttribute(new JmxAttributePollConfig<Integer>(PEERS)
                            .objectName(storageServiceMBean)
                            .attributeName("TokenToEndpointMap")
                            .onSuccess(new Function<Object, Integer>() {
                                @Override
                                public Integer apply(@Nullable Object arg) {
                                    Map input = (Map)arg;
                                    if (input == null || input.isEmpty()) return 0;
                                    return input.size();
                                }
                            })
                            .onException(Functions.constant(-1)))
                    .pollAttribute(new JmxAttributePollConfig<Integer>(LIVE_NODE_COUNT)
                            .objectName(storageServiceMBean)
                            .attributeName("LiveNodes")
                            .onSuccess(new Function<Object, Integer>() {
                                @Override
                                public Integer apply(@Nullable Object arg) {
                                    List input = (List)arg;
                                    if (input == null || input.isEmpty()) return 0;
                                    return input.size();
                                }
                            })
                            .onException(Functions.constant(-1)))
                    .pollAttribute(new JmxAttributePollConfig<Integer>(READ_ACTIVE)
                            .objectName(readStageMBean)
                            .attributeName("ActiveCount")
                            .onException(Functions.constant((Integer)null))
                            .enabled(retrieveUsageMetrics))
                    .pollAttribute(new JmxAttributePollConfig<Long>(READ_PENDING)
                            .objectName(readStageMBean)
                            .attributeName("PendingTasks")
                            .onException(Functions.constant((Long)null))
                            .enabled(retrieveUsageMetrics))
                    .pollAttribute(new JmxAttributePollConfig<Long>(READ_COMPLETED)
                            .objectName(readStageMBean)
                            .attributeName("CompletedTasks")
                            .onException(Functions.constant((Long)null))
                            .enabled(retrieveUsageMetrics))
                    .pollAttribute(new JmxAttributePollConfig<Integer>(WRITE_ACTIVE)
                            .objectName(mutationStageMBean)
                            .attributeName("ActiveCount")
                            .onException(Functions.constant((Integer)null))
                            .enabled(retrieveUsageMetrics))
                    .pollAttribute(new JmxAttributePollConfig<Long>(WRITE_PENDING)
                            .objectName(mutationStageMBean)
                            .attributeName("PendingTasks")
                            .onException(Functions.constant((Long)null))
                            .enabled(retrieveUsageMetrics))
                    .pollAttribute(new JmxAttributePollConfig<Long>(WRITE_COMPLETED)
                            .objectName(mutationStageMBean)
                            .attributeName("CompletedTasks")
                            .onException(Functions.constant((Long)null))
                            .enabled(retrieveUsageMetrics))
                    .build();
            
            jmxMxBeanFeed = JavaAppUtils.connectMXBeanSensors(this);
        }
        
        if (Boolean.TRUE.equals(getConfig(USE_THRIFT_MONITORING))) {
            functionFeed = FunctionFeed.builder()
                    .entity(this)
                    .period(3000, TimeUnit.MILLISECONDS)
                    .poll(new FunctionPollConfig<Long, Long>(THRIFT_PORT_LATENCY)
                            .onException(Functions.constant(-1L))
                            .callable(new ThriftLatencyChecker(CassandraNodeImpl.this))
                            .enabled(retrieveUsageMetrics))
                    .build();
        }
        
        connectServiceUpIsRunning();
    }