in spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java [86:211]
public void run(JobConfiguration configuration, JavaSparkContext sc) {
    SparkConf conf = sc.getConf();
    // get partitioner from both clusters and verify that they match
    ClusterProvider sourceProvider = ClusterProvider.getProvider(configuration.clusterConfig("source"), "source");
    ClusterProvider targetProvider = ClusterProvider.getProvider(configuration.clusterConfig("target"), "target");
    String sourcePartitioner;
    String targetPartitioner;
    List<KeyspaceTablePair> tablesToCompare = configuration.filteredKeyspaceTables();
    try (Cluster sourceCluster = sourceProvider.getCluster();
         Cluster targetCluster = targetProvider.getCluster()) {
        sourcePartitioner = sourceCluster.getMetadata().getPartitioner();
        targetPartitioner = targetCluster.getMetadata().getPartitioner();
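        // Different partitioners produce incompatible token spaces, so token ranges
        // from one cluster could not be meaningfully compared against the other; fail fast.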
        if (!sourcePartitioner.equals(targetPartitioner)) {
            throw new IllegalStateException(String.format("Cluster partitioners do not match; Source: %s, Target: %s",
                                                          sourcePartitioner, targetPartitioner));
        }
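        // With auto-discovery enabled, ignore the configured table list and diff
        // only the tables that exist in both clusters.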
        if (configuration.shouldAutoDiscoverTables()) {
            Schema sourceSchema = new Schema(sourceCluster.getMetadata(), configuration);
            Schema targetSchema = new Schema(targetCluster.getMetadata(), configuration);
            Schema commonSchema = sourceSchema.intersect(targetSchema);
            if (commonSchema.size() != sourceSchema.size()) {
                Pair<Set<KeyspaceTablePair>, Set<KeyspaceTablePair>> difference = Schema.difference(sourceSchema, targetSchema);
                logger.warn("Found tables that exist in only the source or only the target cluster. Ignoring those tables for comparison. " +
                            "Distinct tables in source cluster: {}. " +
                            "Distinct tables in target cluster: {}",
                            difference.getLeft(), difference.getRight());
            }
            tablesToCompare = commonSchema.toQualifiedTableList();
        }
    }
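    // TokenHelper abstracts over the partitioner's token type, e.g. Murmur3Partitioner
    // tokens are 64-bit longs while RandomPartitioner tokens are BigIntegers.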
    TokenHelper tokenHelper = TokenHelper.forPartitioner(sourcePartitioner);
    logger.info("Configuring job metadata store");
    ClusterProvider metadataProvider = ClusterProvider.getProvider(configuration.clusterConfig("metadata"), "metadata");
    JobMetadataDb.JobLifeCycle job = null;
    UUID jobId = null;
    Cluster metadataCluster = null;
    Session metadataSession = null;
    try {
        metadataCluster = metadataProvider.getCluster();
        metadataSession = metadataCluster.connect();
        RetryStrategyProvider retryStrategyProvider = RetryStrategyProvider.create(configuration.retryOptions());
        MetadataKeyspaceOptions metadataOptions = configuration.metadataOptions();
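        // Create the metadata keyspace and tables if they don't already exist
        // (hence "maybe"); on re-runs this finds them in place.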
        JobMetadataDb.Schema.maybeInitialize(metadataSession, metadataOptions, retryStrategyProvider);
        // Job params, which once a job is created cannot be modified in subsequent re-runs
        logger.info("Creating or retrieving job parameters");
        job = new JobMetadataDb.JobLifeCycle(metadataSession, metadataOptions.keyspace, retryStrategyProvider);
        Params params = getJobParams(job, configuration, tablesToCompare);
        logger.info("Job Params: {}", params);
        if (null == params)
            throw new RuntimeException("Unable to initialize job params");
        jobId = params.jobId;
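        // Carve the full token ring into splits; each split becomes one element of the
        // RDD below and is diffed independently by a single Spark task.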
        List<Split> splits = getSplits(configuration, tokenHelper);
        // Job options, which may be modified per-run
        int instances = Integer.parseInt(conf.get("spark.executor.instances", "4"));
        int cores = Integer.parseInt(conf.get("spark.executor.cores", "2"));
        int executors = instances * cores;
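        // NB: 'executors' is really the total core count (task slots) across all
        // executor instances; the slice and rate-limit calculations below divide by it.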
        // according to https://spark.apache.org/docs/latest/rdd-programming-guide.html#parallelized-collections we should
        // have 2-4 partitions per CPU in the cluster:
        int slices = Math.min(4 * executors, splits.size());
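        // Share the configured aggregate rate limit evenly across task slots so the
        // combined read rate of all concurrently running tasks stays within the limit.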
        int perExecutorRateLimit = configuration.rateLimit() / executors;
        // Record the high level job summary info
        job.initializeJob(params,
                          sourceProvider.getClusterName(),
                          sourceProvider.toString(),
                          targetProvider.getClusterName(),
                          targetProvider.toString());
        logger.info("DiffJob {} comparing [{}] on {} and {}",
                    jobId,
                    params.keyspaceTables.stream().map(KeyspaceTablePair::toString).collect(Collectors.joining(",")),
                    sourceProvider,
                    targetProvider);
        if (null != preJobHook)
            preJobHook.run();
        // Run the distributed diff and collate results
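        // Each task builds a Differ for its split and compares that token range across
        // both clusters; reduce(Differ::accumulate) merges the per-table RangeStats
        // from all tasks. Note the lambda and everything it captures (params, the
        // providers, tokenHelper, ...) is serialized out to the executors.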
        Map<KeyspaceTablePair, RangeStats> diffStats = sc.parallelize(splits, slices)
            .map((split) -> new Differ(configuration,
                                       params,
                                       perExecutorRateLimit,
                                       split,
                                       tokenHelper,
                                       sourceProvider,
                                       targetProvider,
                                       metadataProvider,
                                       new TrackerProvider(configuration.metadataOptions().keyspace),
                                       retryStrategyProvider)
                            .run())
            .reduce(Differ::accumulate);
        // Publish results. This also removes the job from the currently running list
        job.finalizeJob(params.jobId, diffStats);
        logger.info("FINISHED: {}", diffStats);
        if (null != postJobHook)
            postJobHook.accept(diffStats);
    } catch (Exception e) {
        // If the job errors out, try to mark the job as not running, so it can be restarted.
        // If the error was thrown from JobMetadataDb.finalizeJob *after* the job had already
        // been marked not running, this will log a warning, but is not fatal.
        if (job != null && jobId != null)
            job.markNotRunning(jobId);
        throw new RuntimeException("Diff job failed", e);
    } finally {
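        // In local mode the driver and executors share this JVM, so explicitly clear
        // the static state that executor-side code would otherwise leave behind.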
        if (sc.isLocal()) {
            Differ.shutdown();
            JobMetadataDb.ProgressTracker.resetStatements();
        }
        // Close the Session before its Cluster; closing the Cluster also tears down
        // any sessions it still owns.
        if (metadataSession != null) {
            metadataSession.close();
        }
        if (metadataCluster != null) {
            metadataCluster.close();
        }
    }
}