in minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java [503:718]
/**
 * Starts the mini cluster.
 *
 * <p>
 * When an existing instance is configured, its name is read and the instance is required to be
 * offline. Otherwise ZooKeeper is started (unless existing ZooKeepers are configured) and, on the
 * first call only, a JVM shutdown hook is registered and the {@code Initialize} process is run.
 * After that the tablet servers, scan servers, manager, garbage collector and compactors are
 * started, the manager goal state is set to {@code NORMAL}, and a {@link ServiceLock} entry for
 * this mini cluster is created in ZooKeeper before the cluster is verified to be up.
 *
 * @throws IOException declared by this method; propagated from the operations it invokes
 * @throws InterruptedException if the thread is interrupted while waiting on a child process or
 *         on the lock watcher latch
 * @throws IllegalStateException if miniDFS is in use but not available, the existing instance is
 *         unnamed or already running, initialization or setting the goal state fails, no
 *         compactor groups are configured, or the ZooKeeper lock entry cannot be created
 * @throws IllegalArgumentException if a declared CompactionPlanner class cannot be found
 * @throws ZooKeeperBindException if ZooKeeper does not answer {@code imok} within the configured
 *         startup time
 */
public synchronized void start() throws IOException, InterruptedException {
  if (config.useMiniDFS() && miniDFS.get() == null) {
    throw new IllegalStateException("Cannot restart mini when using miniDFS");
  }

  MiniAccumuloClusterControl control = getClusterControl();

  if (config.useExistingInstance()) {
    String instanceName = getServerContext().getInstanceName();
    if (instanceName == null || instanceName.isBlank()) {
      throw new IllegalStateException("Unable to read instance name from zookeeper.");
    }

    config.setInstanceName(instanceName);
    if (!AccumuloStatus.isAccumuloOffline(getServerContext())) {
      throw new IllegalStateException(
          "The Accumulo instance being used is already running. Aborting.");
    }
  } else {
    if (!initialized) {
      // Registered only before the first initialization so repeated starts do not stack hooks.
      Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        try {
          MiniAccumuloClusterImpl.this.stop();
        } catch (IOException e) {
          log.error("IOException while attempting to stop the MiniAccumuloCluster.", e);
        } catch (InterruptedException e) {
          // Restore the interrupt flag rather than silently consuming it.
          Thread.currentThread().interrupt();
          log.error("The stopping of MiniAccumuloCluster was interrupted.", e);
        }
      }));
    }

    if (!config.useExistingZooKeepers()) {
      log.warn("Starting ZooKeeper");
      control.start(ServerType.ZOOKEEPER);
    }

    if (!initialized) {
      if (!config.useExistingZooKeepers()) {
        // Poll ZooKeeper with the "ruok" command until it answers "imok" before calling init.
        final long startTime = System.currentTimeMillis();
        while (true) {
          // Why the most recent probe failed; reported if the startup time is exceeded.
          String lastFailure;
          try (Socket s = new Socket(MiniAccumuloConfigImpl.DEFAULT_ZOOKEEPER_HOST,
              config.getZooKeeperPort())) {
            s.setReuseAddress(true);
            s.getOutputStream().write("ruok\n".getBytes(UTF_8));
            s.getOutputStream().flush();
            byte[] buffer = new byte[100];
            int n = s.getInputStream().read(buffer);
            if (n >= 4 && new String(buffer, 0, 4, UTF_8).equals("imok")) {
              break;
            }
            lastFailure = "Last response to ruok: "
                + (n > 0 ? new String(buffer, 0, n, UTF_8).strip() : "<none>");
          } catch (IOException | RuntimeException e) {
            lastFailure = "Last exception: " + e;
          }

          // The timeout and back-off apply to every failed probe, including a successful
          // connection that returned something other than "imok"; otherwise that case would
          // retry immediately and forever.
          if (System.currentTimeMillis() - startTime >= config.getZooKeeperStartupTime()) {
            throw new ZooKeeperBindException("Zookeeper did not start within "
                + (config.getZooKeeperStartupTime() / 1000) + " seconds. Check the logs in "
                + config.getLogDir() + " for errors. " + lastFailure);
          }
          // Don't spin absurdly fast
          sleepUninterruptibly(250, TimeUnit.MILLISECONDS);
        }
      }

      LinkedList<String> args = new LinkedList<>();
      args.add("--instance-name");
      args.add(config.getInstanceName());
      args.add("--user");
      args.add(config.getRootUserName());
      args.add("--clear-instance-name");

      // If we aren't using SASL, add in the root password
      final String saslEnabled =
          config.getSiteConfig().get(Property.INSTANCE_RPC_SASL_ENABLED.getKey());
      if (saslEnabled == null || !Boolean.parseBoolean(saslEnabled)) {
        args.add("--password");
        args.add(config.getRootPassword());
      }

      log.warn("Initializing ZooKeeper");
      Process initProcess = exec(Initialize.class, args.toArray(new String[0])).getProcess();
      int initRet = initProcess.waitFor();
      if (initRet != 0) {
        throw new IllegalStateException("Initialize process returned " + initRet
            + ". Check the logs in " + config.getLogDir() + " for errors.");
      }
      initialized = true;
    } else {
      log.warn("Not initializing ZooKeeper, already initialized");
    }
  }

  log.info("Starting MAC against instance {} and zookeeper(s) {}.", config.getInstanceName(),
      config.getZooKeepers());

  control.start(ServerType.TABLET_SERVER);
  control.start(ServerType.SCAN_SERVER);

  // Setting the goal state runs as a separate process; retry a few times before giving up.
  final int goalStateAttempts = 5;
  int ret = 0;
  for (int i = 0; i < goalStateAttempts; i++) {
    ret = exec(Main.class, SetGoalState.class.getName(), ManagerGoalState.NORMAL.toString())
        .getProcess().waitFor();
    if (ret == 0) {
      break;
    }
    // No point pausing after the final attempt; the failure is reported right below.
    if (i < goalStateAttempts - 1) {
      sleepUninterruptibly(1, TimeUnit.SECONDS);
    }
  }
  if (ret != 0) {
    throw new IllegalStateException("Could not set manager goal state, process returned " + ret
        + ". Check the logs in " + config.getLogDir() + " for errors.");
  }

  control.start(ServerType.MANAGER);
  control.start(ServerType.GARBAGE_COLLECTOR);

  if (executor == null) {
    executor = Executors.newSingleThreadExecutor();
  }

  try {
    Set<String> groups = getCompactionGroupNames();
    if (groups.isEmpty()) {
      throw new IllegalStateException("No Compactor groups configured.");
    }
    for (String name : groups) {
      // Allow user override
      if (!config.getClusterServerConfiguration().getCompactorConfiguration().containsKey(name)) {
        config.getClusterServerConfiguration().addCompactorResourceGroup(name, 1);
      }
    }
  } catch (ClassNotFoundException e) {
    throw new IllegalArgumentException("Unable to find declared CompactionPlanner class", e);
  }
  control.start(ServerType.COMPACTOR);

  // lockWatcherInvoked is released on either outcome of the lock attempt; lockAcquired records
  // which outcome it was.
  final AtomicBoolean lockAcquired = new AtomicBoolean(false);
  final CountDownLatch lockWatcherInvoked = new CountDownLatch(1);
  AccumuloLockWatcher miniLockWatcher = new AccumuloLockWatcher() {

    @Override
    public void lostLock(LockLossReason reason) {
      log.warn("Lost lock: {}", reason);
      miniLock = null;
    }

    @Override
    public void unableToMonitorLockNode(Exception e) {
      log.warn("Unable to monitor lock: {}", e.getMessage(), e);
      miniLock = null;
    }

    @Override
    public void acquiredLock() {
      log.debug("Acquired ZK lock for MiniAccumuloClusterImpl");
      lockAcquired.set(true);
      lockWatcherInvoked.countDown();
    }

    @Override
    public void failedToAcquireLock(Exception e) {
      log.warn("Failed to acquire ZK lock for MiniAccumuloClusterImpl, msg: {}", e.getMessage(),
          e);
      // Clear the reference before releasing the latch so the waiting starter never observes a
      // lock object that failed to acquire.
      miniLock = null;
      lockWatcherInvoked.countDown();
    }
  };

  // It's possible start was called twice...
  if (client == null) {
    client = Accumulo.newClient().from(getClientProperties()).build();
  }
  final InstanceId iid = client.instanceOperations().getInstanceId();

  // The code below does not use `getServerContext()` as that will
  // set the SingletonManager.mode to SERVER which will cause some
  // tests to fail
  final Map<String,String> properties = config.getSiteConfig();
  final int timeout = (int) ConfigurationTypeHelper.getTimeInMillis(properties.getOrDefault(
      Property.INSTANCE_ZK_TIMEOUT.getKey(), Property.INSTANCE_ZK_TIMEOUT.getDefaultValue()));
  final String secret = properties.get(Property.INSTANCE_SECRET.getKey());
  // NOTE(review): a new ZooSession is assigned on every call, even when miniLock is already
  // held; if start() can run twice without stop() in between, the previous session reference is
  // overwritten here - confirm it is closed elsewhere.
  miniLockZk = new ZooSession(MiniAccumuloClusterImpl.class.getSimpleName() + ".lock",
      config.getZooKeepers() + ZooUtil.getRoot(iid), timeout, secret);

  // It's possible start was called twice...
  if (miniLock == null) {
    UUID miniUUID = UUID.randomUUID();
    // Don't call getServerContext here as it will set the SingletonManager.mode to SERVER
    // We don't want that.
    ServiceLockPath slp =
        ((ClientContext) client).getServerPaths().createMiniPath(miniUUID.toString());
    String miniZInstancePath = slp.toString();
    // Parent directory of the per-instance node: everything before "/<uuid>".
    String miniZDirPath =
        miniZInstancePath.substring(0, miniZInstancePath.indexOf("/" + miniUUID.toString()));
    try {
      var zrw = miniLockZk.asReaderWriter();
      zrw.putPersistentData(miniZDirPath, new byte[0], NodeExistsPolicy.SKIP);
      zrw.putPersistentData(miniZInstancePath, new byte[0], NodeExistsPolicy.SKIP);
    } catch (KeeperException | InterruptedException e) {
      throw new IllegalStateException("Error creating path in ZooKeeper", e);
    }
    ServiceLockData sld = new ServiceLockData(miniUUID, "localhost", ThriftService.NONE,
        Constants.DEFAULT_RESOURCE_GROUP_NAME);
    miniLock = new ServiceLock(miniLockZk, slp, miniUUID);
    miniLock.lock(miniLockWatcher, sld);

    // Block until the watcher reports either acquisition or failure.
    lockWatcherInvoked.await();
    if (!lockAcquired.get()) {
      throw new IllegalStateException("Error creating MAC entry in ZooKeeper");
    }
  }

  verifyUp((ClientContext) client, iid);

  printProcessSummary();
}