public void initializeJob()

in spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java [359:430]


        /**
         * Registers a diff job in the metadata keyspace.
         *
         * <p>First claims the job id in the running-jobs table with a conditional insert; if the id
         * is already present there the method aborts. It then inserts the job summary row
         * ({@code IF NOT EXISTS}, executed through the retry strategy). Only when that summary row
         * is newly created by this call are the lookup indexes (source cluster, target cluster,
         * keyspace and start time) written, so a re-run of an existing job id keeps the summary and
         * index entries of the original run.
         *
         * @param params            job parameters; supplies the job id, bucket count, task count
         *                          and the keyspace/table pairs being diffed
         * @param sourceClusterName name of the source cluster, stored in the summary and indexed
         * @param sourceClusterDesc description of the source cluster, stored in the summary
         * @param targetClusterName name of the target cluster, stored in the summary and indexed
         * @param targetClusterDesc description of the target cluster, stored in the summary
         * @throws RuntimeException if the job id is already marked as running
         * @throws Exception        propagated from query execution or the retry strategy
         */
        public void initializeJob(DiffJob.Params params,
                                  String sourceClusterName,
                                  String sourceClusterDesc,
                                  String targetClusterName,
                                  String targetClusterDesc) throws Exception {

            logger.info("Initializing job status");
            // This may be a re-run of an earlier job (e.g. to mop up failed splits), so first claim
            // the job id in the running-jobs table. A conditional insert is not idempotent, so it
            // is executed once rather than through the retry strategy.
            ResultSet rs = session.execute(String.format("INSERT INTO %s.%s (job_id) VALUES (?) IF NOT EXISTS",
                                                         metadataKeyspace, Schema.RUNNING_JOBS),
                                           params.jobId);
            if (!rs.one().getBool("[applied]")) {
                logger.warn("Could not mark job as running. " +
                            "Did a previous run of job id {} fail non-gracefully?",
                            params.jobId);
                throw new RuntimeException("Unable to mark job running, aborting");
            }

            // A fresh time UUID identifies this run's start; the same instant is used for the
            // summary row and for the start-time index.
            UUID timeUUID = UUIDs.timeBased();
            DateTime startDateTime = new DateTime(UUIDs.unixTimestamp(timeUUID), DateTimeZone.UTC);

            boolean newJob = insertJobSummary(params, timeUUID,
                                              sourceClusterName, sourceClusterDesc,
                                              targetClusterName, targetClusterDesc);
            // Only a brand new job gets index entries; a re-run keeps those of the original run.
            if (newJob) {
                indexNewJob(params, timeUUID, startDateTime, sourceClusterName, targetClusterName);
            }
        }

        /**
         * Inserts the job summary row unless one already exists for the job id.
         *
         * @return {@code true} if the summary row was created by this call, {@code false} if a
         *         row written by an earlier run already existed
         */
        private boolean insertJobSummary(DiffJob.Params params,
                                         UUID timeUUID,
                                         String sourceClusterName,
                                         String sourceClusterDesc,
                                         String targetClusterName,
                                         String targetClusterDesc) throws Exception {
            Statement initJobStatusStatement =
                new SimpleStatement(String.format("INSERT INTO %s.%s (" +
                                                  " job_id," +
                                                  " job_start_time," +
                                                  " buckets," +
                                                  " qualified_table_names," +
                                                  " source_cluster_name," +
                                                  " source_cluster_desc," +
                                                  " target_cluster_name," +
                                                  " target_cluster_desc," +
                                                  " total_tasks)" +
                                                  " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" +
                                                  " IF NOT EXISTS",
                                                  metadataKeyspace, Schema.JOB_SUMMARY),
                                    params.jobId,
                                    timeUUID,
                                    params.buckets,
                                    params.keyspaceTables.stream().map(KeyspaceTablePair::toCqlValueString).collect(Collectors.toList()),
                                    sourceClusterName,
                                    sourceClusterDesc,
                                    targetClusterName,
                                    targetClusterDesc,
                                    params.tasks);
            initJobStatusStatement.setIdempotent(true);
            ResultSet rs = retryStrategyProvider.get().retryIfNot(() -> session.execute(initJobStatusStatement),
                                                                  NoHostAvailableException.class,
                                                                  QueryValidationException.class);
            com.datastax.driver.core.Row row = rs.one();
            if (row.getBool("[applied]")) {
                return true;
            }
            // Because the statement is retried, an earlier attempt may have been applied on the
            // server even though the client saw a failure; the retry then reports "not applied"
            // against the row this very call wrote. A non-applied conditional insert returns the
            // existing row, and job_start_time holds a time UUID generated for this call only, so
            // a match means the existing row is ours and the job is new.
            return row.getColumnDefinitions().contains("job_start_time")
                   && timeUUID.equals(row.getUUID("job_start_time"));
        }

        /**
         * Writes the source-cluster, target-cluster, keyspace and start-time index entries for a
         * newly created job in one batch, executed through the retry strategy.
         */
        private void indexNewJob(DiffJob.Params params,
                                 UUID timeUUID,
                                 DateTime startDateTime,
                                 String sourceClusterName,
                                 String targetClusterName) throws Exception {
            BatchStatement batch = new BatchStatement();
            batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (source_cluster_name, job_id) VALUES (?, ?)",
                                                        metadataKeyspace, Schema.SOURCE_CLUSTER_INDEX),
                                          sourceClusterName, params.jobId));
            batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (target_cluster_name, job_id) VALUES (?, ?)",
                                                        metadataKeyspace, Schema.TARGET_CLUSTER_INDEX),
                                          targetClusterName, params.jobId));
            // Index each distinct keyspace the job diffs. Indexing metadataKeyspace here would give
            // every job the same key and make the index useless.
            String keyspaceIndexCql = String.format("INSERT INTO %s.%s (keyspace_name, job_id) VALUES (?, ?)",
                                                    metadataKeyspace, Schema.KEYSPACE_INDEX);
            for (String keyspace : params.keyspaceTables.stream()
                                                       .map(kt -> kt.keyspace)
                                                       .distinct()
                                                       .collect(Collectors.toList())) {
                batch.add(new SimpleStatement(keyspaceIndexCql, keyspace, params.jobId));
            }
            // NOTE(review): the start date is inlined as a CQL literal rather than bound; presumably
            // because job_start_date is a CQL date column and a bound String would be encoded as
            // text. Confirm against Schema.JOB_START_INDEX before changing.
            batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (job_start_date, job_start_hour, job_start_time, job_id) " +
                                                        "VALUES ('%s', ?, ?, ?)",
                                                        metadataKeyspace, Schema.JOB_START_INDEX, startDateTime.toString("yyyy-MM-dd")),
                                          startDateTime.getHourOfDay(), timeUUID, params.jobId));
            batch.setIdempotent(true);
            retryStrategyProvider.get().retryIfNot(() -> session.execute(batch),
                                                   NoHostAvailableException.class,
                                                   QueryValidationException.class);
        }