synchronized List importNewSSTables()

in src/java/org/apache/cassandra/db/SSTableImporter.java [77:245]


    /**
     * Imports the SSTables found by the listers built from {@code options.srcPaths} into this table.
     * <p>
     * The work happens in three phases:
     * <ol>
     *   <li>if any of {@code verifySSTables}, {@code verifyTokens} or {@code failOnMissingIndex} is set, every
     *       candidate SSTable is checked before any file is touched;</li>
     *   <li>every candidate from a directory that has not failed is moved (or copied, when
     *       {@code options.copyData} is set) to a target directory and opened;</li>
     *   <li>the opened readers get their SSTable-attached indexes validated (and built if validation reports
     *       {@code false}), are added to the tracker and, if requested and the row cache is enabled, have their
     *       caches invalidated.</li>
     * </ol>
     * SSTables whose descriptor is already part of the canonical SSTable set are skipped in phases one and two.
     * The method is {@code synchronized}, so only one import runs at a time per importer instance.
     *
     * @param options import options; see the individual flags referenced above
     * @return the directories for which verification or import failed (possibly empty)
     * @throws RuntimeException if a failure occurs for a lister that has no directory associated with it, or if
     *                          adding the new SSTables fails; the original failure is attached as the cause
     */
    synchronized List<String> importNewSSTables(Options options)
    {
        UUID importID = UUID.randomUUID();
        logger.info("[{}] Loading new SSTables for {}/{}: {}", importID, cfs.getKeyspaceName(), cfs.getTableName(), options);

        List<Pair<Directories.SSTableLister, String>> listers = getSSTableListers(options.srcPaths);

        // Descriptors that are already live; anything listed with one of these is not "new" and is skipped.
        Set<Descriptor> currentDescriptors = new HashSet<>();
        for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
            currentDescriptors.add(sstable.descriptor);
        List<String> failedDirectories = new ArrayList<>();

        // verify first to avoid starting to copy sstables to the data directories and then have to abort.
        if (options.verifySSTables || options.verifyTokens || options.failOnMissingIndex)
            verifyNewSSTables(importID, listers, currentDescriptors, options, failedDirectories);

        Set<SSTableReader> newSSTables = moveAndOpenNewSSTables(importID, listers, currentDescriptors, options, failedDirectories);

        if (newSSTables.isEmpty())
        {
            logger.info("[{}] No new SSTables were found for {}/{}", importID, cfs.getKeyspaceName(), cfs.getTableName());
            return failedDirectories;
        }

        logger.info("[{}] Loading new SSTables and building secondary indexes for {}/{}: {}", importID, cfs.getKeyspaceName(), cfs.getTableName(), newSSTables);
        if (logger.isTraceEnabled())
            logLeveling(importID, newSSTables);

        try (Refs<SSTableReader> refs = Refs.ref(newSSTables))
        {
            abortIfDraining();

            // Validate existing SSTable-attached indexes, and then build any that are missing:
            if (!cfs.indexManager.validateSSTableAttachedIndexes(newSSTables, false, options.validateIndexChecksum))
                cfs.indexManager.buildSSTableAttachedIndexesBlocking(newSSTables);

            cfs.getTracker().addSSTables(newSSTables);

            // The condition does not depend on the individual reader, so evaluate it once.
            if (options.invalidateCaches && cfs.isRowCacheEnabled())
            {
                for (SSTableReader reader : newSSTables)
                    invalidateCachesForSSTable(reader);
            }
        }
        catch (Throwable t)
        {
            logger.error("[{}] Failed adding SSTables", importID, t);
            throw new RuntimeException("Failed adding SSTables", t);
        }

        // NOTE(review): "loading load" looks like a typo; the text is left unchanged in case log consumers match on it.
        logger.info("[{}] Done loading load new SSTables for {}/{}", importID, cfs.getKeyspaceName(), cfs.getTableName());
        return failedDirectories;
    }

    /**
     * Verification phase of {@link #importNewSSTables(Options)}: checks every listed SSTable that is not already
     * live, without moving any file.
     * <p>
     * On the first failure within a lister that has a directory, the directory is appended to
     * {@code failedDirectories} and the remaining SSTables of that lister are not checked. A failure for a lister
     * without a directory is rethrown wrapped in a {@link RuntimeException}.
     */
    private void verifyNewSSTables(UUID importID,
                                   List<Pair<Directories.SSTableLister, String>> listers,
                                   Set<Descriptor> currentDescriptors,
                                   Options options,
                                   List<String> failedDirectories)
    {
        for (Pair<Directories.SSTableLister, String> listerPair : listers)
        {
            Directories.SSTableLister lister = listerPair.left;
            String dir = listerPair.right;
            for (Map.Entry<Descriptor, Set<Component>> entry : lister.list(true).entrySet())
            {
                Descriptor descriptor = entry.getKey();
                if (currentDescriptors.contains(descriptor))
                    continue;

                try
                {
                    abortIfDraining();

                    if (options.failOnMissingIndex)
                        checkAttachedIndexesComplete(descriptor);

                    if (options.verifySSTables || options.verifyTokens)
                        verifySSTableForImport(descriptor, entry.getValue(), options.verifyTokens, options.verifySSTables, options.extendedVerify);
                }
                catch (Throwable t)
                {
                    if (dir == null)
                    {
                        logger.error("[{}] Failed verifying SSTable {}", importID, descriptor, t);
                        throw new RuntimeException("Failed verifying SSTable " + descriptor, t);
                    }

                    logger.error("[{}] Failed verifying SSTable {} in directory {}", importID, descriptor, dir, t);
                    failedDirectories.add(dir);
                    // One bad SSTable fails the whole directory; move on to the next lister.
                    break;
                }
            }
        }
    }

    /**
     * Throws if the table has a storage-attached index group and the given SSTable lacks a complete per-SSTable
     * index build or a complete per-column build for any index of that group. Does nothing when the table has
     * no such index group.
     *
     * @throws IllegalStateException if an index build is reported as incomplete
     */
    private void checkAttachedIndexesComplete(Descriptor descriptor)
    {
        Index.Group saiIndexGroup = cfs.indexManager.getIndexGroup(StorageAttachedIndexGroup.GROUP_KEY);
        if (saiIndexGroup == null)
            return;

        IndexDescriptor indexDescriptor = IndexDescriptor.create(descriptor,
                                                                 cfs.getPartitioner(),
                                                                 cfs.metadata().comparator);

        String keyspace = cfs.getKeyspaceName();
        String table = cfs.getTableName();

        if (!indexDescriptor.isPerSSTableIndexBuildComplete())
            throw new IllegalStateException(String.format("Missing SAI index to import for SSTable %s on %s.%s",
                                                          indexDescriptor.sstableDescriptor.toString(),
                                                          keyspace,
                                                          table));

        for (Index index : saiIndexGroup.getIndexes())
        {
            IndexIdentifier indexIdentifier = new IndexIdentifier(keyspace, table, index.getIndexMetadata().name);
            if (!indexDescriptor.isPerColumnIndexBuildComplete(indexIdentifier))
                throw new IllegalStateException(String.format("Missing SAI index to import for index %s on %s.%s",
                                                              index.getIndexMetadata().name,
                                                              keyspace,
                                                              table));
        }
    }

    /**
     * Move/open phase of {@link #importNewSSTables(Options)}: for every lister whose directory has not failed,
     * moves or copies each SSTable that is not already live to its target directory and opens it.
     * <p>
     * If anything fails for a lister that has a directory, the readers opened so far for that directory are
     * released, the directory is appended to {@code failedDirectories}, the files handled so far are passed to
     * {@code removeCopiedSSTables} (copy mode) or {@code moveSSTablesBack} (move mode), and processing continues
     * with the next lister. A failure for a lister without a directory releases the opened readers and is
     * rethrown wrapped in a {@link RuntimeException}.
     *
     * @return the readers opened across all directories that were imported successfully
     */
    private Set<SSTableReader> moveAndOpenNewSSTables(UUID importID,
                                                      List<Pair<Directories.SSTableLister, String>> listers,
                                                      Set<Descriptor> currentDescriptors,
                                                      Options options,
                                                      List<String> failedDirectories)
    {
        Set<SSTableReader> newSSTables = new HashSet<>();
        for (Pair<Directories.SSTableLister, String> listerPair : listers)
        {
            Directories.SSTableLister lister = listerPair.left;
            String dir = listerPair.right;
            if (failedDirectories.contains(dir))
                continue;

            Set<MovedSSTable> movedSSTables = new HashSet<>();
            Set<SSTableReader> newSSTablesPerDirectory = new HashSet<>();
            for (Map.Entry<Descriptor, Set<Component>> entry : lister.list(true).entrySet())
            {
                try
                {
                    abortIfDraining();
                    Descriptor oldDescriptor = entry.getKey();
                    if (currentDescriptors.contains(oldDescriptor))
                        continue;

                    Set<Component> components = entry.getValue();
                    File targetDir = getTargetDirectory(dir, oldDescriptor, components);
                    Descriptor newDescriptor = cfs.getUniqueDescriptorFor(oldDescriptor, targetDir);
                    maybeMutateMetadata(oldDescriptor, options);
                    // Record the move before performing it so that a failure part-way through is still cleaned up.
                    movedSSTables.add(new MovedSSTable(newDescriptor, oldDescriptor, components));
                    SSTableReader sstable = SSTableReader.moveAndOpenSSTable(cfs, oldDescriptor, newDescriptor, components, options.copyData);
                    newSSTablesPerDirectory.add(sstable);
                }
                catch (Throwable t)
                {
                    newSSTablesPerDirectory.forEach(s -> s.selfRef().release());
                    if (dir != null)
                    {
                        logger.error("[{}] Failed importing sstables in directory {}", importID, dir, t);
                        failedDirectories.add(dir);
                        if (options.copyData)
                        {
                            removeCopiedSSTables(movedSSTables);
                        }
                        else
                        {
                            moveSSTablesBack(movedSSTables);
                        }
                        movedSSTables.clear();
                        newSSTablesPerDirectory.clear();
                        break;
                    }
                    else
                    {
                        logger.error("[{}] Failed importing sstables from data directory - renamed SSTables are: {}", importID, movedSSTables, t);
                        throw new RuntimeException("Failed importing SSTables", t);
                    }
                }
            }
            newSSTables.addAll(newSSTablesPerDirectory);
        }
        return newSSTables;
    }