in c3r-cli-spark/src/main/java/com/amazonaws/c3r/spark/cli/DecryptMode.java [219:259]
/**
 * CLI entry point: validates arguments, then decrypts the configured source file to the
 * target file unless {@code --dryRun} was specified.
 *
 * @return {@link Main#SUCCESS} on success (or dry run), {@link Main#FAILURE} if any step throws
 */
public Integer call() {
    try {
        validate();
        final SparkDecryptConfig cfg = getConfig();
        if (optionalArgs.dryRun) {
            // Dry run: report only — no Spark job is executed and no output is written.
            log.info("Dry run: No data will be decrypted from {}.", cfg.getSourceFile());
            return Main.SUCCESS;
        }
        log.info("Decrypting data from {}.", cfg.getSourceFile());
        try {
            decryptAndWrite(cfg);
        } finally {
            // Always release the Spark session once real work has started. The original code
            // closed it only on success, leaking the session when decrypt/write threw.
            SparkSessionUtil.closeSparkSession(sparkSession);
        }
        log.info("Decrypted data saved in {}.", cfg.getTargetFile());
    } catch (Exception e) {
        Main.handleException(e, optionalArgs.enableStackTraces);
        return Main.FAILURE;
    }
    return Main.SUCCESS;
}

/**
 * Reads the source file in the configured format, decrypts it, and writes the result to the
 * target file.
 *
 * @param cfg validated decryption configuration (source/target paths, format, CSV null values)
 * @throws C3rIllegalArgumentException if the configured file format is not CSV or Parquet
 */
private void decryptAndWrite(final SparkDecryptConfig cfg) {
    switch (cfg.getFileFormat()) {
        case CSV:
            final Dataset<Row> csvDataset = SparkCsvReader.readInput(sparkSession,
                    cfg.getSourceFile(),
                    cfg.getCsvInputNullValue(),
                    /* externalHeaders */ null,
                    /* skipHeaderNormalization */ true);
            final Dataset<Row> unmarshalledCsvDataset = SparkUnmarshaller.decrypt(csvDataset, cfg);
            SparkCsvWriter.writeOutput(unmarshalledCsvDataset, cfg.getTargetFile(), cfg.getCsvOutputNullValue());
            break;
        case PARQUET:
            final Dataset<Row> parquetDataset = SparkParquetReader.readInput(
                    sparkSession,
                    cfg.getSourceFile(),
                    /* skipHeaderNormalization */ true,
                    ParquetConfig.DEFAULT);
            final Dataset<Row> unmarshalledParquetDataset = SparkUnmarshaller.decrypt(parquetDataset, cfg);
            SparkParquetWriter.writeOutput(unmarshalledParquetDataset, cfg.getTargetFile());
            break;
        default:
            throw new C3rIllegalArgumentException("Unrecognized file format: " + cfg.getFileFormat());
    }
}