in spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java [359:430]
/**
 * Records the start of a diff job in the metadata keyspace.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Conditionally insert the job id into the running-jobs table; if the
 *       insert is not applied the method aborts with a {@link RuntimeException}.</li>
 *   <li>Conditionally insert the job summary row (start time, buckets, table
 *       names, cluster names/descriptions, task count).</li>
 *   <li>Only if the summary insert was applied, write the source-cluster,
 *       target-cluster, keyspace and start-time index rows in one batch.</li>
 * </ol>
 *
 * @param params            job parameters; {@code jobId}, {@code buckets},
 *                          {@code keyspaceTables} and {@code tasks} are read
 * @param sourceClusterName name stored for the source cluster and used as index key
 * @param sourceClusterDesc description stored for the source cluster
 * @param targetClusterName name stored for the target cluster and used as index key
 * @param targetClusterDesc description stored for the target cluster
 * @throws RuntimeException if the running-jobs insert is not applied
 * @throws Exception        declared by the signature; presumably propagated from
 *                          the retry strategy - confirm against its contract
 */
public void initializeJob(DiffJob.Params params,
                          String sourceClusterName,
                          String sourceClusterDesc,
                          String targetClusterName,
                          String targetClusterDesc) throws Exception {
    logger.info("Initializing job status");

    // Step 1: claim the job id. The conditional insert is not applied when a
    // row for this job id is already present in the running-jobs table.
    String markRunningCql = String.format("INSERT INTO %s.%s (job_id) VALUES (?) IF NOT EXISTS",
                                          metadataKeyspace, Schema.RUNNING_JOBS);
    ResultSet markRunningResult = session.execute(markRunningCql, params.jobId);
    boolean markedRunning = markRunningResult.one().getBool("[applied]");
    if (!markedRunning) {
        logger.info("Could not mark job as running. " +
                    "Did a previous run of job id {} fail non-gracefully?",
                    params.jobId);
        throw new RuntimeException("Unable to mark job running, aborting");
    }

    // Step 2: write the summary row. The time-based UUID generated here is
    // stored as job_start_time and is also the source of the start-time index keys.
    UUID jobStartId = UUIDs.timeBased();
    String summaryCql = String.format("INSERT INTO %s.%s (" +
                                      " job_id," +
                                      " job_start_time," +
                                      " buckets," +
                                      " qualified_table_names," +
                                      " source_cluster_name," +
                                      " source_cluster_desc," +
                                      " target_cluster_name," +
                                      " target_cluster_desc," +
                                      " total_tasks)" +
                                      " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)" +
                                      " IF NOT EXISTS",
                                      metadataKeyspace, Schema.JOB_SUMMARY);
    SimpleStatement insertSummary =
        new SimpleStatement(summaryCql,
                            params.jobId,
                            jobStartId,
                            params.buckets,
                            params.keyspaceTables.stream()
                                                 .map(KeyspaceTablePair::toCqlValueString)
                                                 .collect(Collectors.toList()),
                            sourceClusterName,
                            sourceClusterDesc,
                            targetClusterName,
                            targetClusterDesc,
                            params.tasks);
    insertSummary.setIdempotent(true);
    // NOTE(review): retryIfNot presumably retries unless one of the listed
    // exception types is thrown - confirm against the retry strategy.
    ResultSet summaryResult = retryStrategyProvider.get().retryIfNot(() -> session.execute(insertSummary),
                                                                     NoHostAvailableException.class,
                                                                     QueryValidationException.class);

    // A summary row already existed, so this is a re-run of a previous job
    // (e.g. to mop up failed splits); its index rows are left as they are.
    boolean brandNewJob = summaryResult.one().getBool("[applied]");
    if (!brandNewJob) {
        return;
    }

    // Step 3: brand new job - index its details, including the start time.
    DateTime jobStart = new DateTime(UUIDs.unixTimestamp(jobStartId), DateTimeZone.UTC);

    SimpleStatement indexBySourceCluster =
        new SimpleStatement(String.format("INSERT INTO %s.%s (source_cluster_name, job_id) VALUES (?, ?)",
                                          metadataKeyspace, Schema.SOURCE_CLUSTER_INDEX),
                            sourceClusterName, params.jobId);
    SimpleStatement indexByTargetCluster =
        new SimpleStatement(String.format("INSERT INTO %s.%s (target_cluster_name, job_id) VALUES (?, ?)",
                                          metadataKeyspace, Schema.TARGET_CLUSTER_INDEX),
                            targetClusterName, params.jobId);
    // NOTE(review): the keyspace index row is keyed by the metadata keyspace,
    // not by the keyspaces of the tables being diffed - confirm this is intended.
    SimpleStatement indexByKeyspace =
        new SimpleStatement(String.format("INSERT INTO %s.%s (keyspace_name, job_id) VALUES (?, ?)",
                                          metadataKeyspace, Schema.KEYSPACE_INDEX),
                            metadataKeyspace, params.jobId);
    // The start date is formatted into the CQL text as a quoted literal; the
    // hour, start time and job id are bound as values.
    SimpleStatement indexByStartTime =
        new SimpleStatement(String.format("INSERT INTO %s.%s (job_start_date, job_start_hour, job_start_time, job_id) " +
                                          "VALUES ('%s', ?, ?, ?)",
                                          metadataKeyspace, Schema.JOB_START_INDEX, jobStart.toString("yyyy-MM-dd")),
                            jobStart.getHourOfDay(), jobStartId, params.jobId);

    BatchStatement indexBatch = new BatchStatement();
    indexBatch.add(indexBySourceCluster);
    indexBatch.add(indexByTargetCluster);
    indexBatch.add(indexByKeyspace);
    indexBatch.add(indexByStartTime);
    indexBatch.setIdempotent(true);
    retryStrategyProvider.get().retryIfNot(() -> session.execute(indexBatch),
                                           NoHostAvailableException.class,
                                           QueryValidationException.class);
}