in software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraDatacenterImpl.java [149:229]
public void init() {
super.init();
/*
* subscribe to hostname, and keep an accurate set of current seeds in a sensor;
* then at nodes we set the initial seeds to be the current seeds when ready (non-empty)
*/
subscriptions().subscribeToMembers(this, Attributes.HOSTNAME, new SensorEventListener<String>() {
@Override
public void onEvent(SensorEvent<String> event) {
seedTracker.onHostnameChanged(event.getSource(), event.getValue());
}
});
subscriptions().subscribe(this, DynamicGroup.MEMBER_REMOVED, new SensorEventListener<Entity>() {
@Override public void onEvent(SensorEvent<Entity> event) {
seedTracker.onMemberRemoved(event.getValue());
}
});
subscriptions().subscribeToMembers(this, Attributes.SERVICE_UP, new SensorEventListener<Boolean>() {
@Override
public void onEvent(SensorEvent<Boolean> event) {
seedTracker.onServiceUpChanged(event.getSource(), event.getValue());
}
});
subscriptions().subscribeToMembers(this, Attributes.SERVICE_STATE_ACTUAL, new SensorEventListener<Lifecycle>() {
@Override
public void onEvent(SensorEvent<Lifecycle> event) {
// trigger a recomputation also when lifecycle state changes,
// because it might not have ruled a seed as inviable when service up went true
// because service state was not yet running
seedTracker.onServiceUpChanged(event.getSource(), Lifecycle.RUNNING==event.getValue());
}
});
// Track the datacenters for this cluster
subscriptions().subscribeToMembers(this, CassandraNode.DATACENTER_NAME, new SensorEventListener<String>() {
@Override
public void onEvent(SensorEvent<String> event) {
Entity member = event.getSource();
String dcName = event.getValue();
if (dcName != null) {
Multimap<String, Entity> datacenterUsage = getAttribute(DATACENTER_USAGE);
Multimap<String, Entity> mutableDatacenterUsage = (datacenterUsage == null) ? LinkedHashMultimap.<String, Entity>create() : LinkedHashMultimap.create(datacenterUsage);
Optional<String> oldDcName = getKeyOfVal(mutableDatacenterUsage, member);
if (!(oldDcName.isPresent() && dcName.equals(oldDcName.get()))) {
mutableDatacenterUsage.values().remove(member);
mutableDatacenterUsage.put(dcName, member);
sensors().set(DATACENTER_USAGE, mutableDatacenterUsage);
sensors().set(DATACENTERS, Sets.newLinkedHashSet(mutableDatacenterUsage.keySet()));
}
}
}
private <K,V> Optional<K> getKeyOfVal(Multimap<K,V> map, V val) {
for (Map.Entry<K,V> entry : map.entries()) {
if (Objects.equal(val, entry.getValue())) {
return Optional.of(entry.getKey());
}
}
return Optional.absent();
}
});
subscriptions().subscribe(this, DynamicGroup.MEMBER_REMOVED, new SensorEventListener<Entity>() {
@Override public void onEvent(SensorEvent<Entity> event) {
Entity entity = event.getSource();
Multimap<String, Entity> datacenterUsage = getAttribute(DATACENTER_USAGE);
if (datacenterUsage != null && datacenterUsage.containsValue(entity)) {
Multimap<String, Entity> mutableDatacenterUsage = LinkedHashMultimap.create(datacenterUsage);
mutableDatacenterUsage.values().remove(entity);
sensors().set(DATACENTER_USAGE, mutableDatacenterUsage);
sensors().set(DATACENTERS, Sets.newLinkedHashSet(mutableDatacenterUsage.keySet()));
}
}
});
getMutableEntityType().addEffector(EXECUTE_SCRIPT, new EffectorBody<String>() {
@Override
public String call(ConfigBag parameters) {
return executeScript((String)parameters.getStringKey("commands"));
}
});
}