in c3r-cli-spark/src/main/java/com/amazonaws/c3r/spark/io/csv/SparkCsvReader.java [80:113]
private static Dataset<Row> readFiles(@NonNull final SparkSession sparkSession,
                                      @NonNull final String source,
                                      final Map<String, String> options) {
    final File sourceFile = Path.of(source).toFile();
    if (sourceFile.isFile()) {
        return readFile(sparkSession, source, options);
    }
    final File[] files = sourceFile.listFiles();
    if (files == null) {
        throw new C3rRuntimeException("Source could not be read at path " + sourceFile + ".");
    }
    Dataset<Row> dataset = null;
    Set<String> columns = null;
    File firstFile = null;
    for (final File file : files) {
        if (file.isDirectory()) {
            continue; // Skip directories. Recursion is not currently supported.
        }
        if (dataset == null) {
            // The first data file read establishes the expected column set.
            dataset = readFile(sparkSession, file.getAbsolutePath(), options);
            columns = Set.of(dataset.columns());
            firstFile = file;
        } else {
            final Dataset<Row> nextDataset = readFile(sparkSession, file.getAbsolutePath(), options);
            final Set<String> nextDatasetColumns = Set.of(nextDataset.columns());
            if (!columns.equals(nextDatasetColumns)) {
                // unionAll will merge data based on column position without further enforcement of schemas.
                // Report the file that set the baseline columns (files[0] may be a skipped directory).
                throw new C3rRuntimeException("Found mismatched columns between "
                        + firstFile.getAbsolutePath() + " and " + file.getAbsolutePath() + ".");
            }
            // unionAll keeps duplicate rows (SQL UNION ALL semantics); union behaves identically in
            // Spark 2.0+, but unionAll makes the append-without-deduplication intent explicit.
            dataset = dataset.unionAll(nextDataset);
        }
    }
    if (dataset == null) {
        // The directory contained no regular files; fail here rather than return a null Dataset.
        throw new C3rRuntimeException("No data files found at path " + sourceFile + ".");
    }
    return dataset;
}
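
One subtlety worth noting: the guard above compares column names as an unordered set, so two CSV files listing the same columns in different orders pass the check, yet unionAll still merges their rows by position. The standalone sketch below, which is not code from the C3R repository (the class name, schemas, and literal values are invented for illustration), demonstrates that behavior and contrasts it with Spark's unionByName, which resolves columns by name instead.

UnionSemanticsSketch.java (illustrative only)
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public final class UnionSemanticsSketch {
    public static void main(final String[] args) {
        final SparkSession spark = SparkSession.builder()
                .appName("union-semantics-sketch")
                .master("local[*]")
                .getOrCreate();

        // Same column names, declared in opposite orders.
        final StructType abSchema = new StructType()
                .add("a", DataTypes.StringType)
                .add("b", DataTypes.StringType);
        final StructType baSchema = new StructType()
                .add("b", DataTypes.StringType)
                .add("a", DataTypes.StringType);
        final Dataset<Row> first = spark.createDataFrame(
                List.of(RowFactory.create("a1", "b1")), abSchema);
        final Dataset<Row> second = spark.createDataFrame(
                List.of(RowFactory.create("b2", "a2")), baSchema);

        // Both column sets are {a, b}, so a set-based guard would pass...
        // ...but unionAll appends by position: "b2" lands in column "a".
        first.unionAll(second).show();

        // unionByName resolves columns by name, keeping "a2" in column "a".
        first.unionByName(second).show();

        spark.stop();
    }
}

Spark could also ingest the whole directory with a single spark.read().csv(...) call; reading file-by-file as above trades that convenience for the ability to check each file's header before merging. If column order may legitimately vary across input files, unionByName (or an order-sensitive comparison of the column arrays) would be the safer merge.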