private int initBulkReader()

in cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java [237:285]


    private int initBulkReader(@NotNull ClientConfig options,
                               CompletableFuture<NodeSettings> nodeSettingsFuture,
                               CompletableFuture<RingResponse> ringFuture) throws ExecutionException, InterruptedException
    {
        Preconditions.checkArgument(keyspace != null, "Keyspace must be non-null for Cassandra Bulk Reader");
        Preconditions.checkArgument(table != null, "Table must be non-null for Cassandra Bulk Reader");
        CompletableFuture<Map<String, PartitionedDataLayer.AvailabilityHint>> snapshotFuture;
        if (options.createSnapshot())
        {
            // Use create snapshot request to capture instance availability hint
            LOGGER.info("Creating snapshot snapshotName={} keyspace={} table={} dc={}",
                        snapshotName, keyspace, table, datacenter);
            snapshotFuture = ringFuture.thenCompose(this::createSnapshot);
        }
        else
        {
            snapshotFuture = CompletableFuture.completedFuture(new HashMap<>());
        }
        ShutdownHookManager.addShutdownHook(org.apache.spark.util.ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY(),
                                            ScalaFunctions.wrapLambda(() -> shutdownHook(options)));

        CompletableFuture<SchemaResponse> schemaFuture = sidecar.schema(keyspace);
        NodeSettings nodeSettings = nodeSettingsFuture.get();

        String cassandraVersion = getEffectiveCassandraVersionForRead(clusterConfig, nodeSettings);

        Partitioner partitioner = Partitioner.from(nodeSettings.partitioner());
        bridge = CassandraBridgeFactory.get(cassandraVersion);
        availabilityHints = snapshotFuture.get();

        String fullSchema = schemaFuture.get().schema();
        String createStmt = CqlUtils.extractTableSchema(fullSchema, keyspace, table);
        int indexCount = CqlUtils.extractIndexCount(fullSchema, keyspace, table);
        Set<String> udts = CqlUtils.extractUdts(fullSchema, keyspace);
        ReplicationFactor replicationFactor = CqlUtils.extractReplicationFactor(fullSchema, keyspace);
        rfMap = ImmutableMap.of(keyspace, replicationFactor);
        CompletableFuture<Integer> sizingFuture = CompletableFuture.supplyAsync(
        () -> getSizing(clusterConfig, replicationFactor, options).getEffectiveNumberOfCores(),
        ExecutorHolder.EXECUTOR_SERVICE);
        validateReplicationFactor(replicationFactor);
        udts.forEach(udt -> LOGGER.info("Adding schema UDT: '{}'", udt));

        cqlTable = bridge().buildSchema(createStmt, keyspace, replicationFactor, partitioner, udts, null, indexCount);
        CassandraRing ring = createCassandraRingFromRing(partitioner, replicationFactor, ringFuture.get());

        int effectiveNumberOfCores = sizingFuture.get();
        tokenPartitioner = new TokenPartitioner(ring, options.defaultParallelism, effectiveNumberOfCores);
        return effectiveNumberOfCores;
    }