in software/database/src/main/java/org/apache/brooklyn/entity/database/mysql/InitSlaveTaskBody.java [117:169]
/**
 * Queues a task named "bootstrap slave replication" that configures {@code slave}
 * to replicate from the node returned by {@code getMaster()}.
 *
 * <p>When the task runs it:
 * <ol>
 *   <li>blocks on {@code replicationInfoFuture} until the replication snapshot is available;</li>
 *   <li>if the snapshot names an entity, copies the dump from that cluster member,
 *       imports it on the slave, re-applies the slave's password and flushes privileges;</li>
 *   <li>creates the replication user on the master for the slave's subnet address and
 *       grants it {@code REPLICATION SLAVE};</li>
 *   <li>issues {@code CHANGE MASTER TO ...; START SLAVE;} on the slave using the
 *       bin log name and position taken from the snapshot.</li>
 * </ol>
 *
 * <p>Addresses, username and password are passed through
 * {@code MySqlClusterUtils.validateSqlParam} before being formatted into SQL.
 *
 * @param replicationInfoFuture future supplying the snapshot to replicate from; a failure or
 *        interruption while waiting on it is rethrown via {@code Exceptions.propagate}
 * @param slave the node to bootstrap as a replication slave
 */
private void bootstrapSlaveAsync(final Future<ReplicationSnapshot> replicationInfoFuture, final MySqlNode slave) {
    final Runnable bootstrapTask = new Runnable() {
        @Override
        public void run() {
            // Wait for the snapshot details before touching either node.
            final ReplicationSnapshot snapshot;
            try {
                snapshot = replicationInfoFuture.get();
            } catch (InterruptedException | ExecutionException e) {
                throw Exceptions.propagate(e);
            }

            final MySqlNode masterNode = getMaster();
            final String masterHost = MySqlClusterUtils.validateSqlParam(masterNode.getAttribute(MySqlNode.SUBNET_ADDRESS));
            final Integer masterPortNumber = masterNode.getAttribute(MySqlNode.MYSQL_PORT);
            final String slaveHost = MySqlClusterUtils.validateSqlParam(slave.getAttribute(MySqlNode.SUBNET_ADDRESS));
            final String replicationUser = MySqlClusterUtils.validateSqlParam(cluster.getConfig(MySqlCluster.SLAVE_USERNAME));
            final String replicationPassword = MySqlClusterUtils.validateSqlParam(cluster.getAttribute(MySqlCluster.SLAVE_PASSWORD));

            // A snapshot that references an entity comes with a dump to load on the slave first.
            if (snapshot.getEntityId() != null) {
                final Entity dumpOwner = Iterables.find(
                        cluster.getMembers(),
                        EntityPredicates.idEqualTo(snapshot.getEntityId()));
                final String dumpPath = snapshot.getSnapshotPath();
                final String dumpId = FilenameUtils.removeExtension(dumpPath);

                copyDumpAsync(dumpOwner, slave, dumpPath, dumpId);
                DynamicTasks.queue(Effectors.invocation(
                        slave, MySqlNode.IMPORT_DUMP, ImmutableMap.of("path", dumpPath)));

                // Importing the dump replaces the password with the one from the source instance.
                // Privileges have not been flushed yet, so login still works; set the password
                // back to the slave's own value.
                DynamicTasks.queue(Effectors.invocation(
                        slave, MySqlNode.CHANGE_PASSWORD,
                        ImmutableMap.of("password", slave.getAttribute(MySqlNode.PASSWORD))));

                // Flush privileges so that users brought in by the dump are loaded.
                MySqlClusterUtils.executeSqlOnNodeAsync(slave, "FLUSH PRIVILEGES;");
            }

            // Register the replication account for this slave on the master.
            final String createUserSql = String.format(
                    "CREATE USER '%s'@'%s' IDENTIFIED BY '%s';\n",
                    replicationUser, slaveHost, replicationPassword);
            final String grantSql = String.format(
                    "GRANT REPLICATION SLAVE ON *.* TO '%s'@'%s';\n",
                    replicationUser, slaveHost);
            MySqlClusterUtils.executeSqlOnNodeAsync(masterNode, createUserSql + grantSql);

            // Running this on the slave unblocks the SERVICE_UP wait in the start effector.
            final String changeMasterTemplate =
                    "CHANGE MASTER TO " +
                    "MASTER_HOST='%s', " +
                    "MASTER_PORT=%d, " +
                    "MASTER_USER='%s', " +
                    "MASTER_PASSWORD='%s', " +
                    "MASTER_LOG_FILE='%s', " +
                    "MASTER_LOG_POS=%d;\n" +
                    "START SLAVE;\n";
            final String startReplicationSql = String.format(
                    changeMasterTemplate,
                    masterHost, masterPortNumber,
                    replicationUser, replicationPassword,
                    snapshot.getBinLogName(),
                    snapshot.getBinLogPosition());
            MySqlClusterUtils.executeSqlOnNodeAsync(slave, startReplicationSql);
        }
    };
    DynamicTasks.queue("bootstrap slave replication", bootstrapTask);
}