private void drainImportQueue()

in server/src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java [204:270]


    private void drainImportQueue(ImportQueue queue)
    {
        int successCount = 0, failureCount = 0;
        InstanceMetrics instanceMetrics = null;
        AbstractMap.SimpleEntry<Promise<Void>, ImportOptions> pair;
        while ((pair = queue.poll()) != null)
        {
            LOGGER.info("Starting SSTable import session");
            Promise<Void> promise = pair.getKey();
            ImportOptions options = pair.getValue();

            InstanceMetadata instance = metadataFetcher.instance(options.host);
            if (instanceMetrics == null)
            {
                instanceMetrics = instance.metrics();
            }
            try
            {
                TableOperations tableOperations = instance.delegate().tableOperations();
                long startTime = System.nanoTime();
                List<String> failedDirectories =
                tableOperations.importNewSSTables(options.keyspace,
                                                  options.tableName,
                                                  options.directory,
                                                  options.resetLevel,
                                                  options.clearRepaired,
                                                  options.verifySSTables,
                                                  options.verifyTokens,
                                                  options.invalidateCaches,
                                                  options.extendedVerify,
                                                  options.copyData);
                long serviceTimeNanos = System.nanoTime() - startTime;
                if (!failedDirectories.isEmpty())
                {
                    failureCount++;
                    LOGGER.error("Failed to import SSTables with options={}, serviceTimeMillis={}, " +
                                 "failedDirectories={}", options, TimeUnit.NANOSECONDS.toMillis(serviceTimeNanos),
                                 failedDirectories);
                    // TODO: HttpException should not be thrown by importer, as it is not at the transport layer
                    promise.fail(new HttpException(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(),
                                                   "Failed to import from directories: " + failedDirectories));
                }
                else
                {
                    successCount++;
                    LOGGER.info("Successfully imported SSTables with options={}, serviceTimeMillis={}",
                                options, TimeUnit.NANOSECONDS.toMillis(serviceTimeNanos));
                    promise.complete();
                    cleanup(options);
                }
            }
            catch (Exception exception)
            {
                failureCount++;
                LOGGER.error("Failed to import SSTables with options={}", options, exception);
                promise.fail(exception);
            }
        }

        if (successCount > 0 || failureCount > 0)
        {
            LOGGER.info("Finished SSTable import session with successCount={}, failureCount={}",
                        successCount, failureCount);
            instanceMetrics.sstableImport().successfulImports.metric.update(successCount);
            instanceMetrics.sstableImport().failedImports.metric.update(failureCount);
        }
    }