in integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java [82:147]
public AbstractCluster<I> initializeCluster(String versionString,
ClusterBuilderConfiguration configuration) throws IOException
{
// spin up a C* cluster using the in-jvm dtest
Versions versions = Versions.find();
Versions.Version requestedVersion = versions.getLatest(new Semver(versionString, Semver.SemverType.LOOSE));
int nodesPerDc = configuration.nodesPerDc;
int dcCount = configuration.dcCount;
int newNodesPerDc = configuration.newNodesPerDc;
Preconditions.checkArgument(newNodesPerDc >= 0, "newNodesPerDc cannot be a negative number");
int originalNodeCount = nodesPerDc * dcCount;
int finalNodeCount = dcCount * (nodesPerDc + newNodesPerDc);
UpgradeableCluster.Builder clusterBuilder = UpgradeableCluster.build(originalNodeCount);
clusterBuilder.withVersion(requestedVersion)
.withDynamicPortAllocation(configuration.dynamicPortAllocation) // to allow parallel test runs
.withSharedClasses(EXTRA.or(clusterBuilder.getSharedClasses()))
.withDCs(dcCount)
.withTokenCount(configuration.tokenCount)
.withDataDirCount(configuration.numDataDirsPerInstance);
TokenSupplier tokenSupplier;
Consumer<IInstanceConfig> instanceConfigUpdater;
if (configuration.partitioner != null)
{
tokenSupplier = TestTokenSupplier.evenlyDistributedTokens(Partitioner.fromClassName(configuration.partitioner),
nodesPerDc, newNodesPerDc, dcCount, 1);
instanceConfigUpdater = instanceConfig -> {
instanceConfig.set("partitioner", configuration.partitioner);
configuration.features.forEach(instanceConfig::with);
};
}
else
{
tokenSupplier = TestTokenSupplier.evenlyDistributedTokens(nodesPerDc, newNodesPerDc, dcCount, 1);
instanceConfigUpdater = config -> configuration.features.forEach(config::with);
}
if (configuration.additionalInstanceConfig != null)
{
instanceConfigUpdater = instanceConfigUpdater.andThen(config -> configuration.additionalInstanceConfig.forEach(config::set));
}
clusterBuilder.withTokenSupplier(tokenSupplier)
.withConfig(instanceConfigUpdater);
if (dcCount > 1)
{
clusterBuilder.withNodeIdTopology(networkTopology(finalNodeCount,
(nodeId) -> nodeId % 2 != 0 ?
dcAndRack("datacenter1", "rack1") :
dcAndRack("datacenter2", "rack2")));
}
if (configuration.instanceInitializer != null)
{
clusterBuilder.withInstanceInitializer(configuration.instanceInitializer);
}
UpgradeableCluster cluster = clusterBuilder.start();
if (cluster.size() > 1)
{
waitForHealthyRing(cluster);
fixDistributedSchemas((AbstractCluster<I>) cluster);
}
return (AbstractCluster<I>) cluster;
}