static void updateMaxValuesPerColumn()

in c3r-cli-spark/src/main/java/com/amazonaws/c3r/spark/action/SparkMarshaller.java [157:184]
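
For each column in the raw input Dataset, this method registers the single column as a temporary view, asks Spark SQL for the longest value in bits via max(bit_length(column)), converts that to whole bytes, and records the result on every ColumnInsight whose source header matches the column. Sealed columns add ClientDataInfo.BYTE_LENGTH on top of the raw byte length; an empty or all-NULL column yields a maximum length of 0.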


    static void updateMaxValuesPerColumn(final Dataset<Row> rawInputData,
                                         final List<ColumnInsight> columnInsights) {
        final Map<ColumnHeader, List<ColumnInsight>> sourceMappedColumnInsights = columnInsights.stream()
                .collect(Collectors.groupingBy(ColumnInsight::getSourceHeader));
        Arrays.stream(rawInputData.columns()).forEach(col -> {
            final Dataset<Row> columnData = rawInputData.select(functions.col(col)
                    .as("column"));
            columnData.createOrReplaceTempView("singleColumnData");
            final ColumnHeader columnHeader = new ColumnHeader(col);
            // max(bit_length(...)) aggregates the whole view to a single row,
            // so the query needs no ORDER BY or LIMIT.
            final Row longestValueRow = rawInputData.sparkSession()
                    .sql("SELECT max(bit_length(column)) FROM singleColumnData")
                    .first();
            final int maxBitLength = (longestValueRow.get(0) == null) ? 0 : longestValueRow.getInt(0);
            final int maxByteLength = maxBitLength / Byte.SIZE;
            for (final ColumnInsight insight : sourceMappedColumnInsights.get(columnHeader)) {
                // NOTE: This is essentially a custom in-place version of ValueConverter.getBytesForColumn
                // since the Spark client can't use the Value infrastructure
                if (insight.getType() == ColumnType.SEALED) {
                    insight.setMaxValueLength(maxByteLength + ClientDataInfo.BYTE_LENGTH);
                } else {
                    insight.setMaxValueLength(maxByteLength);
                }
            }
        });
    }
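
A minimal, self-contained sketch of the same per-column measurement, written against plain Spark rather than the c3r classes. The local SparkSession, the sample data, and the MaxColumnLengthSketch class name are illustrative assumptions, not part of the source:


    import java.util.Arrays;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.functions;

    public final class MaxColumnLengthSketch {
        public static void main(final String[] args) {
            // Local session for illustration only; the marshaller reuses the session
            // already attached to the input Dataset.
            final SparkSession spark = SparkSession.builder()
                    .master("local[1]")
                    .appName("max-column-length-sketch")
                    .getOrCreate();

            // One column of sample values standing in for a single raw input column.
            final Dataset<Row> columnData = spark
                    .createDataset(Arrays.asList("a", "bbb", "cc"), Encoders.STRING())
                    .toDF("column");

            // The same aggregate the marshaller issues through SQL, expressed with the
            // DataFrame API: longest value measured in bits, then converted to bytes.
            final Row longestValueRow = columnData
                    .agg(functions.max(functions.bit_length(functions.col("column"))))
                    .first();
            final int maxBitLength = longestValueRow.isNullAt(0) ? 0 : longestValueRow.getInt(0);
            final int maxByteLength = maxBitLength / Byte.SIZE;

            System.out.println("max byte length = " + maxByteLength); // prints 3 ("bbb")
            spark.stop();
        }
    }

The DataFrame form skips the temporary view but is equivalent to the SQL the marshaller issues: with no GROUP BY, the aggregate collapses the column to one row holding the maximum bit length, and a NULL result (empty or all-NULL column) maps to a length of 0.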