in spark-job/src/main/java/org/apache/cassandra/diff/Differ.java [88:148]
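/**
 * Builds the Differ for a single token-range split. Besides copying the split-specific
 * parameters, it wires a rate limiter from the per-executor limit and, once per JVM,
 * initializes the shared source/target DiffClusters and the journal session used by the
 * tracker.
 */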
public Differ(JobConfiguration config,
              DiffJob.Params params,
              int perExecutorRateLimit,
              DiffJob.Split split,
              TokenHelper tokenHelper,
              ClusterProvider sourceProvider,
              ClusterProvider targetProvider,
              ClusterProvider metadataProvider,
              DiffJob.TrackerProvider trackerProvider,
              RetryStrategyProvider retryStrategyProvider)
{
    logger.info("Creating Differ for {}", split);
    this.jobId = params.jobId;
    this.split = split;
    this.tokenHelper = tokenHelper;
    this.keyspaceTables = params.keyspaceTables;
    this.trackerProvider = trackerProvider;
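    // Built from the per-executor limit and handed to both the source and target
    // DiffClusters below, so reads against the two clusters share one rate budget.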
    rateLimiter = RateLimiter.create(perExecutorRateLimit);
    this.reverseReadProbability = config.reverseReadProbability();
    this.specificTokens = config.specificTokens();
    this.retryStrategyProvider = retryStrategyProvider;
    this.partitionSamplingProbability = config.partitionSamplingProbability();
    synchronized (Differ.class)
    {
        /*
         Spark runs multiple tasks of the same job in a single worker JVM, so these shared
         resources must be initialized only once; otherwise we run out of memory from the
         accumulated health-checker threads.
        */
        // Yes, JobConfiguration could return a ConsistencyLevel directly, but snakeyaml
        // doesn't like relocated classes and the driver has to be shaded.
        ConsistencyLevel cl = ConsistencyLevel.valueOf(config.consistencyLevel());
        if (srcDiffCluster == null)
        {
            srcDiffCluster = new DiffCluster(DiffCluster.Type.SOURCE,
                                             sourceProvider.getCluster(),
                                             cl,
                                             rateLimiter,
                                             config.tokenScanFetchSize(),
                                             config.partitionReadFetchSize(),
                                             config.readTimeoutMillis(),
                                             retryStrategyProvider);
        }
        if (targetDiffCluster == null)
        {
            targetDiffCluster = new DiffCluster(DiffCluster.Type.TARGET,
                                                targetProvider.getCluster(),
                                                cl,
                                                rateLimiter,
                                                config.tokenScanFetchSize(),
                                                config.partitionReadFetchSize(),
                                                config.readTimeoutMillis(),
                                                retryStrategyProvider);
        }
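        // One session to the metadata cluster per JVM; the tracker's statements are
        // initialized against it only once as well.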
        if (journalSession == null)
        {
            journalSession = metadataProvider.getCluster().connect();
            trackerProvider.initializeStatements(journalSession);
        }
    }
}
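The class-level lock and static null checks above implement once-per-JVM initialization: Spark runs many tasks of the same job inside one executor JVM, and building a fresh DiffCluster or journal session per task would multiply connections and health-checker threads until the executor runs out of memory. A minimal, self-contained sketch of that pattern, with hypothetical names that are not part of this file:

import java.util.concurrent.atomic.AtomicInteger;

class OncePerJvmExample
{
    // Counts how many times the expensive resource was actually built; stays at 1.
    static final AtomicInteger creations = new AtomicInteger();
    // Stands in for the static srcDiffCluster / targetDiffCluster / journalSession fields.
    private static Object sharedCluster;

    OncePerJvmExample()
    {
        synchronized (OncePerJvmExample.class)
        {
            if (sharedCluster == null)
            {
                sharedCluster = new Object(); // imagine: provider.getCluster().connect()
                creations.incrementAndGet();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException
    {
        // Simulate several Spark tasks each constructing their own Differ in one executor JVM.
        Thread[] tasks = new Thread[4];
        for (int i = 0; i < tasks.length; i++)
        {
            tasks[i] = new Thread(OncePerJvmExample::new);
            tasks[i].start();
        }
        for (Thread t : tasks)
            t.join();
        System.out.println("shared resources created: " + creations.get()); // prints 1
    }
}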