in c3r-sdk-core/src/main/java/com/amazonaws/c3r/action/RowMarshaller.java [377:426]
void marshalOutput() {
    // Two phases:
    //   1. Build a covering index on the SQL table so rows can be read back ordered by nonce.
    //   2. Read every row back, re-marshal PadType.MAX columns, and hand each row
    //      (minus its nonce column) to the output writer.
    // Any SQLException raised while building the index is rethrown as a C3rRuntimeException.
    log.debug("Randomizing data order.");
    long startTime = System.currentTimeMillis();
    // Create a covering index for all rows to improve our ORDER BY performance
    // to sort the table by nonce and induce a random order.
    // try-with-resources guarantees the Statement is closed whether or not execute() succeeds;
    // previously the Statement was never closed and leaked.
    try (var stmt = this.sqlTable.getConnection().createStatement()) {
        stmt.execute(TableGenerator.getCoveringIndexStatement(stmt, schema, nonceHeader));
    } catch (SQLException e) {
        throw new C3rRuntimeException("An SQL exception occurred during marshalling.", e);
    }
    long endTime = System.currentTimeMillis();
    log.debug("Done randomizing data order in {} seconds.", TimeUnit.MILLISECONDS.toSeconds(endTime - startTime));

    log.debug("Emitting encrypted data.");
    startTime = System.currentTimeMillis();
    final RowReader<T> sqlRowReader = new SqlRowReader<>(columnInsights, nonceHeader, valueFactory, sqlTable);
    while (sqlRowReader.hasNext()) {
        final Row<T> rowOut = sqlRowReader.next();
        final Row<T> marshalledRow = valueFactory.newRow();
        // Capture the row's nonce before dropping the column; it is needed below to
        // rebuild the encryption context for PadType.MAX columns.
        final Nonce nonce = new Nonce(rowOut.getValue(nonceHeader).getBytes());
        // Nonces don't get written to final output
        rowOut.removeColumn(nonceHeader);
        rowOut.forEach((column, value) -> {
            final var columnInsight = targetMappedColumnInsights.get(column);
            final Transformer transformer = transformers.get(columnInsight.getType());
            byte[] marshalledBytes = value.getBytes();
            // Replace bytes for columns marked with PadType.MAX now that we know the longest value length.
            // All other values are already marshalled correctly.
            if (columnInsight.getPad() != null && columnInsight.getPad().getType() == PadType.MAX) {
                final EncryptionContext encryptionContext = new EncryptionContext(columnInsight, nonce, value.getClientDataType());
                final byte[] unmarshalledMaxColumnBytes = transformer.unmarshal(marshalledBytes);
                marshalledBytes = transformer.marshal(unmarshalledMaxColumnBytes, encryptionContext);
            }
            marshalledRow.putBytes(column, marshalledBytes);
        });
        outputWriter.writeRow(marshalledRow);
        // Periodic progress report, once every LOG_ROW_UPDATE_FREQUENCY rows read.
        if (sqlRowReader.getReadRowCount() % LOG_ROW_UPDATE_FREQUENCY == 0) {
            log.info("{} rows emitted.", sqlRowReader.getReadRowCount());
        }
    }
    outputWriter.flush();
    endTime = System.currentTimeMillis();
    log.debug("Done emitting {} encrypted rows in {} seconds.", sqlRowReader.getReadRowCount(),
            TimeUnit.MILLISECONDS.toSeconds(endTime - startTime));
}