in c3r-cli-spark/src/main/java/com/amazonaws/c3r/spark/action/SparkMarshaller.java [198:233]
/**
 * Verifies that fingerprint columns contain no duplicate values when the client settings forbid them.
 *
 * <p>
 * Nothing is checked when {@code allowDuplicates} is enabled. Otherwise every distinct source column that feeds a
 * {@code FINGERPRINT} column is checked for repeated non-NULL values. Repeated NULL values are only rejected when
 * {@code preserveNulls} is disabled.
 *
 * @param clientSettings Settings supplying the {@code allowDuplicates} and {@code preserveNulls} flags
 * @param rawInputData   Input data to inspect, addressed by source column header
 * @param columnInsights Column descriptions used to find the source headers of fingerprint columns
 * @throws C3rRuntimeException If a fingerprint source column contains a disallowed duplicate
 */
static void validateDuplicates(final ClientSettings clientSettings, final Dataset<Row> rawInputData,
                               final List<ColumnInsight> columnInsights) {
    if (clientSettings.isAllowDuplicates()) {
        return;
    }
    // Check for duplicates when `allowDuplicates` is false
    final String[] fingerprintColumns = columnInsights.stream()
            .filter(columnSchema -> columnSchema.getType() == ColumnType.FINGERPRINT) // enforced on fingerprint columns only
            .map(ColumnSchema::getSourceHeader)
            .map(ColumnHeader::toString)
            .distinct()
            .toArray(String[]::new);
    // Check for duplicate non-null values.
    // Spark's groupBy collects all NULL keys into a single group and count() counts rows, so NULL rows
    // must be removed first; otherwise repeated NULLs would be reported here regardless of `preserveNulls`.
    for (String col : fingerprintColumns) {
        final Column column = new Column(col);
        final Dataset<Row> filteredData = rawInputData.select(column)
                .filter(column.isNotNull())
                .groupBy(column)
                .count()
                .filter("count > 1");
        if (!filteredData.isEmpty()) {
            throw new C3rRuntimeException("Duplicates were found in column `" + col + "`, but `allowDuplicates` is false.");
        }
    }
    // Check for duplicate null values when `preserveNulls` is false
    if (!clientSettings.isPreserveNulls()) {
        for (String col : fingerprintColumns) {
            final Column column = new Column(col);
            // Only NULL rows remain, so the single resulting group holds the number of NULLs in the column.
            final Dataset<Row> filteredData = rawInputData.select(column)
                    .filter(column.isNull())
                    .groupBy(column)
                    .count()
                    .filter("count > 1");
            if (!filteredData.isEmpty()) {
                throw new C3rRuntimeException("Duplicates NULLs were found in column `" + col + "`, but `allowDuplicates` and " +
                        "`preserveNulls` are false.");
            }
        }
    }
}