in brooklyn-library/software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/mongodb/MongoDBServerImpl.java [61:160]
protected void connectSensors() {
super.connectSensors();
connectServiceUpIsRunning();
int port = sensors().get(MongoDBServer.PORT);
HostAndPort accessibleAddress = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, port);
sensors().set(MONGO_SERVER_ENDPOINT, String.format("%s:%d",
accessibleAddress.getHostText(), accessibleAddress.getPort()));
int httpConsolePort = BrooklynAccessUtils.getBrooklynAccessibleAddress(this, sensors().get(HTTP_PORT)).getPort();
sensors().set(HTTP_INTERFACE_URL, String.format("http://%s:%d",
accessibleAddress.getHostText(), httpConsolePort));
if (clientAccessEnabled()) {
try {
client = MongoDBClientSupport.forServer(this);
} catch (UnknownHostException e) {
LOG.warn("Unable to create client connection to {}, not connecting sensors: {} ", this, e.getMessage());
return;
}
serviceStats = FunctionFeed.builder()
.entity(this)
.poll(new FunctionPollConfig<Object, BasicBSONObject>(STATUS_BSON)
.period(2, TimeUnit.SECONDS)
.callable(new Callable<BasicBSONObject>() {
@Override
public BasicBSONObject call() throws Exception {
return MongoDBServerImpl.this.sensors().get(SERVICE_UP)
? client.getServerStatus()
: null;
}
})
.onException(Functions.<BasicBSONObject>constant(null)))
.build();
if (isReplicaSetMember()) {
replicaSetStats = FunctionFeed.builder()
.entity(this)
.poll(new FunctionPollConfig<Object, ReplicaSetMemberStatus>(REPLICA_SET_MEMBER_STATUS)
.period(2, TimeUnit.SECONDS)
.callable(new Callable<ReplicaSetMemberStatus>() {
/**
* Calls {@link MongoDBClientSupport#getReplicaSetStatus} and
* extracts <code>myState</code> from the response.
* @return
* The appropriate {@link org.apache.brooklyn.entity.nosql.mongodb.ReplicaSetMemberStatus}
* if <code>myState</code> was non-null, {@link ReplicaSetMemberStatus#UNKNOWN} otherwise.
*/
@Override
public ReplicaSetMemberStatus call() {
BasicBSONObject serverStatus = client.getReplicaSetStatus();
int state = serverStatus.getInt("myState", -1);
return ReplicaSetMemberStatus.fromCode(state);
}
})
.onException(Functions.constant(ReplicaSetMemberStatus.UNKNOWN))
.suppressDuplicates(true))
.build();
} else {
sensors().set(IS_PRIMARY_FOR_REPLICA_SET, false);
sensors().set(IS_SECONDARY_FOR_REPLICA_SET, false);
}
} else {
LOG.info("Not monitoring "+this+" to retrieve state via client API");
}
// Take interesting details from STATUS.
subscriptions().subscribe(this, STATUS_BSON, new SensorEventListener<BasicBSONObject>() {
@Override public void onEvent(SensorEvent<BasicBSONObject> event) {
BasicBSONObject map = event.getValue();
if (map != null && !map.isEmpty()) {
sensors().set(UPTIME_SECONDS, map.getDouble("uptime", 0));
// Operations
BasicBSONObject opcounters = (BasicBSONObject) map.get("opcounters");
sensors().set(OPCOUNTERS_INSERTS, opcounters.getLong("insert", 0));
sensors().set(OPCOUNTERS_QUERIES, opcounters.getLong("query", 0));
sensors().set(OPCOUNTERS_UPDATES, opcounters.getLong("update", 0));
sensors().set(OPCOUNTERS_DELETES, opcounters.getLong("delete", 0));
sensors().set(OPCOUNTERS_GETMORE, opcounters.getLong("getmore", 0));
sensors().set(OPCOUNTERS_COMMAND, opcounters.getLong("command", 0));
// Network stats
BasicBSONObject network = (BasicBSONObject) map.get("network");
sensors().set(NETWORK_BYTES_IN, network.getLong("bytesIn", 0));
sensors().set(NETWORK_BYTES_OUT, network.getLong("bytesOut", 0));
sensors().set(NETWORK_NUM_REQUESTS, network.getLong("numRequests", 0));
// Replica set stats
BasicBSONObject repl = (BasicBSONObject) map.get("repl");
if (isReplicaSetMember() && repl != null) {
sensors().set(IS_PRIMARY_FOR_REPLICA_SET, repl.getBoolean("ismaster"));
sensors().set(IS_SECONDARY_FOR_REPLICA_SET, repl.getBoolean("secondary"));
sensors().set(REPLICA_SET_PRIMARY_ENDPOINT, repl.getString("primary"));
}
}
}
});
}