in src/java/org/apache/cassandra/service/StorageService.java [3769:3950]
protected synchronized void drain(boolean isFinalShutdown) throws IOException, InterruptedException, ExecutionException
{
if (Stage.areMutationExecutorsTerminated())
{
if (!isFinalShutdown)
logger.warn("Cannot drain node (did it already happen?)");
return;
}
assert !isShutdown;
isShutdown = true;
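// Run any registered pre-shutdown hooks before services stop; a failing hook is
// logged below but does not abort the drain.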
Throwable preShutdownHookThrowable = Throwables.perform(null, preShutdownHooks.stream().map(h -> h::run));
if (preShutdownHookThrowable != null)
logger.error("Attempting to continue draining after pre-shutdown hooks returned exception", preShutdownHookThrowable);
try
{
String msg = "starting drain process";
if (!isFinalShutdown)
logger.info(msg);
else
logger.debug(msg);
transientMode = Optional.of(Mode.DRAINING);
try
{
/* not clear this is reasonable time, but propagated from prior embedded behaviour */
BatchlogManager.instance.shutdownAndWait(1L, MINUTES);
}
catch (TimeoutException t)
{
logger.error("Batchlog manager timed out shutting down", t);
}
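// Stop the snapshot manager and pause hint dispatch so no new hint delivery starts
// while the node winds down.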
SnapshotManager.instance.close();
HintsService.instance.pauseDispatch();
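// Stop accepting client connections, then leave gossip so peers mark this node as
// shut down and stop routing requests to it.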
if (daemon != null)
shutdownClientServers();
Gossiper.instance.stop();
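// Abort any in-flight repair sessions.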
ActiveRepairService.instance().stop();
if (!isFinalShutdown)
{
logger.debug("shutting down MessageService");
transientMode = Optional.of(Mode.DRAINING);
}
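// If the Accord transaction subsystem was started, shut it down while messaging is
// still available to it.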
if (AccordService.isSetup())
AccordService.instance().shutdownAndWait(1, MINUTES);
// In-progress writes originating here could generate hints to be written, and hint
// writing is currently scheduled on the mutation stage. So shut down MessagingService
// before the mutation stage, so that all hints get saved before shutting down.
try
{
MessagingService.instance().shutdown();
}
catch (Throwable t)
{
// prevent a messaging service shutdown timeout from aborting the
// drain process; otherwise drain and/or shutdown might throw
logger.error("Messaging service timed out shutting down", t);
}
// ScheduledExecutors shuts down after MessagingService, as MessagingService may issue tasks to it.
ScheduledExecutors.optionalTasks.shutdown();
if (!isFinalShutdown)
{
logger.debug("clearing mutation stage");
transientMode = Optional.of(Mode.DRAINING);
}
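// Drain the mutation-handling stages: stop accepting new work and wait, bounded by
// DRAIN_EXECUTOR_TIMEOUT_MS, for in-flight mutations to complete.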
Stage.shutdownAndAwaitMutatingExecutors(false, DRAIN_EXECUTOR_TIMEOUT_MS.getInt(), TimeUnit.MILLISECONDS);
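// With the mutation stage drained, no hint writes should still be in flight.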
StorageProxy.instance.verifyNoHintsInProgress();
if (!isFinalShutdown)
{
logger.debug("flushing column families");
transientMode = Optional.of(Mode.DRAINING);
}
// we don't want to start any new compactions while we are draining
disableAutoCompaction();
// count CFs first, since forceFlush could block for the flushWriter to get a queue slot empty
totalCFs = 0;
for (Keyspace keyspace : Keyspace.nonLocalStrategy())
totalCFs += keyspace.getColumnFamilyStores().size();
remainingCFs = totalCFs;
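// totalCFs/remainingCFs back the drain progress reported over JMX (see
// StorageServiceMBean), letting operators watch the flush phase.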
// flush
List<Future<?>> flushes = new ArrayList<>();
for (Keyspace keyspace : Keyspace.nonLocalStrategy())
{
for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
flushes.add(cfs.forceFlush(ColumnFamilyStore.FlushReason.DRAIN));
}
// wait for the flushes.
// TODO this is a godawful way to track progress, since they flush in parallel. a long one could
// thus make several short ones "instant" if we wait for them later.
for (Future<?> f : flushes)
{
try
{
FBUtilities.waitOnFuture(f);
}
catch (Throwable t)
{
JVMStabilityInspector.inspectThrowable(t);
// don't let this stop us from shutting down the commitlog and other thread pools
logger.warn("Caught exception while waiting for memtable flushes during shutdown hook", t);
}
remainingCFs--;
}
// Interrupt ongoing compactions and shutdown CM to prevent further compactions.
CompactionManager.instance.forceShutdown();
// Flush the system tables after all other tables are flushed, just in case flushing modifies any system state
// like CASSANDRA-5151. Don't bother with progress tracking since system data is tiny.
// Flush system tables after stopping compactions since they modify
// system tables (for example compactions can obsolete sstables and the tidiers in SSTableReader update
// system tables, see SSTableReader.GlobalTidy)
flushes.clear();
for (Keyspace keyspace : Keyspace.system())
{
for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
flushes.add(cfs.forceFlush(ColumnFamilyStore.FlushReason.DRAIN));
}
FBUtilities.waitOnFutures(flushes);
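// Flush any remaining hints to disk and shut the hints service down for good
// (dispatch was only paused earlier).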
SnapshotManager.instance.shutdownAndWait(1L, MINUTES);
HintsService.instance.shutdownBlocking();
// Interrupt ongoing compactions and shutdown CM to prevent further compactions.
CompactionManager.instance.forceShutdown();
// whilst we've flushed all the CFs, which will have recycled all completed segments, we want to ensure
// there are no segments to replay, so we force the recycling of any remaining (should be at most one)
CommitLog.instance.forceRecycleAllSegments();
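// Shut down the commit log; beyond this point no further writes can be made durable.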
CommitLog.instance.shutdownBlocking();
// wait for miscellaneous tasks like sstable and commitlog segment deletion
ColumnFamilyStore.shutdownPostFlushExecutor();
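// Only release the disk-error handler on a real process shutdown; a plain drain
// leaves it in place.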
if (isFinalShutdown)
DiskErrorsHandlerService.get().close();
try
{
// ScheduledExecutors#scheduledFastTasks is deliberately not shut down here, so that time can still progress;
// the fast-tasks executor is shut down in StorageService's shutdown hook registered with the Runtime
ExecutorUtils.shutdownNowAndWait(1, MINUTES,
ScheduledExecutors.nonPeriodicTasks,
ScheduledExecutors.scheduledTasks,
ScheduledExecutors.optionalTasks);
}
finally
{
if (!isFinalShutdown)
logger.info("{}", Mode.DRAINED);
else
logger.debug("{}", Mode.DRAINED);
transientMode = Optional.of(Mode.DRAINED);
}
}
catch (Throwable t)
{
logger.error("Caught an exception while draining ", t);
}
finally
{
Throwable postShutdownHookThrowable = Throwables.perform(null, postShutdownHooks.stream().map(h -> h::run));
if (postShutdownHookThrowable != null)
logger.error("Post-shutdown hooks returned exception", postShutdownHookThrowable);
}
}
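// For context, a minimal sketch of how this method is typically reached (simplified;
// the actual wrappers live elsewhere in this class and may differ between versions):
// the public no-argument drain() exposed through StorageServiceMBean (and therefore
// `nodetool drain`) delegates here with isFinalShutdown == false, while the JVM
// shutdown hook registered at startup calls drain(true) to keep logging quiet during
// process exit.
//
//     public void drain() throws IOException, InterruptedException, ExecutionException
//     {
//         drain(false);
//     }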