private static Dataset<Row> readFiles()

in c3r-cli-spark/src/main/java/com/amazonaws/c3r/spark/io/csv/SparkCsvReader.java [80:113]


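    /**
     * Reads the CSV file at {@code source}, or each CSV file directly within the
     * {@code source} directory, into a single dataset. Subdirectories are skipped;
     * recursion is not currently supported.
     *
     * @param sparkSession the Spark session to read with
     * @param source       path to a file or a directory of files
     * @param options      reader options applied to each file
     * @return the data from all files read
     * @throws C3rRuntimeException if the source can't be read or files have mismatched columns
     */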
    private static Dataset<Row> readFiles(@NonNull final SparkSession sparkSession,
                                          @NonNull final String source,
                                          final Map<String, String> options) {
        final File sourceFile = Path.of(source).toFile();
        if (sourceFile.isFile()) {
            return readFile(sparkSession, source, options);
        }
        final File[] files = sourceFile.listFiles();
        if (files == null) {
            throw new C3rRuntimeException("Source could not be read at path " + sourceFile + ".");
        }
        Dataset<Row> dataset = null;
        Set<String> columns = null;
        File firstFile = null; // First data file read; used as the schema reference in error messages.
        for (File file : files) {
            if (file.isDirectory()) {
                continue; // Skip directories. Recursion not currently supported.
            }
            if (dataset == null) {
                firstFile = file;
                dataset = readFile(sparkSession, file.getAbsolutePath(), options);
                columns = Set.of(dataset.columns());
            } else {
                final Dataset<Row> nextDataset = readFile(sparkSession, file.getAbsolutePath(), options);
                final Set<String> nextDatasetColumns = Set.of(nextDataset.columns());
                if (columns.size() != nextDatasetColumns.size() || !columns.containsAll(nextDatasetColumns)) {
                    // unionAll merges data by column position without further schema enforcement,
                    // so fail fast here. Report against the first file actually read, since
                    // files[0] may be a directory that was skipped.
                    throw new C3rRuntimeException("Found mismatched columns between "
                            + firstFile.getAbsolutePath() + " and " + file.getAbsolutePath() + ".");
                }
                // Use unionAll (SQL UNION ALL semantics) so duplicate rows across files are preserved.
                dataset = dataset.unionAll(nextDataset);
            }
        }
        if (dataset == null) {
            // Guard against directories containing no readable files; dataset would otherwise be null.
            throw new C3rRuntimeException("No files found to read at path " + sourceFile + ".");
        }
        return dataset;
    }
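
The per-file readFile helper falls outside this excerpt's [80:113] range. As a minimal sketch, assuming it simply delegates to Spark's DataFrameReader CSV support (the real helper in SparkCsvReader may add C3R-specific handling), it could look like:

    // Hypothetical sketch of the per-file helper, not the actual C3R implementation.
    private static Dataset<Row> readFile(@NonNull final SparkSession sparkSession,
                                         @NonNull final String source,
                                         final Map<String, String> options) {
        return sparkSession.read()
                .options(options == null ? Map.of() : options) // caller-supplied CSV options, e.g. "header"
                .csv(source);
    }

Reading each file individually, rather than handing the whole directory to Spark, is what makes the explicit column-name check above possible before any rows are combined.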