void marshalOutput()

in c3r-sdk-core/src/main/java/com/amazonaws/c3r/action/RowMarshaller.java [377:426]


    /**
     * Emits every row stored in the SQL table to the output writer.
     *
     * <p>The work happens in two phases:
     * <ol>
     *   <li>A covering index is created on the table so that reading the rows ordered by the nonce
     *       column is fast; ordering by nonce is what induces the random output order.</li>
     *   <li>Each row is read back, the nonce column is stripped, and the remaining values are copied
     *       into a fresh row that is written out. Columns padded with {@link PadType#MAX} are
     *       unmarshalled and marshalled again at this point, since the longest value length is only
     *       known once all rows have been loaded.</li>
     * </ol>
     *
     * @throws C3rRuntimeException If an SQL error occurs while creating the index or releasing the statement
     */
    void marshalOutput() {
        log.debug("Randomizing data order.");
        long startTime = System.currentTimeMillis();

        // Create a covering index for all rows to improve our ORDER BY performance
        // to sort the table by nonce and induce a random order.
        // try-with-resources guarantees the statement is closed on success and on failure.
        try (final var stmt = this.sqlTable.getConnection().createStatement()) {
            stmt.execute(TableGenerator.getCoveringIndexStatement(stmt, schema, nonceHeader));
        } catch (SQLException e) {
            throw new C3rRuntimeException("An SQL exception occurred during marshalling.", e);
        }
        long endTime = System.currentTimeMillis();
        log.debug("Done randomizing data order in {} seconds.", TimeUnit.MILLISECONDS.toSeconds(endTime - startTime));

        log.debug("Emitting encrypted data.");
        startTime = System.currentTimeMillis();

        final RowReader<T> sqlRowReader = new SqlRowReader<>(columnInsights, nonceHeader, valueFactory, sqlTable);
        while (sqlRowReader.hasNext()) {
            final Row<T> rowOut = sqlRowReader.next();
            final Row<T> marshalledRow = valueFactory.newRow();
            // Capture the nonce before its column is removed; it is needed to re-marshal MAX-padded columns.
            final Nonce nonce = new Nonce(rowOut.getValue(nonceHeader).getBytes());
            // Nonces don't get written to final output
            rowOut.removeColumn(nonceHeader);
            rowOut.forEach((column, value) -> {
                final var columnInsight = targetMappedColumnInsights.get(column);
                final Transformer transformer = transformers.get(columnInsight.getType());
                byte[] marshalledBytes = value.getBytes();

                // Replace bytes for columns marked with PadType.MAX now that we know the longest value length.
                // All other values are already marshalled correctly.
                if (columnInsight.getPad() != null && columnInsight.getPad().getType() == PadType.MAX) {
                    final EncryptionContext encryptionContext = new EncryptionContext(columnInsight, nonce, value.getClientDataType());
                    final byte[] unmarshalledMaxColumnBytes = transformer.unmarshal(marshalledBytes);
                    marshalledBytes = transformer.marshal(unmarshalledMaxColumnBytes, encryptionContext);
                }

                marshalledRow.putBytes(column, marshalledBytes);
            });
            outputWriter.writeRow(marshalledRow);
            // Periodic progress report so long-running jobs show activity.
            if (sqlRowReader.getReadRowCount() % LOG_ROW_UPDATE_FREQUENCY == 0) {
                log.info("{} rows emitted.", sqlRowReader.getReadRowCount());
            }
        }
        outputWriter.flush();
        endTime = System.currentTimeMillis();
        log.debug("Done emitting {} encrypted rows in {} seconds.", sqlRowReader.getReadRowCount(),
                TimeUnit.MILLISECONDS.toSeconds(endTime - startTime));
    }