in server/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java [204:270]
/**
 * Polls the given queue until it is empty and runs one SSTable import per polled entry,
 * completing or failing the entry's promise according to the outcome.
 *
 * <p>An entry counts as failed when {@code importNewSSTables} reports a non-empty list of failed
 * directories, or when any step of the import (including resolving the instance) throws. After
 * the queue is drained, the success and failure counts are logged and reported to the SSTable
 * import metrics of the first instance that was resolved successfully.
 *
 * <p>A failure for one entry never prevents the remaining entries from being processed.
 *
 * @param queue the queue of (promise, import options) pairs to drain
 */
private void drainImportQueue(ImportQueue queue)
{
    int successCount = 0, failureCount = 0;
    InstanceMetrics instanceMetrics = null;
    AbstractMap.SimpleEntry<Promise<Void>, ImportOptions> pair;
    while ((pair = queue.poll()) != null)
    {
        LOGGER.info("Starting SSTable import session");
        Promise<Void> promise = pair.getKey();
        ImportOptions options = pair.getValue();
        boolean imported = false;
        try
        {
            // Resolve the instance inside the try block: a lookup failure must fail this
            // entry's promise instead of escaping the loop and leaving the promise pending.
            InstanceMetadata instance = metadataFetcher.instance(options.host);
            if (instanceMetrics == null)
            {
                instanceMetrics = instance.metrics();
            }
            TableOperations tableOperations = instance.delegate().tableOperations();
            long startTime = System.nanoTime();
            List<String> failedDirectories =
                tableOperations.importNewSSTables(options.keyspace,
                                                  options.tableName,
                                                  options.directory,
                                                  options.resetLevel,
                                                  options.clearRepaired,
                                                  options.verifySSTables,
                                                  options.verifyTokens,
                                                  options.invalidateCaches,
                                                  options.extendedVerify,
                                                  options.copyData);
            long serviceTimeNanos = System.nanoTime() - startTime;
            if (!failedDirectories.isEmpty())
            {
                failureCount++;
                LOGGER.error("Failed to import SSTables with options={}, serviceTimeMillis={}, " +
                             "failedDirectories={}", options, TimeUnit.NANOSECONDS.toMillis(serviceTimeNanos),
                             failedDirectories);
                // TODO: HttpException should not be thrown by importer, as it is not at the transport layer
                promise.fail(new HttpException(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(),
                                               "Failed to import from directories: " + failedDirectories));
            }
            else
            {
                successCount++;
                LOGGER.info("Successfully imported SSTables with options={}, serviceTimeMillis={}",
                            options, TimeUnit.NANOSECONDS.toMillis(serviceTimeNanos));
                promise.complete();
                imported = true;
            }
        }
        catch (Exception exception)
        {
            failureCount++;
            LOGGER.error("Failed to import SSTables with options={}", options, exception);
            promise.fail(exception);
        }

        if (imported)
        {
            // Cleanup runs outside the import try/catch: the promise is already completed at
            // this point, so a cleanup error must neither be counted as an import failure nor
            // trigger a second completion attempt on the promise.
            try
            {
                cleanup(options);
            }
            catch (Exception exception)
            {
                LOGGER.warn("Failed to clean up after successful SSTable import with options={}",
                            options, exception);
            }
        }
    }
    if (successCount > 0 || failureCount > 0)
    {
        LOGGER.info("Finished SSTable import session with successCount={}, failureCount={}",
                    successCount, failureCount);
        // Metrics are unavailable when no instance could be resolved for any entry.
        if (instanceMetrics != null)
        {
            instanceMetrics.sstableImport().successfulImports.metric.update(successCount);
            instanceMetrics.sstableImport().failedImports.metric.update(failureCount);
        }
    }
}