public void finalizeJob()

in spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java [432:469]


        public void finalizeJob(UUID jobId, Map<KeyspaceTablePair, RangeStats> results) throws Exception {
            logger.info("Finalizing job status");

            markNotRunning(jobId);

            for (Map.Entry<KeyspaceTablePair, RangeStats> result : results.entrySet()) {
                KeyspaceTablePair table = result.getKey();
                RangeStats stats = result.getValue();
                Statement jobResultUpdateStatement =
                    new SimpleStatement(String.format("INSERT INTO %s.%s (" +
                                                      "  job_id," +
                                                      "  qualified_table_name," +
                                                      "  matched_partitions," +
                                                      "  mismatched_partitions," +
                                                      "  partitions_only_in_source," +
                                                      "  partitions_only_in_target," +
                                                      "  matched_rows," +
                                                      "  matched_values," +
                                                      "  mismatched_values," +
                                                      "  skipped_partitions) " +
                                                      "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                                                      metadataKeyspace, Schema.JOB_RESULTS),
                                        jobId,
                                        table.toCqlValueString(),
                                        stats.getMatchedPartitions(),
                                        stats.getMismatchedPartitions(),
                                        stats.getOnlyInSource(),
                                        stats.getOnlyInTarget(),
                                        stats.getMatchedRows(),
                                        stats.getMatchedValues(),
                                        stats.getMismatchedValues(),
                                        stats.getSkippedPartitions());
                jobResultUpdateStatement.setIdempotent(true);
                // also retry with NoHostAvailableException
                retryStrategyProvider.get().retryIfNot(() -> session.execute(jobResultUpdateStatement),
                                                       QueryValidationException.class);
            }
        }