in c3r-cli-spark/src/main/java/com/amazonaws/c3r/spark/action/SparkMarshaller.java [157:184]
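// Measures the longest value (in bytes) present in each column of the raw input and records it on the
// matching ColumnInsight entries; SEALED columns reserve ClientDataInfo.BYTE_LENGTH extra bytes of overhead.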
static void updateMaxValuesPerColumn(final Dataset<Row> rawInputData,
                                     final List<ColumnInsight> columnInsights) {
    final Map<ColumnHeader, List<ColumnInsight>> sourceMappedColumnInsights = columnInsights.stream()
            .collect(Collectors.groupingBy(ColumnInsight::getSourceHeader));
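    // Measure each input column independently: select it under the fixed alias "column" and register a
    // temp view so the bit_length query below can reference it by name.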
    Arrays.stream(rawInputData.columns()).forEach(col -> {
        final Dataset<Row> columnData = rawInputData.select(functions.col(col)
                .as("column"));
        columnData.createOrReplaceTempView("singleColumnData");
        final ColumnHeader columnHeader = new ColumnHeader(col);
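        // Ask Spark for the bit length of the longest value in this column; the aggregate returns a single row.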
        final Row longestValueRow = rawInputData.sparkSession()
                .sql("SELECT max(bit_length(column))\n" +
                        "FROM singleColumnData")
                .first();
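        // max() over an empty (or all-null) column yields null, which is treated as a zero-length value.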
        final int maxBitLength = (longestValueRow.get(0) == null) ? 0 : longestValueRow.getInt(0);
        final int maxByteLength = maxBitLength / Byte.SIZE;
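        // Every target column derived from this source column shares the same maximum length.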
        for (final ColumnInsight insight : sourceMappedColumnInsights.get(columnHeader)) {
            // NOTE: This is essentially a custom in-place version of ValueConverter.getBytesForColumn
            // since the Spark client can't use the Value infrastructure
            if (insight.getType() == ColumnType.SEALED) {
                insight.setMaxValueLength(maxByteLength + ClientDataInfo.BYTE_LENGTH);
            } else {
                insight.setMaxValueLength(maxByteLength);
            }
        }
    });
}