protected synchronized void drain()

in src/java/org/apache/cassandra/service/StorageService.java [3769:3950]


    /**
     * Runs the node's drain sequence: stops the services that accept or produce work, flushes every table,
     * then shuts down the commit log and the background executors, in the order written below.
     * <p>
     * If the mutation executors are already terminated the method returns immediately, logging a warning
     * unless {@code isFinalShutdown} is set. Otherwise the pre-shutdown hooks run first, the main sequence
     * runs inside a {@code try} block whose {@code catch (Throwable)} logs any failure instead of propagating
     * it, and the post-shutdown hooks run in the matching {@code finally}. Hook failures are logged as well.
     * <p>
     * NOTE(review): every failure inside the main sequence is caught and logged, so the checked exceptions
     * declared on the signature do not appear to escape from it - confirm whether any caller relies on them.
     *
     * @param isFinalShutdown {@code true} when this drain is part of the final process shutdown; progress is
     *                        then logged at debug instead of info level, the "cannot drain" warning is
     *                        suppressed, and {@code DiskErrorsHandlerService} is closed at the end
     */
    protected synchronized void drain(boolean isFinalShutdown) throws IOException, InterruptedException, ExecutionException
    {
        // Terminated mutation executors are treated as the sign that a drain has already taken place.
        if (Stage.areMutationExecutorsTerminated())
        {
            if (!isFinalShutdown)
                logger.warn("Cannot drain node (did it already happen?)");
            return;
        }

        assert !isShutdown;
        isShutdown = true;

        // A failing pre-shutdown hook is reported but must not prevent the drain itself.
        Throwable preShutdownHookThrowable = Throwables.perform(null, preShutdownHooks.stream().map(h -> h::run));
        if (preShutdownHookThrowable != null)
            logger.error("Attempting to continue draining after pre-shutdown hooks returned exception", preShutdownHookThrowable);

        try
        {
            String msg = "starting drain process";
            if (!isFinalShutdown)
                logger.info(msg);
            else
                logger.debug(msg);
            transientMode = Optional.of(Mode.DRAINING);

            try
            {
                /* not clear this is reasonable time, but propagated from prior embedded behaviour */
                BatchlogManager.instance.shutdownAndWait(1L, MINUTES);
            }
            catch (TimeoutException t)
            {
                // a slow batchlog shutdown is logged and the drain carries on
                logger.error("Batchlog manager timed out shutting down", t);
            }

            SnapshotManager.instance.close();
            HintsService.instance.pauseDispatch();

            if (daemon != null)
                shutdownClientServers();

            Gossiper.instance.stop();
            ActiveRepairService.instance().stop();

            // NOTE(review): transientMode is already DRAINING at this point (set above); in this and the
            // similar blocks below only the debug log line adds information - confirm the re-assignment is wanted.
            if (!isFinalShutdown)
            {
                logger.debug("shutting down MessageService");
                transientMode = Optional.of(Mode.DRAINING);
            }

            if (AccordService.isSetup())
                AccordService.instance().shutdownAndWait(1, MINUTES);

            // In-progress writes originating here could generate hints to be written,
            // which is currently scheduled on the mutation stage. So shut down MessagingService
            // before mutation stage, so we can get all the hints saved before shutting down.
            try
            {
                MessagingService.instance().shutdown();
            }
            catch (Throwable t)
            {
                // prevent messaging service timing out shutdown from aborting
                // drain process; otherwise drain and/or shutdown might throw
                logger.error("Messaging service timed out shutting down", t);
            }

            // ScheduledExecutors shuts down after MessagingService, as MessagingService may issue tasks to it.
            ScheduledExecutors.optionalTasks.shutdown();

            if (!isFinalShutdown)
            {
                logger.debug("clearing mutation stage");
                transientMode = Optional.of(Mode.DRAINING);
            }
            Stage.shutdownAndAwaitMutatingExecutors(false,
                                                    DRAIN_EXECUTOR_TIMEOUT_MS.getInt(), TimeUnit.MILLISECONDS);

            StorageProxy.instance.verifyNoHintsInProgress();

            if (!isFinalShutdown)
            {
                logger.debug("flushing column families");
                transientMode = Optional.of(Mode.DRAINING);
            }

            // we don't want to start any new compactions while we are draining
            disableAutoCompaction();

            // count CFs first, since forceFlush could block for the flushWriter to get a queue slot empty
            totalCFs = 0;
            for (Keyspace keyspace : Keyspace.nonLocalStrategy())
                totalCFs += keyspace.getColumnFamilyStores().size();
            remainingCFs = totalCFs;
            // flush
            List<Future<?>> flushes = new ArrayList<>();
            for (Keyspace keyspace : Keyspace.nonLocalStrategy())
            {
                for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
                    flushes.add(cfs.forceFlush(ColumnFamilyStore.FlushReason.DRAIN));
            }
            // wait for the flushes.
            // TODO this is a godawful way to track progress, since they flush in parallel.  a long one could
            // thus make several short ones "instant" if we wait for them later.
            for (Future<?> f : flushes)
            {
                try
                {
                    FBUtilities.waitOnFuture(f);
                }
                catch (Throwable t)
                {
                    JVMStabilityInspector.inspectThrowable(t);
                    // don't let this stop us from shutting down the commitlog and other thread pools
                    logger.warn("Caught exception while waiting for memtable flushes during shutdown hook", t);
                }

                // decremented whether the flush succeeded or failed, so the counter tracks flushes waited on
                remainingCFs--;
            }

            // Interrupt ongoing compactions and shutdown CM to prevent further compactions.
            CompactionManager.instance.forceShutdown();
            // Flush the system tables after all other tables are flushed, just in case flushing modifies any system state
            // like CASSANDRA-5151. Don't bother with progress tracking since system data is tiny.
            // Flush system tables after stopping compactions since they modify
            // system tables (for example compactions can obsolete sstables and the tidiers in SSTableReader update
            // system tables, see SSTableReader.GlobalTidy)
            flushes.clear();
            for (Keyspace keyspace : Keyspace.system())
            {
                for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores())
                    flushes.add(cfs.forceFlush(ColumnFamilyStore.FlushReason.DRAIN));
            }
            // unlike the per-table wait above, a failure here is not caught locally and reaches the outer catch
            FBUtilities.waitOnFutures(flushes);

            SnapshotManager.instance.shutdownAndWait(1L, MINUTES);
            HintsService.instance.shutdownBlocking();

            // Interrupt ongoing compactions and shutdown CM to prevent further compactions.
            // NOTE(review): forceShutdown() was already invoked before the system-table flush above;
            // this second call looks redundant - confirm whether it is intentional.
            CompactionManager.instance.forceShutdown();

            // whilst we've flushed all the CFs, which will have recycled all completed segments, we want to ensure
            // there are no segments to replay, so we force the recycling of any remaining (should be at most one)
            CommitLog.instance.forceRecycleAllSegments();

            CommitLog.instance.shutdownBlocking();

            // wait for miscellaneous tasks like sstable and commitlog segment deletion
            ColumnFamilyStore.shutdownPostFlushExecutor();

            if (isFinalShutdown)
                DiskErrorsHandlerService.get().close();

            try
            {
                // we are not shutting down ScheduledExecutors#scheduledFastTasks to be still able to progress time
                // fast-tasks executor is shut down in StorageService's shutdown hook added to Runtime
                ExecutorUtils.shutdownNowAndWait(1, MINUTES,
                                                 ScheduledExecutors.nonPeriodicTasks,
                                                 ScheduledExecutors.scheduledTasks,
                                                 ScheduledExecutors.optionalTasks);
            }
            finally
            {
                // the node is reported as DRAINED even if the executor shutdown above threw
                if (!isFinalShutdown)
                    logger.info("{}", Mode.DRAINED);
                else
                    logger.debug("{}", Mode.DRAINED);
                transientMode = Optional.of(Mode.DRAINED);
            }
        }
        catch (Throwable t)
        {
            // any failure in the sequence above is logged here and not rethrown
            logger.error("Caught an exception while draining ", t);
        }
        finally
        {
            // post-shutdown hooks run whether or not the sequence above completed
            Throwable postShutdownHookThrowable = Throwables.perform(null, postShutdownHooks.stream().map(h -> h::run));
            if (postShutdownHookThrowable != null)
                logger.error("Post-shutdown hooks returned exception", postShutdownHookThrowable);
        }
    }