in src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/model/planner/FlatTableToCostUtils.java [76:170]
public static Map<BigInteger, HLLCounter> generateCost(JavaRDD<Row> input, KylinConfig kylinConfig,
        RuleBasedIndex ruleBasedIndex, SegmentFlatTableDesc flatTableDesc) throws IOException {
    // step1: convert each cell to its string representation, and get an RDD of String[]
    JavaRDD<String[]> flatTableRDD = input.map(new Function<Row, String[]>() {
        @Override
        public String[] call(Row row) throws Exception {
            String[] result = new String[row.length()];
            for (int i = 0; i < row.length(); i++) {
                final Object o = row.get(i);
                if (o != null) {
                    result[i] = o.toString();
                } else {
                    result[i] = null;
                }
            }
            return result;
        }
    });
    // step2: calculate the cost for each partition, and get a new pair RDD.
    // The key is the cuboid id, and the value is the per-partition HLL counter encoded as a byte array.
    int rowKeyCount = ruleBasedIndex.countOfIncludeDimension();
    // layouts from the rule based index (agg group)
    Set<LayoutEntity> inputLayouts = ruleBasedIndex.genCuboidLayouts();
    // add the mock layout which includes all of the dimensions in the rule based index
    inputLayouts.add(createMockRuleBaseLayout(ruleBasedIndex));
    BigInteger[] inputCuboids = getCuboIdsFromLayouts(Lists.newArrayList(inputLayouts), rowKeyCount,
            ruleBasedIndex.getColumnIdToRowKeyId());
    // rowkey id -> column index in the flat dataset
    int[] rowkeyColumnIndexes = getRowkeyColumnIndexes(ruleBasedIndex, flatTableDesc);
    int hllPrecision = kylinConfig.getStatsHLLPrecision();
    log.info("The row key count is {}, and the index/column map is {}", rowKeyCount,
            Lists.newArrayList(rowkeyColumnIndexes));
    JavaPairRDD<BigInteger, byte[]> costRddByPartition = flatTableRDD.mapPartitionsToPair(
            new FlatOutputFunction(hllPrecision, rowKeyCount, inputCuboids, rowkeyColumnIndexes));
    // step3: reduce by cuboid, and merge the HLL counters.
    // The key is the cuboid id, and the value is the merged HLL counter encoded as a byte array.
    int partitionNum = getCuboidHLLCounterReducerNum(inputCuboids.length, kylinConfig);
    log.info("Get the partition count for the HLL reducer: {}", partitionNum);
    JavaPairRDD<BigInteger, byte[]> costRDD = costRddByPartition.reduceByKey(new Partitioner() {
        private int num = partitionNum;
        private BigInteger bigIntegerMod = BigInteger.valueOf(num);

        @Override
        public int numPartitions() {
            return num;
        }

        @Override
        public int getPartition(Object key) {
            // the key is the cuboid id (BigInteger); route it to partition (cuboid mod numPartitions)
            BigInteger value = (BigInteger) key;
            return value.mod(bigIntegerMod).intValue();
        }
    }, new Function2<byte[], byte[], byte[]>() {
        private int precision = hllPrecision;

        @Override
        public byte[] call(byte[] array1, byte[] array2) throws Exception {
            // decode the first HLL counter
            HLLCounter hll1 = new HLLCounter(precision);
            ByteBuffer buffer1 = ByteBuffer.wrap(array1, 0, array1.length);
            hll1.readRegisters(buffer1);
            // decode the second HLL counter
            HLLCounter hll2 = new HLLCounter(precision);
            ByteBuffer buffer2 = ByteBuffer.wrap(array2, 0, array2.length);
            hll2.readRegisters(buffer2);
            // merge the two counters and re-encode the result for the next reduce step
            hll1.merge(hll2);
            ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
            hll1.writeRegisters(hllBuf);
            byte[] value = new byte[hllBuf.position()];
            System.arraycopy(hllBuf.array(), 0, value, 0, hllBuf.position());
            return value;
        }
    });
    // step4: collect the final result to the driver, and decode each value back into an HLL counter.
    // The key is the cuboid id, and the value is the HLL counter estimating that cuboid's distinct row count.
    Map<BigInteger, HLLCounter> resultCost = Maps.newHashMap();
    for (Tuple2<BigInteger, byte[]> pair : costRDD.collect()) {
        HLLCounter hll = new HLLCounter(kylinConfig.getStatsHLLPrecision());
        // decode the value
        ByteArray byteArray = new ByteArray(pair._2);
        hll.readRegisters(byteArray.asBuffer());
        // put key and value
        resultCost.put(pair._1, hll);
    }
    // log the cuboid statistics
    if (log.isDebugEnabled()) {
        logMapperAndCuboidStatistics(resultCost, 100);
    }
    return resultCost;
}
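
// A minimal usage sketch of the result map, assuming Kylin's HLLCounter#getCountEstimate()
// API and that each set bit of a cuboid id corresponds to one rowkey column covered by that
// cuboid; CuboidCostPrinter and printEstimatedRowCounts are hypothetical names used only
// for illustration.
import java.math.BigInteger;
import java.util.Map;

import org.apache.kylin.measure.hllc.HLLCounter;

public class CuboidCostPrinter {

    // Print the approximate distinct row count per candidate cuboid, as estimated by generateCost.
    public static void printEstimatedRowCounts(Map<BigInteger, HLLCounter> cuboidCosts) {
        for (Map.Entry<BigInteger, HLLCounter> entry : cuboidCosts.entrySet()) {
            BigInteger cuboid = entry.getKey();
            long estimatedRows = entry.getValue().getCountEstimate();
            System.out.printf("cuboid=%s dimensions=%d estimatedRows=%d%n",
                    cuboid.toString(2), cuboid.bitCount(), estimatedRows);
        }
    }
}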