protected def buildInferior()

in src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/step/build/BuildStage.scala [407:496]


  /**
   * Groups the not-yet-spanned flat-table indices into "inferior" groups so that
   * indices with similar column footprints can share one projected flat-table dataset.
   *
   * Pipeline: coarse-bucket non-spanned nodes by dimension count, drop singleton
   * buckets, refine each bucket with k-means++ over column-membership vectors,
   * then materialize one [[InferiorGroup]] (projected dataset + countdown latch)
   * per refined group and cache the index-id -> group mapping in `params`.
   */
  protected def buildInferior(): Unit = {

    val tableSchema = flatTable.schema
    val fieldCount = tableSchema.size

    // One-hot vector over flat-table fields marking which columns an index uses.
    // NOTE(review): EarthMoversDistance over these membership vectors is the
    // similarity metric chosen here — presumably to favor overlapping column sets.
    def membershipVector(index: IndexEntity): Array[Double] = {
      val vec = new Array[Double](fieldCount)
      columnsFromFlatTable(index).foreach(c => vec(tableSchema.fieldIndex(c)) = 1.0d)
      vec
    }

    // Pick a cluster count driven by both node count and distinct dimension count,
    // scaled down by the configured group factor; fall back to a single cluster
    // whenever the candidate k would exceed the number of points (k-means requires
    // k <= points).
    def desiredClusterCount(nodes: Seq[TreeNode]): Int = {
      val factor = math.max(1, config.getInferiorFlatTableGroupFactor)
      val byNodes = nodes.size / factor
      val distinctDims = nodes.map(_.getIndex)
        .flatMap(_.getEffectiveDimCols.keySet().asScala)
        .distinct.size
      val candidate = math.max(1, math.max(byNodes, distinctDims / factor))
      if (candidate > nodes.size) 1 else candidate
    }

    // Adapter exposing a tree node to the commons-math clusterer.
    class IndexPoint(val node: TreeNode) extends Clusterable {
      override def getPoint: Array[Double] = membershipVector(node.getIndex)
    }

    // A refined group of nodes together with the sorted union of their columns.
    case class NodeGroup(nodes: Seq[TreeNode], columns: Seq[String])

    // Refine one coarse bucket via k-means++ (maxIterations = -1 means unbounded).
    def refine(nodes: Seq[TreeNode]): Seq[NodeGroup] =
      if (nodes.isEmpty) {
        Seq.empty[NodeGroup]
      } else {
        val clusterer = new KMeansPlusPlusClusterer[IndexPoint](
          desiredClusterCount(nodes), -1, new EarthMoversDistance())
        val points = scala.collection.JavaConverters.seqAsJavaList(nodes.map(new IndexPoint(_)))
        clusterer.cluster(points).asScala.map { c =>
          val members = c.getPoints.asScala.map(_.node)
          val columns = members.map(_.getIndex).flatMap(columnsFromFlatTable).distinct.sorted
          NodeGroup(members, columns)
        }
      }

    val inferiorByIndexId = mutable.HashMap[Long, InferiorGroup]()
    val dimensionFactor = math.max(1, config.getInferiorFlatTableDimensionFactor)
    val pending = spanningTree.getFromFlatTableNodes.asScala.filter(_.nonSpanned())
    // Coarse bucketing: nodes whose dimension sizes land in the same factor-wide
    // band go together; singleton buckets gain nothing from sharing and are skipped.
    val refined = pending.groupBy(_.getDimensionSize / dimensionFactor)
      .values.filter { members =>
        val groupDesc = members.map(_.getIndex.getId).sorted.mkString("[", ",", "]")
        val keep = members.size > 1
        if (keep) {
          logInfo(s"Segment $segmentId coarse index group $groupDesc")
        } else {
          logInfo(s"Segment $segmentId skip coarse index group $groupDesc")
        }
        keep
      }.flatMap(refine)

    val inferiors = refined.zipWithIndex.map { case (group, priority) =>
      logInfo(s"Segment $segmentId inferior indices columns " +
        s"${group.nodes.size} ${group.nodes.map(_.getIndex.getId).sorted.mkString("[", ",", "]")} " +
        s"${group.columns.size} ${group.columns.mkString("[", ",", "]")}")

      // The latch counts every non-spanned descendant so consumers can tell when
      // the shared dataset is no longer needed.
      val remaining = new CountDownLatch(group.nodes.map(_.getNonSpannedCount).sum)
      val projected = flatTable.select(group.columns.map(col): _*)

      val inferior = InferiorGroup(projected, remaining)
      group.nodes.foreach { node =>
        inferiorByIndexId.put(node.getIndex.getId, inferior)
        // Locality of reference principle.
        node.setLocalPriority(priority)
      }
      inferior
    }

    // Only publish the mapping when at least one group was actually formed.
    if (inferiors.nonEmpty) {
      params.setCachedIndexInferior(Some(inferiorByIndexId.toMap))
    }
  }