in software/nosql/src/main/java/org/apache/brooklyn/entity/nosql/cassandra/CassandraNodeSshDriver.java [232:322]
public void launch() {
String subnetHostname = Machines.findSubnetOrPublicHostname(entity).get();
Set<Entity> seeds = getEntity().getConfig(CassandraNode.INITIAL_SEEDS);
List<Entity> ancestors = getCassandraAncestors();
log.info("Launching " + entity + ": " +
"cluster "+getClusterName()+", " +
"hostname (public) " + getEntity().getAttribute(Attributes.HOSTNAME) + ", " +
"hostname (subnet) " + subnetHostname + ", " +
"seeds "+((CassandraNode)entity).getSeeds()+" (from "+seeds+")");
boolean isFirst = seeds.iterator().next().equals(entity);
if (isClustered() && !isFirst && CassandraDatacenter.WAIT_FOR_FIRST) {
// wait for the first node
long firstStartTime = Entities.submit(entity, DependentConfiguration.attributeWhenReady(
ancestors.get(ancestors.size()-1), CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC)).getUnchecked();
// optionally force a delay before starting subsequent nodes; see comment at CassandraCluster.DELAY_AFTER_FIRST
Duration toWait = Duration.millis(firstStartTime + CassandraDatacenter.DELAY_AFTER_FIRST.toMilliseconds() - System.currentTimeMillis());
if (toWait.toMilliseconds()>0) {
log.info("Launching " + entity + ": delaying launch of non-first node by "+toWait+" to prevent schema disagreements");
Tasks.setBlockingDetails("Pausing to ensure first node has time to start");
Time.sleep(toWait);
Tasks.resetBlockingDetails();
}
}
List<Entity> queuedStart = null;
if (CassandraDatacenter.DELAY_BETWEEN_STARTS!=null && !ancestors.isEmpty()) {
Entity root = ancestors.get(ancestors.size()-1);
// TODO currently use the class as a semaphore; messy, and obviously will not federate;
// should develop a brooklyn framework semaphore (similar to that done on SshMachineLocation)
// and use it - note however the synch block is very very short so relatively safe at least
synchronized (CassandraNode.class) {
queuedStart = root.getAttribute(CassandraDatacenter.QUEUED_START_NODES);
if (queuedStart==null) {
queuedStart = new ArrayList<Entity>();
root.sensors().set(CassandraDatacenter.QUEUED_START_NODES, queuedStart);
}
queuedStart.add(getEntity());
root.sensors().set(CassandraDatacenter.QUEUED_START_NODES, queuedStart);
}
do {
// get it again in case it is backed by something external
queuedStart = root.getAttribute(CassandraDatacenter.QUEUED_START_NODES);
if (queuedStart.get(0).equals(getEntity())) break;
synchronized (queuedStart) {
try {
queuedStart.wait(1000);
} catch (InterruptedException e) {
Exceptions.propagate(e);
}
}
} while (true);
// TODO should look at last start time... but instead we always wait
CassandraDatacenter.DELAY_BETWEEN_STARTS.countdownTimer().waitForExpiryUnchecked();
}
try {
// Relies on `bin/cassandra -p <pidfile>`, rather than us writing pid file ourselves.
newScript(MutableMap.of(USE_PID_FILE, false), LAUNCHING)
.body.append(
// log the date to attempt to debug occasional http://wiki.apache.org/cassandra/FAQ#schema_disagreement
// (can be caused by machines out of synch time-wise; but in our case it seems to be caused by other things!)
"echo date on cassandra server `hostname` when launching is `date`",
launchEssentialCommand(),
"echo after essential command")
.execute();
if (!isClustered()) {
InputStream creationScript = DatastoreMixins.getDatabaseCreationScript(entity);
if (creationScript!=null) {
Tasks.setBlockingDetails("Pausing to ensure Cassandra (singleton) has started before running creation script");
Time.sleep(Duration.seconds(20));
Tasks.resetBlockingDetails();
executeScriptAsync(Streams.readFullyStringAndClose(creationScript));
}
}
if (isClustered() && isFirst) {
for (Entity ancestor: getCassandraAncestors()) {
ancestor.sensors().set(CassandraDatacenter.FIRST_NODE_STARTED_TIME_UTC, System.currentTimeMillis());
}
}
} finally {
if (queuedStart!=null) {
Entity head = queuedStart.remove(0);
checkArgument(head.equals(getEntity()), "first queued node was "+head+" but we are "+getEntity());
synchronized (queuedStart) {
queuedStart.notifyAll();
}
}
}
}