public Differ()

in spark-job/src/main/java/org/apache/cassandra/diff/Differ.java [88:148]


    public Differ(JobConfiguration config,
                  DiffJob.Params params,
                  int perExecutorRateLimit,
                  DiffJob.Split split,
                  TokenHelper tokenHelper,
                  ClusterProvider sourceProvider,
                  ClusterProvider targetProvider,
                  ClusterProvider metadataProvider,
                  DiffJob.TrackerProvider trackerProvider,
                  RetryStrategyProvider retryStrategyProvider)
    {
        logger.info("Creating Differ for {}", split);
        this.jobId = params.jobId;
        this.split = split;
        this.tokenHelper = tokenHelper;
        this.keyspaceTables = params.keyspaceTables;
        this.trackerProvider = trackerProvider;
        rateLimiter = RateLimiter.create(perExecutorRateLimit);
        this.reverseReadProbability = config.reverseReadProbability();
        this.specificTokens = config.specificTokens();
        this.retryStrategyProvider = retryStrategyProvider;
        this.partitionSamplingProbability = config.partitionSamplingProbability();
        synchronized (Differ.class)
        {
            /*
            Spark runs jobs on each worker in the same JVM, we need to initialize these only once, otherwise
            we run OOM with health checker threads
             */
            // yes we could have JobConfiguration return this directly, but snakeyaml doesn't like relocated classes and the driver has to be shaded
            ConsistencyLevel cl = ConsistencyLevel.valueOf(config.consistencyLevel());
            if (srcDiffCluster == null)
            {
                srcDiffCluster = new DiffCluster(DiffCluster.Type.SOURCE,
                                                 sourceProvider.getCluster(),
                                                 cl,
                                                 rateLimiter,
                                                 config.tokenScanFetchSize(),
                                                 config.partitionReadFetchSize(),
                                                 config.readTimeoutMillis(),
                                                 retryStrategyProvider);
            }

            if (targetDiffCluster == null)
            {
                targetDiffCluster = new DiffCluster(DiffCluster.Type.TARGET,
                                                    targetProvider.getCluster(),
                                                    cl,
                                                    rateLimiter,
                                                    config.tokenScanFetchSize(),
                                                    config.partitionReadFetchSize(),
                                                    config.readTimeoutMillis(),
                                                    retryStrategyProvider);
            }

            if (journalSession == null)
            {
                journalSession = metadataProvider.getCluster().connect();
                trackerProvider.initializeStatements(journalSession);
            }
        }
    }