in spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java [622:652]
public static void maybeInitialize(Session session, MetadataKeyspaceOptions options, RetryStrategyProvider retryStrategyProvider) {
if (!options.should_init)
return;
Consumer<String> retryQuery = query -> {
try {
retryStrategyProvider.get().retryIfNot(() -> session.execute(query),
NoHostAvailableException.class,
QueryValidationException.class);
}
catch (Exception exception) {
Throwables.propagate(exception);
}
};
logger.info("Initializing cassandradiff journal schema in \"{}\" keyspace", options.keyspace);
retryQuery.accept(String.format(KEYSPACE_SCHEMA, options.keyspace, options.replication));
retryQuery.accept(String.format(JOB_SUMMARY_SCHEMA, options.keyspace, JOB_SUMMARY, options.ttl));
retryQuery.accept(String.format(JOB_STATUS_SCHEMA, options.keyspace, JOB_STATUS));
retryQuery.accept(String.format(JOB_RESULTS_SCHEMA, options.keyspace, JOB_RESULTS, options.ttl));
retryQuery.accept(String.format(TASK_STATUS_SCHEMA, options.keyspace, TASK_STATUS, options.ttl));
retryQuery.accept(String.format(MISMATCHES_SCHEMA, options.keyspace, MISMATCHES, options.ttl));
retryQuery.accept(String.format(ERROR_SUMMARY_SCHEMA, options.keyspace, ERROR_SUMMARY, options.ttl));
retryQuery.accept(String.format(ERROR_DETAIL_SCHEMA, options.keyspace, ERROR_DETAIL, options.ttl));
retryQuery.accept(String.format(SOURCE_CLUSTER_INDEX_SCHEMA, options.keyspace, SOURCE_CLUSTER_INDEX, options.ttl));
retryQuery.accept(String.format(TARGET_CLUSTER_INDEX_SCHEMA, options.keyspace, TARGET_CLUSTER_INDEX, options.ttl));
retryQuery.accept(String.format(KEYSPACE_INDEX_SCHEMA, options.keyspace, KEYSPACE_INDEX, options.ttl));
retryQuery.accept(String.format(JOB_START_INDEX_SCHEMA, options.keyspace, JOB_START_INDEX, options.ttl));
retryQuery.accept(String.format(RUNNING_JOBS_SCHEMA, options.keyspace, RUNNING_JOBS, options.ttl));
logger.info("Schema initialized");
}