in spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java [432:469]
/**
 * Wraps up a job: flags it as no longer running via {@code markNotRunning}, then
 * writes one row per entry of {@code results} into the {@code Schema.JOB_RESULTS}
 * table of the metadata keyspace.
 *
 * <p>Each insert is marked idempotent and is executed through the configured retry
 * strategy, which is invoked with {@code QueryValidationException} as the exception
 * type passed to {@code retryIfNot}.
 * NOTE(review): presumably that means "retry on anything except validation errors" —
 * confirm against the retry strategy implementation.
 *
 * @param jobId   identifier of the job being finalized; stored in the {@code job_id} column
 * @param results statistics keyed by table; every entry produces one inserted row
 * @throws Exception declared by the signature; presumably surfaced from
 *                   {@code markNotRunning} or from the retried insert — confirm with callees
 */
public void finalizeJob(UUID jobId, Map<KeyspaceTablePair, RangeStats> results) throws Exception {
    logger.info("Finalizing job status");
    markNotRunning(jobId);

    // The CQL text only depends on the keyspace and table name, so it is assembled once
    // and shared by every per-table statement below.
    final String insertCql = String.format("INSERT INTO %s.%s (" +
                                           " job_id," +
                                           " qualified_table_name," +
                                           " matched_partitions," +
                                           " mismatched_partitions," +
                                           " partitions_only_in_source," +
                                           " partitions_only_in_target," +
                                           " matched_rows," +
                                           " matched_values," +
                                           " mismatched_values," +
                                           " skipped_partitions) " +
                                           "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
                                           metadataKeyspace, Schema.JOB_RESULTS);

    for (Map.Entry<KeyspaceTablePair, RangeStats> entry : results.entrySet()) {
        final KeyspaceTablePair keyspaceTable = entry.getKey();
        final RangeStats rangeStats = entry.getValue();

        // Bind values in the same order as the column list in the CQL above.
        final Statement insertResult = new SimpleStatement(insertCql,
                                                           jobId,
                                                           keyspaceTable.toCqlValueString(),
                                                           rangeStats.getMatchedPartitions(),
                                                           rangeStats.getMismatchedPartitions(),
                                                           rangeStats.getOnlyInSource(),
                                                           rangeStats.getOnlyInTarget(),
                                                           rangeStats.getMatchedRows(),
                                                           rangeStats.getMatchedValues(),
                                                           rangeStats.getMismatchedValues(),
                                                           rangeStats.getSkippedPartitions());
        insertResult.setIdempotent(true);

        // also retry with NoHostAvailableException
        retryStrategyProvider.get().retryIfNot(() -> session.execute(insertResult),
                                               QueryValidationException.class);
    }
}