in c3r-cli-spark/src/main/java/com/amazonaws/c3r/spark/cli/EncryptMode.java [341:386]
public Integer call() {
    try {
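        // Validate the command-line arguments, build the Spark encryption config and any Parquet-specific
        // settings, and print the per-column transform summary for the table schema.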
        validate();
        final SparkEncryptConfig cfg = getConfig();
        final ParquetConfig pCfg = getParquetConfig();
        printColumnTransformInfo(cfg.getTableSchema());
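        // A dry run stops here: the arguments and schema were validated and summarized, but no data is read or written.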
        if (!optionalArgs.dryRun) {
            log.info("Encrypting data from {}.", cfg.getSourceFile());
            switch (cfg.getFileFormat()) {
                case CSV:
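                    // Parquet-specific options do not apply to CSV input, so fail fast if any were supplied.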
                    if (pCfg.isSet()) {
                        throw new C3rIllegalArgumentException("Parquet options specified for CSV file.");
                    }
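                    // Read the CSV with the configured input NULL token and the schema's positional headers,
                    // encrypt it, and write the result using the configured output NULL token.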
                    final Dataset<Row> csvDataset = SparkCsvReader.readInput(sparkSession,
                            cfg.getSourceFile(),
                            cfg.getCsvInputNullValue(),
                            cfg.getTableSchema().getPositionalColumnHeaders());
                    final Dataset<Row> marshalledCsvDataset = SparkMarshaller.encrypt(csvDataset, cfg);
                    SparkCsvWriter.writeOutput(marshalledCsvDataset, cfg.getTargetFile(), cfg.getCsvOutputNullValue());
                    break;
                case PARQUET:
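                    // Read the Parquet source (header normalization is not skipped), encrypt it, and write the encrypted dataset.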
                    final Dataset<Row> parquetDataset = SparkParquetReader.readInput(
                            sparkSession,
                            cfg.getSourceFile(),
                            /* skipHeaderNormalization */ false,
                            pCfg);
                    final Dataset<Row> marshalledParquetDataset = SparkMarshaller.encrypt(parquetDataset, cfg);
                    SparkParquetWriter.writeOutput(marshalledParquetDataset, cfg.getTargetFile());
                    break;
                default:
                    throw new C3rIllegalArgumentException("Unrecognized file format: " + cfg.getFileFormat());
            }
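            // Shut down the Spark session before reporting where the encrypted output was written.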
            SparkSessionUtil.closeSparkSession(sparkSession);
            log.info("Encrypted data was saved to {}.", cfg.getTargetFile());
        } else {
            log.info("Dry run: No data will be encrypted from {}.", cfg.getSourceFile());
        }
    } catch (Exception e) {
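        // Report the failure (optionally with a stack trace) and return the CLI failure exit code.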
        Main.handleException(e, optionalArgs.enableStackTraces);
        return Main.FAILURE;
    }
    return Main.SUCCESS;
}