private void drainImportQueue()

in src/main/java/org/apache/cassandra/sidecar/utils/SSTableImporter.java [197:246]


    /**
     * Processes every pending import request in {@code queue}, one at a time, until the queue is empty.
     *
     * <p>For each request the table operations for {@code options.host} are looked up and
     * {@code importNewSSTables} is invoked with the request's options. The request's promise is settled
     * exactly once:
     * <ul>
     *   <li>failed with {@code SERVICE_UNAVAILABLE} when no table operations are available for the host;</li>
     *   <li>failed with {@code INTERNAL_SERVER_ERROR} when the import reports failed directories;</li>
     *   <li>failed with the thrown exception when the lookup or the import throws;</li>
     *   <li>completed otherwise, after which {@code cleanup(options)} is run.</li>
     * </ul>
     *
     * <p>A failure while handling one request never prevents the remaining requests from being processed.
     *
     * @param queue the queue of (promise, import options) pairs to drain
     */
    private void drainImportQueue(ImportQueue queue)
    {
        while (!queue.isEmpty())
        {
            Pair<Promise<Void>, ImportOptions> pair = queue.poll();
            if (pair == null)
            {
                // The queue was emptied between the isEmpty() check and poll(); nothing left to do.
                break;
            }
            Promise<Void> promise = pair.getLeft();
            ImportOptions options = pair.getRight();

            // Work out the outcome first and settle the promise afterwards, so that the promise is
            // completed or failed exactly once no matter where an exception is raised.
            Throwable failure = null;
            try
            {
                CassandraAdapterDelegate cassandra = metadataFetcher.delegate(options.host);
                TableOperations tableOperations = cassandra == null ? null : cassandra.tableOperations();

                if (tableOperations == null)
                {
                    failure = new HttpException(HttpResponseStatus.SERVICE_UNAVAILABLE.code(),
                                                "Cassandra service is unavailable");
                }
                else
                {
                    List<String> failedDirectories =
                    tableOperations.importNewSSTables(options.keyspace,
                                                      options.tableName,
                                                      options.directory,
                                                      options.resetLevel,
                                                      options.clearRepaired,
                                                      options.verifySSTables,
                                                      options.verifyTokens,
                                                      options.invalidateCaches,
                                                      options.extendedVerify,
                                                      options.copyData);
                    if (!failedDirectories.isEmpty())
                    {
                        failure = new HttpException(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(),
                                                    "Failed to import from directories: " + failedDirectories);
                    }
                }
            }
            catch (Exception exception)
            {
                LOGGER.error("Failed to import SSTables with options={}", options, exception);
                failure = exception;
            }

            if (failure != null)
            {
                promise.fail(failure);
            }
            else
            {
                promise.complete();
                try
                {
                    cleanup(options);
                }
                catch (Exception exception)
                {
                    // The import itself succeeded and the promise is already completed; a cleanup
                    // problem must neither re-settle the promise nor stop the remaining imports.
                    LOGGER.error("Failed to clean up after importing SSTables with options={}",
                                 options, exception);
                }
            }
        }
    }