in server/manager/src/main/java/org/apache/accumulo/manager/Manager.java [1105:1460]
/**
 * Main body of the Manager process: brings the manager up, serves until shutdown, then tears down.
 *
 * <p>Startup order: create (but do not yet start) the Thrift server; block until the manager
 * ZooKeeper lock is obtained (initially advertising ThriftService.NONE); set state HAVE_LOCK; start
 * the status thread and tserver listener; call {@code blockForTservers()}; install a watcher on
 * {@code Constants.ZRECOVERY}; create the root/metadata/user {@link TabletGroupWatcher}s and start
 * them as the upgrade status advances; wait for any background metadata upgrade; then start
 * metrics, cleanup threads, the compaction coordinator, the splitter, FaTE, scheduled tasks and
 * (when configured) the delegation-token key manager. Only after all of that is the Thrift server
 * started and the lock data replaced so clients can discover the manager address.
 *
 * <p>The method then loops until a shutdown is requested or the client service stops serving, and
 * finally shuts services down, bounding thread joins by a deadline of
 * {@code MAX_CLEANUP_WAIT_TIME} from the start of cleanup, before releasing the manager lock.
 *
 * @throws IllegalStateException if a startup step fails, the upgrade is not complete when
 *         expected, or the thread is interrupted while waiting on certain steps
 */
public void run() {
  final ServerContext context = getContext();
  // ACCUMULO-4424 Put up the Thrift servers before getting the lock as a sign of process health
  // when a hot-standby
  //
  // Start the Manager's Fate Service
  fateServiceHandler = new FateServiceHandler(this);
  managerClientHandler = new ManagerClientServiceHandler(this);
  compactionCoordinator = new CompactionCoordinator(this, fateRefs);
  ServerAddress sa;
  var processor = ThriftProcessorTypes.getManagerTProcessor(this, fateServiceHandler,
      compactionCoordinator.getThriftService(), managerClientHandler, getContext());
  try {
    sa = TServerUtils.createThriftServer(context, getHostname(), Property.MANAGER_CLIENTPORT,
        processor, "Manager", null, Property.MANAGER_MINTHREADS,
        Property.MANAGER_MINTHREADS_TIMEOUT, Property.MANAGER_THREADCHECK);
  } catch (UnknownHostException e) {
    throw new IllegalStateException("Unable to start server on host " + getHostname(), e);
  }
  // block until we can obtain the ZK lock for the manager. Create the
  // initial lock using ThriftService.NONE. This will allow the lock
  // allocation to occur, but prevent any services from getting the
  // Manager address for the COORDINATOR, FATE, and MANAGER services.
  // The lock data is replaced below and the manager address is exposed
  // for each of these services.
  ServiceLockData sld;
  try {
    sld = getManagerLock(context.getServerPaths().createManagerPath());
  } catch (KeeperException | InterruptedException e) {
    throw new IllegalStateException("Exception getting manager lock", e);
  }
  // Setting the Manager state to HAVE_LOCK has the side-effect of
  // starting the upgrade process if necessary.
  setManagerState(ManagerState.HAVE_LOCK);
  // Set the HostName **after** initially creating the lock. The lock data is
  // updated below with the correct address. This prevents clients from accessing
  // the Manager until all of the internal processes are started.
  setHostname(sa.address);
  recoveryManager = new RecoveryManager(this, timeToCacheRecoveryWalExistence);
  context.getZooCache().addZooCacheWatcher(new TableStateWatcher((tableId, event) -> {
    TableState state = getTableManager().getTableState(tableId);
    log.debug("Table state transition to {} @ {}", state, event);
    nextEvent.event(tableId, "Table state in zookeeper changed for %s to %s", tableId, state);
  }));
  tableInformationStatusPool = ThreadPools.getServerThreadPools()
      .createExecutorService(getConfiguration(), Property.MANAGER_STATUS_THREAD_POOL_SIZE, false);
  tabletRefreshThreadPool = ThreadPools.getServerThreadPools().getPoolBuilder("Tablet refresh ")
      .numCoreThreads(getConfiguration().getCount(Property.MANAGER_TABLET_REFRESH_MINTHREADS))
      .numMaxThreads(getConfiguration().getCount(Property.MANAGER_TABLET_REFRESH_MAXTHREADS))
      .build();
  Thread statusThread = Threads.createThread("Status Thread", new StatusThread());
  statusThread.start();
  tserverSet.startListeningForTabletServerChanges();
  try {
    blockForTservers();
  } catch (InterruptedException ex) {
    // Preserve the interrupt status; startup continues.
    Thread.currentThread().interrupt();
  }
  ZooReaderWriter zReaderWriter = context.getZooSession().asReaderWriter();
  try {
    zReaderWriter.getChildren(Constants.ZRECOVERY, new Watcher() {
      @Override
      public void process(WatchedEvent event) {
        nextEvent.event("Noticed recovery changes %s", event.getType());
        try {
          // watcher only fires once, add it back
          zReaderWriter.getChildren(Constants.ZRECOVERY, this);
        } catch (Exception e) {
          log.error("Failed to add log recovery watcher back", e);
        }
      }
    });
  } catch (KeeperException | InterruptedException e) {
    throw new IllegalStateException("Unable to read " + Constants.ZRECOVERY, e);
  }
  MetricsInfo metricsInfo = getContext().getMetricsInfo();
  ManagerMetrics managerMetrics = new ManagerMetrics(getConfiguration(), this);
  var producers = managerMetrics.getProducers(getConfiguration(), this);
  producers.add(balancerMetrics);
  final TabletGroupWatcher userTableTGW =
      new TabletGroupWatcher(this, this.userTabletStore, null, managerMetrics) {
        @Override
        boolean canSuspendTablets() {
          // Always allow user data tablets to enter suspended state.
          return true;
        }
      };
  watchers.add(userTableTGW);
  // Pass the user-table watcher by reference rather than via watchers.get(0): the positional
  // lookup was only correct if `watchers` happened to be empty when run() was entered.
  final TabletGroupWatcher metadataTableTGW =
      new TabletGroupWatcher(this, this.metadataTabletStore, userTableTGW, managerMetrics) {
        @Override
        boolean canSuspendTablets() {
          // Allow metadata tablets to enter suspended state only if so configured. Generally
          // we'll want metadata tablets to be immediately reassigned, even if there's a global
          // table.suspension.duration setting.
          return getConfiguration().getBoolean(Property.MANAGER_METADATA_SUSPENDABLE);
        }
      };
  watchers.add(metadataTableTGW);
  // Likewise, reference the metadata-table watcher directly instead of watchers.get(1).
  final TabletGroupWatcher rootTableTGW =
      new TabletGroupWatcher(this, this.rootTabletStore, metadataTableTGW, managerMetrics) {
        @Override
        boolean canSuspendTablets() {
          // Never allow root tablet to enter suspended state.
          return false;
        }
      };
  watchers.add(rootTableTGW);
  // Start each watcher as soon as the upgrade has progressed far enough for its level
  // (root -> metadata -> user) to be usable.
  while (isUpgrading()) {
    UpgradeStatus currentStatus = upgradeCoordinator.getStatus();
    if (currentStatus == UpgradeStatus.FAILED || currentStatus == UpgradeStatus.COMPLETE) {
      break;
    }
    switch (currentStatus) {
      case UPGRADED_METADATA:
        startWatchersIfNew(rootTableTGW, metadataTableTGW, userTableTGW);
        break;
      case UPGRADED_ROOT:
        startWatchersIfNew(rootTableTGW, metadataTableTGW);
        break;
      case UPGRADED_ZOOKEEPER:
        // Start processing the root table
        startWatchersIfNew(rootTableTGW);
        break;
      case FAILED:
      case COMPLETE:
      case INITIAL:
      default:
        break;
    }
    try {
      log.debug("Manager main thread is waiting for upgrade to complete");
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      throw new IllegalStateException("Interrupted while waiting for upgrade to complete", e);
    }
  }
  // In the case where an upgrade is not needed, then we may not
  // have stepped through all of the steps in the previous code
  // block. Make sure all TGWs are started.
  if (upgradeCoordinator.getStatus() != UpgradeStatus.FAILED) {
    startWatchersIfNew(rootTableTGW, metadataTableTGW, userTableTGW);
  }
  // Once we are sure the upgrade is complete, we can safely allow fate use.
  try {
    // wait for metadata upgrade running in background to complete
    if (upgradeMetadataFuture != null) {
      upgradeMetadataFuture.get();
    }
  } catch (ExecutionException | InterruptedException e) {
    throw new IllegalStateException("Metadata upgrade failed", e);
  }
  // Everything should be fully upgraded by this point, but check before starting fate
  // and other processes that depend on the metadata table being available and any
  // other tables that may have been created during the upgrade to exist.
  if (isUpgrading()) {
    throw new IllegalStateException("Upgrade coordinator is unexpectedly not complete");
  }
  metricsInfo.addMetricsProducers(producers.toArray(new MetricsProducer[0]));
  metricsInfo.init(MetricsInfo.serviceTags(getContext().getInstanceName(), getApplicationName(),
      sa.getAddress(), getResourceGroup()));
  Threads.createThread("Migration Cleanup Thread", new MigrationCleanupThread()).start();
  Threads.createThread("ScanServer Cleanup Thread", new ScanServerZKCleaner()).start();
  // Don't start the CompactionCoordinator until we have tservers and the upgrade is complete.
  compactionCoordinator.start();
  this.splitter = new Splitter(this);
  this.splitter.start();
  try {
    Predicate<ZooUtil.LockID> isLockHeld =
        lock -> ServiceLock.isLockHeld(context.getZooCache(), lock);
    var metaInstance = initializeFateInstance(context,
        new MetaFateStore<>(context.getZooSession(), managerLock.getLockID(), isLockHeld));
    var userInstance = initializeFateInstance(context, new UserFateStore<>(context,
        SystemTables.FATE.tableName(), managerLock.getLockID(), isLockHeld));
    if (!fateRefs.compareAndSet(null,
        Map.of(FateInstanceType.META, metaInstance, FateInstanceType.USER, userInstance))) {
      throw new IllegalStateException(
          "Unexpected previous fate reference map already initialized");
    }
    fateReadyLatch.countDown();
  } catch (KeeperException | InterruptedException e) {
    throw new IllegalStateException("Exception setting up FaTE cleanup thread", e);
  }
  ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor()
      .scheduleWithFixedDelay(() -> ScanServerMetadataEntries.clean(context), 10, 10, MINUTES));
  var tabletMergeabilityInterval =
      getConfiguration().getDuration(Property.MANAGER_TABLET_MERGEABILITY_INTERVAL);
  ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().scheduleWithFixedDelay(
      new FindMergeableRangeTask(this), tabletMergeabilityInterval.toMillis(),
      tabletMergeabilityInterval.toMillis(), MILLISECONDS));
  // Make sure that we have a secret key (either a new one or an old one from ZK) before we start
  // the manager client service.
  Thread authenticationTokenKeyManagerThread = null;
  if (authenticationTokenKeyManager != null && keyDistributor != null) {
    log.info("Starting delegation-token key manager");
    try {
      keyDistributor.initialize();
    } catch (KeeperException | InterruptedException e) {
      throw new IllegalStateException("Exception setting up delegation-token key manager", e);
    }
    authenticationTokenKeyManagerThread =
        Threads.createThread("Delegation Token Key Manager", authenticationTokenKeyManager);
    authenticationTokenKeyManagerThread.start();
    boolean logged = false;
    while (!authenticationTokenKeyManager.isInitialized()) {
      // Print out a status message when we start waiting for the key manager to get initialized
      if (!logged) {
        log.info("Waiting for AuthenticationTokenKeyManager to be initialized");
        logged = true;
      }
      sleepUninterruptibly(200, MILLISECONDS);
    }
    // And log when we are initialized
    log.info("AuthenticationTokenSecretManager is initialized");
  }
  // Now that the Manager is up, start the ThriftServer
  sa.startThriftServer("Manager Client Service Handler");
  clientService = sa.server;
  log.info("Started Manager client service at {}", sa.address);
  // Replace the ServiceLockData information in the Manager lock node in ZooKeeper.
  // This advertises the address that clients can use to connect to the Manager
  // for the Coordinator, Fate, and Manager services. Do **not** do this until
  // after the upgrade process is finished and the dependent services are started.
  UUID uuid = sld.getServerUUID(ThriftService.NONE);
  ServiceDescriptors descriptors = new ServiceDescriptors();
  for (ThriftService svc : new ThriftService[] {ThriftService.MANAGER, ThriftService.COORDINATOR,
      ThriftService.FATE}) {
    descriptors
        .addService(new ServiceDescriptor(uuid, svc, getHostname(), this.getResourceGroup()));
  }
  sld = new ServiceLockData(descriptors);
  log.info("Setting manager lock data to {}", sld);
  try {
    managerLock.replaceLockData(sld);
  } catch (KeeperException | InterruptedException e) {
    throw new IllegalStateException("Exception updating manager lock", e);
  }
  // Idle until shutdown is requested or the client service stops serving.
  while (!isShutdownRequested() && clientService.isServing()) {
    if (Thread.currentThread().isInterrupted()) {
      log.info("Server process thread has been interrupted, shutting down");
      break;
    }
    try {
      Thread.sleep(500);
    } catch (InterruptedException e) {
      log.info("Interrupt Exception received, shutting down");
      gracefulShutdown(context.rpcCreds());
    }
  }
  log.debug("Shutting down fate.");
  getFateRefs().keySet().forEach(type -> fate(type).shutdown(0, MINUTES));
  splitter.stop();
  log.debug("Stopping Thrift Servers");
  sa.server.stop();
  // All remaining joins share a single deadline so cleanup cannot hang indefinitely.
  final long deadline = System.currentTimeMillis() + MAX_CLEANUP_WAIT_TIME;
  try {
    statusThread.join(remaining(deadline));
  } catch (InterruptedException e) {
    throw new IllegalStateException("Exception stopping status thread", e);
  }
  tableInformationStatusPool.shutdownNow();
  tabletRefreshThreadPool.shutdownNow();
  compactionCoordinator.shutdown();
  // Signal that we want it to stop, and wait for it to do so.
  if (authenticationTokenKeyManager != null) {
    authenticationTokenKeyManager.gracefulStop();
    try {
      if (null != authenticationTokenKeyManagerThread) {
        authenticationTokenKeyManagerThread.join(remaining(deadline));
      }
    } catch (InterruptedException e) {
      throw new IllegalStateException("Exception waiting on delegation-token key manager", e);
    }
  }
  // quit, even if the tablet servers somehow jam up and the watchers
  // don't stop
  for (TabletGroupWatcher watcher : watchers) {
    try {
      watcher.join(remaining(deadline));
    } catch (InterruptedException e) {
      throw new IllegalStateException("Exception waiting on watcher", e);
    }
  }
  getShutdownComplete().set(true);
  log.info("stop requested. exiting ... ");
  try {
    managerLock.unlock();
  } catch (Exception e) {
    log.warn("Failed to release Manager lock", e);
  }
}

/**
 * Starts each given watcher whose thread state is still {@code NEW}, in argument order. Watchers
 * in any other state are skipped, so this may be called repeatedly as the upgrade progresses
 * without attempting to start a thread twice.
 *
 * @param tgws the watchers to start, in the order they should be started
 */
private static void startWatchersIfNew(TabletGroupWatcher... tgws) {
  for (TabletGroupWatcher tgw : tgws) {
    if (tgw.getState() == NEW) {
      tgw.start();
    }
  }
}