public static void initializeStatements()

in spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java [104:166]


        public static void initializeStatements(Session session, String metadataKeyspace) {
            if (updateStmt == null) {
                updateStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
                                                           " job_id," +
                                                           " bucket," +
                                                           " qualified_table_name," +
                                                           " start_token," +
                                                           " end_token," +
                                                           " matched_partitions," +
                                                           " mismatched_partitions," +
                                                           " partitions_only_in_source," +
                                                           " partitions_only_in_target," +
                                                           " matched_rows," +
                                                           " matched_values," +
                                                           " mismatched_values," +
                                                           " skipped_partitions," +
                                                           " last_token )" +
                                                           "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                                                           metadataKeyspace, Schema.TASK_STATUS));
            }
            if (mismatchStmt == null) {
                mismatchStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
                                                             " job_id," +
                                                             " bucket," +
                                                             " qualified_table_name," +
                                                             " mismatching_token," +
                                                             " mismatch_type )" +
                                                             "VALUES (?, ?, ?, ?, ?)",
                                                             metadataKeyspace, Schema.MISMATCHES));
            }
            if (updateCompleteStmt == null) {
                updateCompleteStmt = session.prepare(String.format("UPDATE %s.%s " +
                                                                   " SET completed = completed + 1" +
                                                                   " WHERE job_id = ? " +
                                                                   " AND bucket = ? " +
                                                                   " AND qualified_table_name = ? ",
                                                                   metadataKeyspace, Schema.JOB_STATUS))
                                            .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
            }
            if (errorSummaryStmt == null) {
                errorSummaryStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
                                                                 " job_id," +
                                                                 " bucket," +
                                                                 " qualified_table_name," +
                                                                 " start_token," +
                                                                 " end_token)" +
                                                                 " VALUES (?, ?, ?, ?, ?)",
                                                                 metadataKeyspace, Schema.ERROR_SUMMARY));
            }
            if (errorDetailStmt == null) {
                errorDetailStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
                                                                " job_id," +
                                                                " bucket," +
                                                                " qualified_table_name," +
                                                                " start_token," +
                                                                " end_token," +
                                                                " error_token," +
                                                                " error_source)" +
                                                                " VALUES (?, ?, ?, ?, ?, ?, ?)",
                                                                metadataKeyspace, Schema.ERROR_DETAIL));
            }

        }