in cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/data/CassandraDataLayer.java [237:285]
/**
 * Initializes the state this data layer needs for a bulk read: availability hints, the Cassandra
 * bridge, the replication-factor map, the CQL table, and the token partitioner.
 *
 * <p>Several remote or asynchronous steps (snapshot creation, schema fetch, sizing) are submitted
 * before this method blocks on their results; the statement order below is therefore significant.
 *
 * @param options            client configuration; read for the create-snapshot flag, for sizing,
 *                           and for the default parallelism
 * @param nodeSettingsFuture pending node settings, used to derive the Cassandra version and partitioner
 * @param ringFuture         pending ring response, used for snapshot creation (when enabled) and to
 *                           build the Cassandra ring
 * @return the effective number of cores reported by the sizing
 * @throws ExecutionException   if one of the awaited futures completed exceptionally
 * @throws InterruptedException if the calling thread is interrupted while waiting on a future
 */
private int initBulkReader(@NotNull ClientConfig options,
                           CompletableFuture<NodeSettings> nodeSettingsFuture,
                           CompletableFuture<RingResponse> ringFuture) throws ExecutionException, InterruptedException
{
    Preconditions.checkArgument(keyspace != null, "Keyspace must be non-null for Cassandra Bulk Reader");
    Preconditions.checkArgument(table != null, "Table must be non-null for Cassandra Bulk Reader");

    CompletableFuture<Map<String, PartitionedDataLayer.AvailabilityHint>> hintsFuture;
    if (!options.createSnapshot())
    {
        // No snapshot requested: start from an empty set of availability hints
        hintsFuture = CompletableFuture.completedFuture(new HashMap<>());
    }
    else
    {
        // The create-snapshot request doubles as the source of per-instance availability hints
        LOGGER.info("Creating snapshot snapshotName={} keyspace={} table={} dc={}",
                    snapshotName, keyspace, table, datacenter);
        hintsFuture = ringFuture.thenCompose(this::createSnapshot);
    }

    // Registered regardless of whether a snapshot was requested
    ShutdownHookManager.addShutdownHook(org.apache.spark.util.ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY(),
                                        ScalaFunctions.wrapLambda(() -> shutdownHook(options)));

    // Issue the schema request before blocking on the node settings
    CompletableFuture<SchemaResponse> pendingSchema = sidecar.schema(keyspace);

    NodeSettings settings = nodeSettingsFuture.get();
    String version = getEffectiveCassandraVersionForRead(clusterConfig, settings);
    Partitioner clusterPartitioner = Partitioner.from(settings.partitioner());
    // The bridge field must be set before bridge() is used further down
    bridge = CassandraBridgeFactory.get(version);
    availabilityHints = hintsFuture.get();

    SchemaResponse schemaResponse = pendingSchema.get();
    String schemaText = schemaResponse.schema();
    String tableCql = CqlUtils.extractTableSchema(schemaText, keyspace, table);
    int numIndexes = CqlUtils.extractIndexCount(schemaText, keyspace, table);
    Set<String> udtStatements = CqlUtils.extractUdts(schemaText, keyspace);
    ReplicationFactor rf = CqlUtils.extractReplicationFactor(schemaText, keyspace);
    rfMap = ImmutableMap.of(keyspace, rf);

    // Sizing is submitted to the shared executor before validation and schema building proceed
    CompletableFuture<Integer> pendingCores = CompletableFuture.supplyAsync(
        () -> getSizing(clusterConfig, rf, options).getEffectiveNumberOfCores(),
        ExecutorHolder.EXECUTOR_SERVICE);

    validateReplicationFactor(rf);
    for (String udtStatement : udtStatements)
    {
        LOGGER.info("Adding schema UDT: '{}'", udtStatement);
    }
    cqlTable = bridge().buildSchema(tableCql, keyspace, rf, clusterPartitioner, udtStatements, null, numIndexes);

    RingResponse ringResponse = ringFuture.get();
    CassandraRing cassandraRing = createCassandraRingFromRing(clusterPartitioner, rf, ringResponse);
    int cores = pendingCores.get();
    tokenPartitioner = new TokenPartitioner(cassandraRing, options.defaultParallelism, cores);
    return cores;
}