public AbstractCluster initializeCluster()

in integration-framework/src/main/java/org/apache/cassandra/distributed/impl/CassandraCluster.java [82:147]


    /**
     * Builds and starts an in-JVM dtest cluster for the requested Cassandra version.
     *
     * <p>The cluster is built with {@code nodesPerDc * dcCount} nodes. The token supplier and, when
     * more than one datacenter is configured, the node topology are sized to also account for
     * {@code newNodesPerDc} additional nodes per datacenter. When the started cluster has more than
     * one node, this method waits for a healthy ring and fixes the distributed schemas before
     * returning.
     *
     * <p>If any step after the cluster has been started fails, the cluster is closed before the
     * failure is rethrown, so that a started cluster is not left behind without an owner.
     *
     * @param versionString the version to look up; parsed as a loose semantic version and resolved
     *                      to the latest matching version reported by {@code Versions.find()}
     * @param configuration the settings used to build the cluster (node/DC counts, token count,
     *                      data directory count, optional partitioner, features, additional
     *                      instance configuration and instance initializer)
     * @return the started cluster
     * @throws IOException              propagated from the cluster setup calls
     * @throws IllegalArgumentException when {@code configuration.newNodesPerDc} is negative
     *                                  (raised by the {@code Preconditions.checkArgument} check)
     */
    public AbstractCluster<I> initializeCluster(String versionString,
                                                ClusterBuilderConfiguration configuration) throws IOException
    {
        // spin up a C* cluster using the in-jvm dtest
        Versions versions = Versions.find();
        Versions.Version requestedVersion = versions.getLatest(new Semver(versionString, Semver.SemverType.LOOSE));

        int nodesPerDc = configuration.nodesPerDc;
        int dcCount = configuration.dcCount;
        int newNodesPerDc = configuration.newNodesPerDc;
        Preconditions.checkArgument(newNodesPerDc >= 0, "newNodesPerDc cannot be a negative number");
        // Nodes started right away vs. nodes the ring is sized for (including nodes added later)
        int originalNodeCount = nodesPerDc * dcCount;
        int finalNodeCount = dcCount * (nodesPerDc + newNodesPerDc);

        UpgradeableCluster.Builder clusterBuilder = UpgradeableCluster.build(originalNodeCount);
        clusterBuilder.withVersion(requestedVersion)
                      .withDynamicPortAllocation(configuration.dynamicPortAllocation) // to allow parallel test runs
                      .withSharedClasses(EXTRA.or(clusterBuilder.getSharedClasses()))
                      .withDCs(dcCount)
                      .withTokenCount(configuration.tokenCount)
                      .withDataDirCount(configuration.numDataDirsPerInstance);

        TokenSupplier tokenSupplier;
        Consumer<IInstanceConfig> instanceConfigUpdater;
        if (configuration.partitioner != null)
        {
            // An explicit partitioner drives both the token distribution and the instance config
            tokenSupplier = TestTokenSupplier.evenlyDistributedTokens(Partitioner.fromClassName(configuration.partitioner),
                                                                      nodesPerDc, newNodesPerDc, dcCount, 1);
            instanceConfigUpdater = instanceConfig -> {
                instanceConfig.set("partitioner", configuration.partitioner);
                configuration.features.forEach(instanceConfig::with);
            };
        }
        else
        {
            tokenSupplier = TestTokenSupplier.evenlyDistributedTokens(nodesPerDc, newNodesPerDc, dcCount, 1);
            instanceConfigUpdater = config -> configuration.features.forEach(config::with);
        }
        if (configuration.additionalInstanceConfig != null)
        {
            // Applied after the features so these settings are set last on each instance config
            instanceConfigUpdater = instanceConfigUpdater.andThen(config -> configuration.additionalInstanceConfig.forEach(config::set));
        }
        clusterBuilder.withTokenSupplier(tokenSupplier)
                      .withConfig(instanceConfigUpdater);

        if (dcCount > 1)
        {
            // NOTE(review): the topology alternates between exactly two datacenters based on node id
            // parity, independent of dcCount - confirm that dcCount > 2 is not an expected input.
            clusterBuilder.withNodeIdTopology(networkTopology(finalNodeCount,
                                                              (nodeId) -> nodeId % 2 != 0 ?
                                                                          dcAndRack("datacenter1", "rack1") :
                                                                          dcAndRack("datacenter2", "rack2")));
        }

        if (configuration.instanceInitializer != null)
        {
            clusterBuilder.withInstanceInitializer(configuration.instanceInitializer);
        }

        UpgradeableCluster cluster = clusterBuilder.start();
        try
        {
            AbstractCluster<I> startedCluster = (AbstractCluster<I>) cluster;
            if (cluster.size() > 1)
            {
                waitForHealthyRing(cluster);
                fixDistributedSchemas(startedCluster);
            }
            return startedCluster;
        }
        catch (Throwable failure)
        {
            // The caller never receives a reference to the cluster when this method throws, so it
            // has to be closed here; otherwise the started instances would be leaked.
            try
            {
                cluster.close();
            }
            catch (Throwable closeFailure)
            {
                // Keep the original failure as the primary error and attach the cleanup failure
                failure.addSuppressed(closeFailure);
            }
            // Precise rethrow: only the exception types the try block can throw are propagated
            throw failure;
        }
    }