in api-server/src/main/java/org/apache/cassandra/diff/api/services/DBService.java [68:164]
public DBService(JobConfiguration config) {
logger.info("Initializing DBService");
ClusterProvider provider = ClusterProvider.getProvider(config.clusterConfig("metadata"), "metadata");
cluster = provider.getCluster();
session = cluster.connect();
diffKeyspace = config.metadataOptions().keyspace;
runningJobsStatement = session.prepare(String.format(
" SELECT job_id " +
" FROM %s.running_jobs", diffKeyspace));
jobSummaryStatement = session.prepare(String.format(
" SELECT " +
" job_id," +
" job_start_time," +
" buckets," +
" qualified_table_names, " +
" source_cluster_name," +
" source_cluster_desc," +
" target_cluster_name," +
" target_cluster_desc," +
" total_tasks " +
" FROM %s.job_summary" +
" WHERE job_id = ?", diffKeyspace));
jobResultStatement = session.prepare(String.format(
" SELECT " +
" job_id," +
" qualified_table_name, " +
" matched_partitions," +
" mismatched_partitions," +
" matched_rows," +
" matched_values," +
" mismatched_values," +
" partitions_only_in_source," +
" partitions_only_in_target," +
" skipped_partitions" +
" FROM %s.job_results" +
" WHERE job_id = ? AND qualified_table_name = ?", diffKeyspace));
jobStatusStatement = session.prepare(String.format(
" SELECT " +
" job_id," +
" bucket," +
" qualified_table_name," +
" completed " +
" FROM %s.job_status" +
" WHERE job_id = ? AND bucket = ?", diffKeyspace));
jobMismatchesStatement = session.prepare(String.format(
" SELECT " +
" job_id," +
" bucket," +
" qualified_table_name," +
" mismatching_token," +
" mismatch_type" +
" FROM %s.mismatches" +
" WHERE job_id = ? AND bucket = ?", diffKeyspace));
jobErrorSummaryStatement = session.prepare(String.format(
" SELECT " +
" count(start_token) AS error_count," +
" qualified_table_name" +
" FROM %s.task_errors" +
" WHERE job_id = ? AND bucket = ?",
diffKeyspace));
jobErrorRangesStatement = session.prepare(String.format(
" SELECT " +
" bucket," +
" qualified_table_name," +
" start_token," +
" end_token" +
" FROM %s.task_errors" +
" WHERE job_id = ? AND bucket = ?",
diffKeyspace));
jobErrorDetailStatement = session.prepare(String.format(
" SELECT " +
" qualified_table_name," +
" error_token" +
" FROM %s.partition_errors" +
" WHERE job_id = ? AND bucket = ? AND qualified_table_name = ? AND start_token = ? AND end_token = ?", diffKeyspace));
jobsStartDateStatement = session.prepare(String.format(
" SELECT " +
" job_id" +
" FROM %s.job_start_index" +
" WHERE job_start_date = ? AND job_start_hour = ?", diffKeyspace));
jobsForSourceStatement = session.prepare(String.format(
" SELECT " +
" job_id" +
" FROM %s.source_cluster_index" +
" WHERE source_cluster_name = ?", diffKeyspace));
jobsForTargetStatement = session.prepare(String.format(
" SELECT " +
" job_id" +
" FROM %s.target_cluster_index" +
" WHERE target_cluster_name = ?", diffKeyspace));
jobsForKeyspaceStatement = session.prepare(String.format(
" SELECT " +
" job_id" +
" FROM %s.keyspace_index" +
" WHERE keyspace_name = ?", diffKeyspace));
}