public void run(JobConfiguration configuration, JavaSparkContext sc)

in spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java [86:211]


    public void run(JobConfiguration configuration, JavaSparkContext sc) {
        SparkConf conf = sc.getConf();
        // get partitioner from both clusters and verify that they match
        ClusterProvider sourceProvider = ClusterProvider.getProvider(configuration.clusterConfig("source"), "source");
        ClusterProvider targetProvider = ClusterProvider.getProvider(configuration.clusterConfig("target"), "target");
        String sourcePartitioner;
        String targetPartitioner;
        List<KeyspaceTablePair> tablesToCompare = configuration.filteredKeyspaceTables();
        try (Cluster sourceCluster = sourceProvider.getCluster();
             Cluster targetCluster = targetProvider.getCluster()) {
            sourcePartitioner = sourceCluster.getMetadata().getPartitioner();
            targetPartitioner = targetCluster.getMetadata().getPartitioner();

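            // Tokens produced by different partitioners (e.g. Murmur3Partitioner vs RandomPartitioner)
            // are not comparable, so token-range splits could not be aligned across the two clusters.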
            if (!sourcePartitioner.equals(targetPartitioner)) {
                throw new IllegalStateException(String.format("Cluster partitioners do not match; Source: %s, Target: %s",
                                                              sourcePartitioner, targetPartitioner));
            }

            if (configuration.shouldAutoDiscoverTables()) {
                Schema sourceSchema = new Schema(sourceCluster.getMetadata(), configuration);
                Schema targetSchema = new Schema(targetCluster.getMetadata(), configuration);
                Schema commonSchema = sourceSchema.intersect(targetSchema);
                if (commonSchema.size() != sourceSchema.size()) {
                    Pair<Set<KeyspaceTablePair>, Set<KeyspaceTablePair>> difference = Schema.difference(sourceSchema, targetSchema);
                    logger.warn("Found tables that only exist in either source or target cluster. Ignoring those tables for comparision. " +
                                "Distinct tables in source cluster: {}. " +
                                "Distinct tables in target cluster: {}",
                                difference.getLeft(), difference.getRight());
                }
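                // e.g. if source has tables {ks.t1, ks.t2} and target has {ks.t1, ks.t3}, the
                // common schema is {ks.t1}; difference.getLeft() is {ks.t2} and getRight() is {ks.t3}.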
                tablesToCompare = commonSchema.toQualifiedTableList();
            }
        }

        TokenHelper tokenHelper = TokenHelper.forPartitioner(sourcePartitioner);

        logger.info("Configuring job metadata store");
        ClusterProvider metadataProvider = ClusterProvider.getProvider(configuration.clusterConfig("metadata"), "metadata");
        JobMetadataDb.JobLifeCycle job = null;
        UUID jobId = null;
        Cluster metadataCluster = null;
        Session metadataSession = null;

        try {
            metadataCluster = metadataProvider.getCluster();
            metadataSession = metadataCluster.connect();
            RetryStrategyProvider retryStrategyProvider = RetryStrategyProvider.create(configuration.retryOptions());
            MetadataKeyspaceOptions metadataOptions = configuration.metadataOptions();
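            // maybeInitialize is presumably idempotent: it creates the metadata keyspace and
            // tables only when they are missing, so re-running the job is safe.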
            JobMetadataDb.Schema.maybeInitialize(metadataSession, metadataOptions, retryStrategyProvider);

            // Job params are fixed once a job is created; they cannot be modified in subsequent re-runs
            logger.info("Creating or retrieving job parameters");
            job = new JobMetadataDb.JobLifeCycle(metadataSession, metadataOptions.keyspace, retryStrategyProvider);
            Params params = getJobParams(job, configuration, tablesToCompare);
            logger.info("Job Params: {}", params);
            if (null == params)
                throw new RuntimeException("Unable to initialize job params");

            jobId = params.jobId;
            List<Split> splits = getSplits(configuration, TokenHelper.forPartitioner(sourcePartitioner));

            // Job options, which may be modified per-run
            int instances = Integer.parseInt(conf.get("spark.executor.instances", "4"));
            int cores = Integer.parseInt(conf.get("spark.executor.cores", "2"));
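            // instances * cores is the total number of executor cores (task slots), not the
            // number of executor instances, despite the variable name.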
            int executors = instances * cores;
            // According to https://spark.apache.org/docs/latest/rdd-programming-guide.html#parallelized-collections
            // we should have 2-4 partitions per CPU in the cluster:
            int slices = Math.min(4 * executors, splits.size());
            int perExecutorRateLimit = configuration.rateLimit() / executors;
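            // With the defaults above: 4 instances * 2 cores = 8 slots, so slices = min(32, splits.size())
            // and a rateLimit of, say, 8000 yields 1000 per executor. Note the integer division:
            // a rateLimit smaller than the slot count truncates to a per-executor limit of 0.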

            // Record the high level job summary info
            job.initializeJob(params,
                              sourceProvider.getClusterName(),
                              sourceProvider.toString(),
                              targetProvider.getClusterName(),
                              targetProvider.toString());

            logger.info("DiffJob {} comparing [{}] on {} and {}",
                        jobId,
                        params.keyspaceTables.stream().map(KeyspaceTablePair::toString).collect(Collectors.joining(",")),
                        sourceProvider,
                        targetProvider);

            if (null != preJobHook)
                preJobHook.run();

            // Run the distributed diff and collate results
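            // Each Split is diffed independently on an executor; the reduce step merges the
            // per-split Map<KeyspaceTablePair, RangeStats> results into one cluster-wide view.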
            Map<KeyspaceTablePair, RangeStats> diffStats = sc.parallelize(splits, slices)
                                                             .map((split) -> new Differ(configuration,
                                                                                        params,
                                                                                        perExecutorRateLimit,
                                                                                        split,
                                                                                        tokenHelper,
                                                                                        sourceProvider,
                                                                                        targetProvider,
                                                                                        metadataProvider,
                                                                                        new TrackerProvider(configuration.metadataOptions().keyspace),
                                                                                        retryStrategyProvider)
                                                                                 .run())
                                                             .reduce(Differ::accumulate);
            // Publish results. This also removes the job from the currently running list
            job.finalizeJob(params.jobId, diffStats);
            logger.info("FINISHED: {}", diffStats);
            if (null != postJobHook)
                postJobHook.accept(diffStats);
        } catch (Exception e) {
            // If the job errors out, try and mark the job as not running, so it can be restarted.
            // If the error was thrown from JobLifeCycle.finalizeJob *after* the job had already
            // been marked not running, this will log a warning, but is not fatal.
            if (job != null && jobId != null)
                job.markNotRunning(jobId);
            throw new RuntimeException("Diff job failed", e);
        } finally {
            if (sc.isLocal()) {
                Differ.shutdown();
                JobMetadataDb.ProgressTracker.resetStatements();
            }
            // Close the session before its cluster; Cluster.close() also closes open sessions,
            // but releasing in reverse order of acquisition is clearer.
            if (metadataSession != null) {
                metadataSession.close();
            }
            if (metadataCluster != null) {
                metadataCluster.close();
            }
        }
    }
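
A minimal sketch of how a driver might invoke this method. YamlJobConfiguration and its load method, and the no-arg DiffJob constructor, are assumptions for illustration, not the project's confirmed API:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public static void main(String[] args) {
        // Hypothetical loader: the real project builds a JobConfiguration from a YAML file.
        JobConfiguration configuration = YamlJobConfiguration.load(args[0]);
        SparkConf sparkConf = new SparkConf().setAppName("cassandra-diff");
        // JavaSparkContext implements Closeable, so try-with-resources stops the context on exit.
        try (JavaSparkContext sc = new JavaSparkContext(sparkConf)) {
            new DiffJob().run(configuration, sc);  // assumes a no-arg constructor
        }
    }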