in spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java [104:166]
public static void initializeStatements(Session session, String metadataKeyspace) {
if (updateStmt == null) {
updateStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
" job_id," +
" bucket," +
" qualified_table_name," +
" start_token," +
" end_token," +
" matched_partitions," +
" mismatched_partitions," +
" partitions_only_in_source," +
" partitions_only_in_target," +
" matched_rows," +
" matched_values," +
" mismatched_values," +
" skipped_partitions," +
" last_token )" +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
metadataKeyspace, Schema.TASK_STATUS));
}
if (mismatchStmt == null) {
mismatchStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
" job_id," +
" bucket," +
" qualified_table_name," +
" mismatching_token," +
" mismatch_type )" +
"VALUES (?, ?, ?, ?, ?)",
metadataKeyspace, Schema.MISMATCHES));
}
if (updateCompleteStmt == null) {
updateCompleteStmt = session.prepare(String.format("UPDATE %s.%s " +
" SET completed = completed + 1" +
" WHERE job_id = ? " +
" AND bucket = ? " +
" AND qualified_table_name = ? ",
metadataKeyspace, Schema.JOB_STATUS))
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
}
if (errorSummaryStmt == null) {
errorSummaryStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
" job_id," +
" bucket," +
" qualified_table_name," +
" start_token," +
" end_token)" +
" VALUES (?, ?, ?, ?, ?)",
metadataKeyspace, Schema.ERROR_SUMMARY));
}
if (errorDetailStmt == null) {
errorDetailStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
" job_id," +
" bucket," +
" qualified_table_name," +
" start_token," +
" end_token," +
" error_token," +
" error_source)" +
" VALUES (?, ?, ?, ?, ?, ?, ?)",
metadataKeyspace, Schema.ERROR_DETAIL));
}
}