public static void maybeInitialize()

in spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java [622:652]


        public static void maybeInitialize(Session session, MetadataKeyspaceOptions options, RetryStrategyProvider retryStrategyProvider) {
            if (!options.should_init)
                return;

            Consumer<String> retryQuery = query -> {
                try {
                    retryStrategyProvider.get().retryIfNot(() -> session.execute(query),
                                                           NoHostAvailableException.class,
                                                           QueryValidationException.class);
                }
                catch (Exception exception) {
                    Throwables.propagate(exception);
                }
            };

            logger.info("Initializing cassandradiff journal schema in \"{}\" keyspace", options.keyspace);
            retryQuery.accept(String.format(KEYSPACE_SCHEMA, options.keyspace, options.replication));
            retryQuery.accept(String.format(JOB_SUMMARY_SCHEMA, options.keyspace, JOB_SUMMARY, options.ttl));
            retryQuery.accept(String.format(JOB_STATUS_SCHEMA, options.keyspace, JOB_STATUS));
            retryQuery.accept(String.format(JOB_RESULTS_SCHEMA, options.keyspace, JOB_RESULTS, options.ttl));
            retryQuery.accept(String.format(TASK_STATUS_SCHEMA, options.keyspace, TASK_STATUS, options.ttl));
            retryQuery.accept(String.format(MISMATCHES_SCHEMA, options.keyspace, MISMATCHES, options.ttl));
            retryQuery.accept(String.format(ERROR_SUMMARY_SCHEMA, options.keyspace, ERROR_SUMMARY, options.ttl));
            retryQuery.accept(String.format(ERROR_DETAIL_SCHEMA, options.keyspace, ERROR_DETAIL, options.ttl));
            retryQuery.accept(String.format(SOURCE_CLUSTER_INDEX_SCHEMA, options.keyspace, SOURCE_CLUSTER_INDEX, options.ttl));
            retryQuery.accept(String.format(TARGET_CLUSTER_INDEX_SCHEMA, options.keyspace, TARGET_CLUSTER_INDEX, options.ttl));
            retryQuery.accept(String.format(KEYSPACE_INDEX_SCHEMA, options.keyspace, KEYSPACE_INDEX, options.ttl));
            retryQuery.accept(String.format(JOB_START_INDEX_SCHEMA, options.keyspace, JOB_START_INDEX, options.ttl));
            retryQuery.accept(String.format(RUNNING_JOBS_SCHEMA, options.keyspace, RUNNING_JOBS, options.ttl));
            logger.info("Schema initialized");
        }