in server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java [535:771]
public void run() {
  // Main service routine of the tablet server. In order, it:
  // 1. performs the server login and, when an auth key watcher is configured, seeds it;
  // 2. starts the tablet client service and registers the metrics producers;
  // 3. calls announceExistence(), hands the tablet server lock to the context and
  // initializes the WAL marker;
  // 4. starts log sorting (when enabled) and schedules the on-demand tablet unloader;
  // 5. forwards queued messages to the manager until shutdown is requested or the
  // thread is interrupted;
  // 6. tells the manager this server is stopping, unloads the online tablets one data
  // level at a time, stops the Thrift server, closes the volume manager and finally
  // unlocks the tablet server lock.
  // Failures while starting the client service, creating the WAL marker or starting the
  // log sorter are rethrown as RuntimeException.
  SecurityUtil.serverLogin(getConfiguration());

  if (authKeyWatcher != null) {
    log.info("Seeding ZooKeeper watcher for authentication keys");
    try {
      authKeyWatcher.updateAuthKeys();
    } catch (KeeperException | InterruptedException e) {
      // TODO Does there need to be a better check? What are the error conditions that we'd fall
      // out here? AUTH_FAILURE?
      // If we get the error, do we just put it on a timer and retry the exists(String, Watcher)
      // call?
      log.error("Failed to perform initial check for authentication tokens in"
          + " ZooKeeper. Delegation token authentication will be unavailable.", e);
    }
  }

  try {
    clientAddress = startTabletClientService();
  } catch (UnknownHostException e1) {
    throw new RuntimeException("Failed to start the tablet client service", e1);
  }

  // Create every metrics producer, register them together, then initialize the metrics
  // system with tags that include the client address obtained above.
  MetricsInfo metricsInfo = context.getMetricsInfo();
  metrics = new TabletServerMetrics(this);
  updateMetrics = new TabletServerUpdateMetrics();
  scanMetrics = new TabletServerScanMetrics(this.resourceManager::getOpenFiles);
  sessionManager.setZombieCountConsumer(scanMetrics::setZombieScanThreads);
  mincMetrics = new TabletServerMinCMetrics();
  pausedMetrics = new PausedCompactionMetrics();
  blockCacheMetrics = new BlockCacheMetrics(this.resourceManager.getIndexCache(),
      this.resourceManager.getDataCache(), this.resourceManager.getSummaryCache());

  metricsInfo.addMetricsProducers(this, metrics, updateMetrics, scanMetrics, mincMetrics,
      pausedMetrics, blockCacheMetrics);
  metricsInfo.init(MetricsInfo.serviceTags(context.getInstanceName(), getApplicationName(),
      clientAddress, getResourceGroup()));

  announceExistence();
  getContext().setServiceLock(tabletServerLock);

  try {
    walMarker.initWalMarker(getTabletSession());
  } catch (Exception e) {
    log.error("Unable to create WAL marker node in zookeeper", e);
    throw new RuntimeException(e);
  }

  int threadPoolSize =
      getContext().getConfiguration().getCount(Property.TSERV_WAL_SORT_MAX_CONCURRENT);
  if (threadPoolSize > 0) {
    try {
      // Attempt to process all existing log sorting work and start a background
      // thread to look for log sorting work in the future
      logSorter.startWatchingForRecoveryLogs(threadPoolSize);
    } catch (Exception ex) {
      log.error("Error starting LogSorter");
      throw new RuntimeException(ex);
    }
  } else {
    log.info(
        "Log sorting for tablet recovery is disabled, TSERV_WAL_SORT_MAX_CONCURRENT is less than 1.");
  }

  // Schedule the periodic evaluation of on-demand tablets for unloading.
  final AccumuloConfiguration aconf = getConfiguration();
  final long onDemandUnloaderInterval =
      aconf.getTimeInMillis(Property.TSERV_ONDEMAND_UNLOADER_INTERVAL);
  watchCriticalFixedDelay(aconf, onDemandUnloaderInterval, () -> {
    evaluateOnDemandTabletsForUnload();
  });

  HostAndPort managerHost;
  while (!isShutdownRequested()) {
    if (Thread.currentThread().isInterrupted()) {
      log.info("Server process thread has been interrupted, shutting down");
      break;
    }

    updateIdleStatus(getOnlineTablets().isEmpty());

    // send all of the pending messages
    try {
      ManagerMessage mm = null;
      ManagerClientService.Client iface = null;

      try {
        // wait until a message is ready to send, or a server stop
        // was requested
        while (mm == null && !isShutdownRequested() && !Thread.currentThread().isInterrupted()) {
          mm = managerMessages.poll(1, TimeUnit.SECONDS);
          updateIdleStatus(getOnlineTablets().isEmpty());
        }

        // have a message to send to the manager, so grab a
        // connection
        managerHost = getManagerAddress();
        iface = managerConnection(managerHost);
        TServiceClient client = iface;

        // if while loop does not execute at all and mm != null,
        // then finally block should place mm back on queue
        while (!Thread.currentThread().isInterrupted() && !isShutdownRequested() && mm != null
            && client != null && client.getOutputProtocol() != null
            && client.getOutputProtocol().getTransport() != null
            && client.getOutputProtocol().getTransport().isOpen()) {
          try {
            mm.send(getContext().rpcCreds(), getClientAddressString(), iface);
            mm = null;
          } catch (TException ex) {
            log.warn("Error sending message: queuing message again");
            managerMessages.putFirst(mm);
            // cleared so the finally block below does not queue the same message twice
            mm = null;
            throw ex;
          }

          // if any messages are immediately available grab em and
          // send them
          mm = managerMessages.poll();
          updateIdleStatus(getOnlineTablets().isEmpty());
        }

      } finally {
        // an unsent message goes back to the head of the queue so ordering is kept
        if (mm != null) {
          managerMessages.putFirst(mm);
        }
        returnManagerConnection(iface);

        sleepUninterruptibly(1, TimeUnit.SECONDS);
      }
    } catch (InterruptedException e) {
      // NOTE(review): the interrupt status is not restored here; the loop relies on
      // gracefulShutdown() to end it - confirm that is intended.
      log.info("Interrupt Exception received, shutting down");
      gracefulShutdown(getContext().rpcCreds());
    } catch (Exception e) {
      // may have lost connection with manager
      // loop back to the beginning and wait for a new one
      // this way we survive manager failures
      log.error(getClientAddressString() + ": TServerInfo: Exception. Manager down?", e);
    }
  }

  // Tell the Manager we are shutting down so that it doesn't try
  // to assign tablets.
  // NOTE(review): the send loop above guards against a null client; if managerConnection()
  // can return null here as well, the call below would fail - confirm.
  ManagerClientService.Client iface = managerConnection(getManagerAddress());
  try {
    iface.tabletServerStopping(TraceUtil.traceInfo(), getContext().rpcCreds(),
        getClientAddressString());
  } catch (TException e) {
    log.error("Error informing Manager that we are shutting down, halting server", e);
    Halt.halt("Error informing Manager that we are shutting down, exiting!", -1);
  } finally {
    returnManagerConnection(iface);
  }

  // Best-effort attempt at unloading tablets.
  log.debug("Unloading tablets");
  final List<Future<?>> futures = new ArrayList<>();
  final ThreadPoolExecutor tpe = getContext().threadPools()
      .getPoolBuilder(ThreadPoolNames.TSERVER_SHUTDOWN_UNLOAD_TABLET_POOL).numCoreThreads(8)
      .numMaxThreads(16).build();

  iface = managerConnection(getManagerAddress());
  boolean managerDown = false;

  try {
    // Levels are processed in the order USER, METADATA, ROOT; each level is fully
    // unloaded before the next one is started.
    for (DataLevel level : new DataLevel[] {DataLevel.USER, DataLevel.METADATA, DataLevel.ROOT}) {
      getOnlineTablets().keySet().forEach(ke -> {
        if (DataLevel.of(ke.tableId()) == level) {
          futures.add(tpe.submit(new UnloadTabletHandler(this, ke, TUnloadTabletGoal.UNASSIGNED,
              SteadyTime.from(System.currentTimeMillis(), TimeUnit.MILLISECONDS))));
        }
      });

      while (!futures.isEmpty()) {
        Iterator<Future<?>> unloads = futures.iterator();
        while (unloads.hasNext()) {
          Future<?> f = unloads.next();
          if (f.isDone()) {
            if (!managerDown) {
              // poll() returns null when the queue is empty, and a finished unload does
              // not guarantee that a message is waiting, so only send when one exists.
              ManagerMessage mm = managerMessages.poll();
              if (mm != null) {
                try {
                  mm.send(getContext().rpcCreds(), getClientAddressString(), iface);
                } catch (TException e) {
                  // stop talking to the manager for the rest of the unload phase
                  managerDown = true;
                  log.debug("Error sending message to Manager during tablet unloading, msg: {}",
                      e.getMessage());
                }
              }
            }
            unloads.remove();
          }
        }
        log.debug("Waiting on {} {} tablets to close.", futures.size(), level);
        UtilWaitThread.sleep(1000);
      }
      log.debug("All {} tablets unloaded", level);
    }
  } finally {
    // Drain whatever messages are still queued, unless a send to the manager already failed.
    if (!managerDown) {
      try {
        ManagerMessage mm = managerMessages.poll();
        do {
          if (mm != null) {
            mm.send(getContext().rpcCreds(), getClientAddressString(), iface);
          }
          mm = managerMessages.poll();
        } while (mm != null);
      } catch (TException e) {
        log.debug("Error sending message to Manager during tablet unloading, msg: {}",
            e.getMessage());
      }
    }
    returnManagerConnection(iface);
    tpe.shutdown();
  }

  log.debug("Stopping Thrift Servers");
  if (server != null) {
    server.stop();
  }

  try {
    log.debug("Closing filesystems");
    getVolumeManager().close();
  } catch (IOException e) {
    log.warn("Failed to close filesystem : {}", e.getMessage(), e);
  }

  context.getLowMemoryDetector().logGCInfo(getConfiguration());

  getShutdownComplete().set(true);
  log.info("TServerInfo: stop requested. exiting ... ");
  // the lock is released last, after the shutdown-complete flag has been set
  try {
    tabletServerLock.unlock();
  } catch (Exception e) {
    log.warn("Failed to release tablet server lock", e);
  }
}