public static Map<BigInteger, HLLCounter> generateCost()

in src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/model/planner/FlatTableToCostUtils.java [76:170]


    public static Map<BigInteger, HLLCounter> generateCost(JavaRDD<Row> input, KylinConfig kylinConfig,
            RuleBasedIndex ruleBasedIndex, SegmentFlatTableDesc flatTableDesc) throws IOException {
        // step1: convert each cell to a string, yielding a JavaRDD<String[]>
        JavaRDD<String[]> flatTableRDD = input.map(new Function<Row, String[]>() {
            @Override
            public String[] call(Row row) throws Exception {
                String[] result = new String[row.length()];
                for (int i = 0; i < row.length(); i++) {
                    final Object o = row.get(i);
                    // preserve nulls; stringify everything else
                    result[i] = (o == null) ? null : o.toString();
                }
                return result;
            }
        });
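        // At this point a row such as Row(1001, "CN", null) has become the
        // String[] {"1001", "CN", null} (illustrative values).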
        // step2: calculate the cost for each partition and build a new pair RDD.
        // The key is the cuboid id, and the value is the serialized HLL computed over that partition.
        int rowKeyCount = ruleBasedIndex.countOfIncludeDimension();
        // layouts from the rule-based index (agg groups)
        Set<LayoutEntity> inputLayouts = ruleBasedIndex.genCuboidLayouts();
        // add a mock layout that includes all of the dimensions in the rule-based index
        inputLayouts.add(createMockRuleBaseLayout(ruleBasedIndex));
        BigInteger[] inputCuboids = getCuboIdsFromLayouts(Lists.newArrayList(inputLayouts), rowKeyCount,
                ruleBasedIndex.getColumnIdToRowKeyId());
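        // Each cuboid id is a bitmask over the row keys: a set bit means the
        // corresponding dimension participates in that cuboid.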
        // row key id -> column index of that row key in the flat table
        int[] rowkeyColumnIndexes = getRowkeyColumnIndexes(ruleBasedIndex, flatTableDesc);

        int hllPrecision = kylinConfig.getStatsHLLPrecision();
        log.info("The row key count is {}, and the index/column map is {}", rowKeyCount,
                Lists.newArrayList(rowkeyColumnIndexes));
        JavaPairRDD<BigInteger, byte[]> costRddByPartition = flatTableRDD.mapPartitionsToPair(
                new FlatOutputFunction(hllPrecision, rowKeyCount, inputCuboids, rowkeyColumnIndexes));
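        // FlatOutputFunction is expected to feed every row of a partition into one
        // HLLCounter per cuboid and emit the serialized registers, so each
        // (cuboid, partition) pair contributes a single record.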

        // step3: reduce by cuboid id and merge the HLL data.
        // The key is the cuboid id, and the value is the serialized merged HLL.
        int partitionNum = getCuboidHLLCounterReducerNum(inputCuboids.length, kylinConfig);
        log.info("Get the partition count for the HLL reducer: {}", partitionNum);
        JavaPairRDD<BigInteger, byte[]> costRDD = costRddByPartition.reduceByKey(new Partitioner() {
            private final int num = partitionNum;
            private final BigInteger bigIntegerMod = BigInteger.valueOf(num);

            @Override
            public int numPartitions() {
                return num;
            }

            @Override
            public int getPartition(Object key) {
                // the key is the cuboid id; shard cuboids across reducers by modulo
                BigInteger value = (BigInteger) key;
                return value.mod(bigIntegerMod).intValue();
            }
        }, new Function2<byte[], byte[], byte[]>() {
            private final int precision = hllPrecision;

            @Override
            public byte[] call(byte[] array1, byte[] array2) throws Exception {
                // deserialize the first HLL from its register bytes
                HLLCounter hll1 = new HLLCounter(precision);
                ByteBuffer buffer1 = ByteBuffer.wrap(array1);
                hll1.readRegisters(buffer1);
                // deserialize the second HLL
                HLLCounter hll2 = new HLLCounter(precision);
                ByteBuffer buffer2 = ByteBuffer.wrap(array2);
                hll2.readRegisters(buffer2);

                // merge hll2 into hll1, then re-serialize the merged registers
                hll1.merge(hll2);
                ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
                hll1.writeRegisters(hllBuf);
                byte[] value = new byte[hllBuf.position()];
                System.arraycopy(hllBuf.array(), 0, value, 0, hllBuf.position());
                return value;
            }
        });
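        // Merging two HLL sketches (register-wise maximum) yields the same registers
        // as a single sketch that had seen all rows, so the per-partition
        // pre-aggregation above does not cost any accuracy.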
        // step4: collect the final result and decode each byte value back into an HLLCounter.
        // The key is the cuboid id, and the value is the estimated statistics for that cuboid.
        Map<BigInteger, HLLCounter> resultCost = Maps.newHashMap();
        for (Tuple2<BigInteger, byte[]> pair : costRDD.collect()) {
            HLLCounter hll = new HLLCounter(hllPrecision);
            // decode the serialized registers back into the counter
            ByteArray byteArray = new ByteArray(pair._2);
            hll.readRegisters(byteArray.asBuffer());
            // cuboid id -> merged HLL
            resultCost.put(pair._1, hll);
        }
        // dump per-cuboid statistics when debug logging is enabled
        if (log.isDebugEnabled()) {
            logMapperAndCuboidStatistics(resultCost, 100);
        }
        return resultCost;
    }
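
A minimal sketch of how a caller might consume the returned map; the variable names (flatTable, config, ruleBasedIndex, flatTableDesc) are hypothetical stand-ins for objects the caller already holds, and HLLCounter.getCountEstimate() reports the approximate distinct row count sketched for each cuboid:

    Map<BigInteger, HLLCounter> cuboidStats = FlatTableToCostUtils.generateCost(
            flatTable, config, ruleBasedIndex, flatTableDesc);
    for (Map.Entry<BigInteger, HLLCounter> entry : cuboidStats.entrySet()) {
        // approximate distinct row count for this cuboid
        log.info("cuboid {} -> ~{} rows", entry.getKey(), entry.getValue().getCountEstimate());
    }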