in src/java/org/apache/cassandra/db/SSTableImporter.java [77:245]
synchronized List<String> importNewSSTables(Options options)
{
UUID importID = UUID.randomUUID();
logger.info("[{}] Loading new SSTables for {}/{}: {}", importID, cfs.getKeyspaceName(), cfs.getTableName(), options);
List<Pair<Directories.SSTableLister, String>> listers = getSSTableListers(options.srcPaths);
Set<Descriptor> currentDescriptors = new HashSet<>();
for (SSTableReader sstable : cfs.getSSTables(SSTableSet.CANONICAL))
currentDescriptors.add(sstable.descriptor);
List<String> failedDirectories = new ArrayList<>();
// verify first to avoid starting to copy sstables to the data directories and then have to abort.
if (options.verifySSTables || options.verifyTokens || options.failOnMissingIndex)
{
for (Pair<Directories.SSTableLister, String> listerPair : listers)
{
Directories.SSTableLister lister = listerPair.left;
String dir = listerPair.right;
for (Map.Entry<Descriptor, Set<Component>> entry : lister.list(true).entrySet())
{
Descriptor descriptor = entry.getKey();
if (!currentDescriptors.contains(entry.getKey()))
{
try
{
abortIfDraining();
if (options.failOnMissingIndex)
{
Index.Group saiIndexGroup = cfs.indexManager.getIndexGroup(StorageAttachedIndexGroup.GROUP_KEY);
if (saiIndexGroup != null)
{
IndexDescriptor indexDescriptor = IndexDescriptor.create(descriptor,
cfs.getPartitioner(),
cfs.metadata().comparator);
String keyspace = cfs.getKeyspaceName();
String table = cfs.getTableName();
if (!indexDescriptor.isPerSSTableIndexBuildComplete())
throw new IllegalStateException(String.format("Missing SAI index to import for SSTable %s on %s.%s",
indexDescriptor.sstableDescriptor.toString(),
keyspace,
table));
for (Index index : saiIndexGroup.getIndexes())
{
IndexIdentifier indexIdentifier = new IndexIdentifier(keyspace, table, index.getIndexMetadata().name);
if (!indexDescriptor.isPerColumnIndexBuildComplete(indexIdentifier))
throw new IllegalStateException(String.format("Missing SAI index to import for index %s on %s.%s",
index.getIndexMetadata().name,
keyspace,
table));
}
}
}
if (options.verifySSTables || options.verifyTokens)
verifySSTableForImport(descriptor, entry.getValue(), options.verifyTokens, options.verifySSTables, options.extendedVerify);
}
catch (Throwable t)
{
if (dir != null)
{
logger.error("[{}] Failed verifying SSTable {} in directory {}", importID, descriptor, dir, t);
failedDirectories.add(dir);
}
else
{
logger.error("[{}] Failed verifying SSTable {}", importID, descriptor, t);
throw new RuntimeException("Failed verifying SSTable " + descriptor, t);
}
break;
}
}
}
}
}
Set<SSTableReader> newSSTables = new HashSet<>();
for (Pair<Directories.SSTableLister, String> listerPair : listers)
{
Directories.SSTableLister lister = listerPair.left;
String dir = listerPair.right;
if (failedDirectories.contains(dir))
continue;
Set<MovedSSTable> movedSSTables = new HashSet<>();
Set<SSTableReader> newSSTablesPerDirectory = new HashSet<>();
for (Map.Entry<Descriptor, Set<Component>> entry : lister.list(true).entrySet())
{
try
{
abortIfDraining();
Descriptor oldDescriptor = entry.getKey();
if (currentDescriptors.contains(oldDescriptor))
continue;
File targetDir = getTargetDirectory(dir, oldDescriptor, entry.getValue());
Descriptor newDescriptor = cfs.getUniqueDescriptorFor(entry.getKey(), targetDir);
maybeMutateMetadata(entry.getKey(), options);
movedSSTables.add(new MovedSSTable(newDescriptor, entry.getKey(), entry.getValue()));
SSTableReader sstable = SSTableReader.moveAndOpenSSTable(cfs, entry.getKey(), newDescriptor, entry.getValue(), options.copyData);
newSSTablesPerDirectory.add(sstable);
}
catch (Throwable t)
{
newSSTablesPerDirectory.forEach(s -> s.selfRef().release());
if (dir != null)
{
logger.error("[{}] Failed importing sstables in directory {}", importID, dir, t);
failedDirectories.add(dir);
if (options.copyData)
{
removeCopiedSSTables(movedSSTables);
}
else
{
moveSSTablesBack(movedSSTables);
}
movedSSTables.clear();
newSSTablesPerDirectory.clear();
break;
}
else
{
logger.error("[{}] Failed importing sstables from data directory - renamed SSTables are: {}", importID, movedSSTables, t);
throw new RuntimeException("Failed importing SSTables", t);
}
}
}
newSSTables.addAll(newSSTablesPerDirectory);
}
if (newSSTables.isEmpty())
{
logger.info("[{}] No new SSTables were found for {}/{}", importID, cfs.getKeyspaceName(), cfs.getTableName());
return failedDirectories;
}
logger.info("[{}] Loading new SSTables and building secondary indexes for {}/{}: {}", importID, cfs.getKeyspaceName(), cfs.getTableName(), newSSTables);
if (logger.isTraceEnabled())
logLeveling(importID, newSSTables);
try (Refs<SSTableReader> refs = Refs.ref(newSSTables))
{
abortIfDraining();
// Validate existing SSTable-attached indexes, and then build any that are missing:
if (!cfs.indexManager.validateSSTableAttachedIndexes(newSSTables, false, options.validateIndexChecksum))
cfs.indexManager.buildSSTableAttachedIndexesBlocking(newSSTables);
cfs.getTracker().addSSTables(newSSTables);
for (SSTableReader reader : newSSTables)
{
if (options.invalidateCaches && cfs.isRowCacheEnabled())
invalidateCachesForSSTable(reader);
}
}
catch (Throwable t)
{
logger.error("[{}] Failed adding SSTables", importID, t);
throw new RuntimeException("Failed adding SSTables", t);
}
logger.info("[{}] Done loading load new SSTables for {}/{}", importID, cfs.getKeyspaceName(), cfs.getTableName());
return failedDirectories;
}