in src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java [197:246]
/**
 * Processes every import request currently waiting in {@code queue}, one at a time.
 *
 * <p>For each dequeued request the table operations for {@code options.host} are looked up and
 * {@code importNewSSTables} is invoked with the request's options. The request's promise is then
 * resolved as follows:
 * <ul>
 *   <li>failed with a 503 {@link HttpException} when no table operations are available;</li>
 *   <li>failed with a 500 {@link HttpException} when the import reports failed directories;</li>
 *   <li>failed with the thrown exception when the lookup or the import throws;</li>
 *   <li>completed otherwise, after which {@code cleanup(options)} is invoked.</li>
 * </ul>
 *
 * <p>A failure while handling one request never prevents the remaining requests from being
 * processed, and a failure in {@code cleanup} does not alter the outcome of a promise that has
 * already been completed successfully.
 *
 * @param queue the queue of pending (promise, options) pairs to drain
 */
private void drainImportQueue(ImportQueue queue)
{
    while (!queue.isEmpty())
    {
        Pair<Promise<Void>, ImportOptions> pair = queue.poll();
        if (pair == null)
        {
            // Nothing was dequeued even though isEmpty() reported entries (presumably another
            // consumer took the element first); stop rather than dereference null.
            break;
        }

        Promise<Void> promise = pair.getLeft();
        ImportOptions options = pair.getRight();

        try
        {
            // The lookup lives inside the try block so that an exception (or a missing delegate)
            // fails this request's promise instead of aborting the whole drain loop.
            CassandraAdapterDelegate cassandra = metadataFetcher.delegate(options.host);
            TableOperations tableOperations = cassandra == null ? null : cassandra.tableOperations();
            if (tableOperations == null)
            {
                promise.tryFail(new HttpException(HttpResponseStatus.SERVICE_UNAVAILABLE.code(),
                                                  "Cassandra service is unavailable"));
                continue;
            }

            List<String> failedDirectories =
            tableOperations.importNewSSTables(options.keyspace,
                                              options.tableName,
                                              options.directory,
                                              options.resetLevel,
                                              options.clearRepaired,
                                              options.verifySSTables,
                                              options.verifyTokens,
                                              options.invalidateCaches,
                                              options.extendedVerify,
                                              options.copyData);
            if (!failedDirectories.isEmpty())
            {
                promise.tryFail(new HttpException(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(),
                                                  "Failed to import from directories: " + failedDirectories));
                continue;
            }
        }
        catch (Exception exception)
        {
            LOGGER.error("Failed to import SSTables with options={}", options, exception);
            // tryFail never throws, even if the promise was already resolved elsewhere
            promise.tryFail(exception);
            continue;
        }

        // The import succeeded. tryComplete is used so that an already-resolved promise cannot
        // raise an exception that would abort the loop.
        promise.tryComplete();

        // Cleanup runs outside the import try/catch: the promise is already completed at this
        // point, so a cleanup failure must only be logged, never reported through the promise.
        try
        {
            cleanup(options);
        }
        catch (Exception exception)
        {
            LOGGER.error("Failed to clean up after importing SSTables with options={}", options, exception);
        }
    }
}