in src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/job/step/build/BuildStage.scala [407:496]
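/**
 * Groups the non-spanned ("inferior") index nodes so that indices with similar column
 * footprints share one column-pruned view of the flat table. Nodes are bucketed coarsely
 * by dimension count, each bucket is refined with k-means++ clustering over the indices'
 * flat-table column-membership vectors, and every resulting group is cached as an
 * InferiorGroup keyed by index id in the build params.
 */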
protected def buildInferior(): Unit = {
  val schema = flatTable.schema
  val arraySize = schema.size

  // Encode an index as a column-membership vector over the flat table schema:
  // 1.0 at the position of every flat-table column the index references, 0.0 elsewhere.
  def getArray(index: IndexEntity): Array[Double] = {
    val arr = new Array[Double](arraySize)
    columnsFromFlatTable(index).foreach { column =>
      arr(schema.fieldIndex(column)) = 1.0d
    }
    arr
  }
  // Pick the number of clusters: the larger of (node count / group factor) and
  // (distinct effective dimension columns / group factor), floored at 1, and collapsed
  // back to 1 when it would exceed the number of nodes.
  def getK(nodes: Seq[TreeNode]): Int = {
    val groupFactor = Math.max(1, config.getInferiorFlatTableGroupFactor)
    val k = Math.max(1, Math.max(nodes.size / groupFactor,
      nodes.map(_.getIndex)
        .flatMap(_.getEffectiveDimCols.keySet().asScala)
        .distinct.size / groupFactor))
    if (nodes.size < k) 1 else k
  }
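  // Adapter exposing an index node to Commons Math clustering through its column-membership vector.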
  class ClusterNode(private val node: TreeNode) extends Clusterable {
    override def getPoint: Array[Double] = getArray(node.getIndex)
    def getNode: TreeNode = node
  }
  case class GroupedNodeColumn(nodes: Seq[TreeNode], columns: Seq[String])
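  // Runs k-means++ with Earth Mover's distance over the nodes' column vectors and pairs each
  // resulting cluster with the distinct, sorted flat-table columns its indices reference.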
  def cluster(nodes: Seq[TreeNode]): Seq[GroupedNodeColumn] = {
    if (nodes.isEmpty) {
      return Seq.empty[GroupedNodeColumn]
    }
    val k = getK(nodes)
    // Negative maxIterations means no iteration cap in Commons Math k-means++.
    val kcluster = new KMeansPlusPlusClusterer[ClusterNode](k, -1, new EarthMoversDistance())
    kcluster.cluster(scala.collection.JavaConverters.seqAsJavaList(nodes.map(n => new ClusterNode(n))))
      .asScala.map { cluster =>
        val grouped = cluster.getPoints.asScala.map(_.getNode)
        val columns = grouped.map(_.getIndex).flatMap(columnsFromFlatTable).distinct.sorted
        GroupedNodeColumn(grouped, columns)
      }
  }
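  // Main flow: bucket the non-spanned nodes coarsely by dimension count (illustrative example:
  // with dimensionFactor = 5, indices with 3 or 4 dimensions share bucket 0 while one with 7
  // dimensions lands in bucket 1), skip single-node buckets, refine each remaining bucket with
  // cluster(), then build one shared column-pruned flat-table view per refined group.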
  val indexInferiorMap = mutable.HashMap[Long, InferiorGroup]()
  val dimensionFactor = Math.max(1, config.getInferiorFlatTableDimensionFactor)
  val nonSpanned = spanningTree.getFromFlatTableNodes.asScala.filter(_.nonSpanned())
  val clustered = nonSpanned.groupBy(node => node.getDimensionSize / dimensionFactor)
    .values.filter { grouped =>
      val groupDesc = grouped.map(_.getIndex.getId).sorted.mkString("[", ",", "]")
      if (grouped.size > 1) {
        logInfo(s"Segment $segmentId coarse index group $groupDesc")
        true
      } else {
        logInfo(s"Segment $segmentId skip coarse index group $groupDesc")
        false
      }
    }.flatMap(cluster)
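  // For each refined group: log its indices and columns, cache a pruned DataFrame plus a
  // countdown latch as an InferiorGroup, and register that group for every index it contains.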
  val inferiors = clustered.zipWithIndex.map { case (grouped, i) =>
    logInfo(s"Segment $segmentId inferior indices columns " +
      s"${grouped.nodes.size} ${grouped.nodes.map(_.getIndex.getId).sorted.mkString("[", ",", "]")} " +
      s"${grouped.columns.size} ${grouped.columns.mkString("[", ",", "]")}")
    // Latch sized by the group's total non-spanned index count.
    val reapCount = new CountDownLatch(grouped.nodes.map(node => node.getNonSpannedCount).sum)
    // Prune the flat table to just the columns this group of indices needs.
    val tableDS = flatTable.select(grouped.columns.map(col): _*)
    val inferior = InferiorGroup(tableDS, reapCount)
    grouped.nodes.foreach { node =>
      indexInferiorMap.put(node.getIndex.getId, inferior)
      // Locality of reference principle: indices in the same cluster share the same local priority.
      node.setLocalPriority(i)
    }
    inferior
  }
  if (inferiors.isEmpty) {
    return
  }
  params.setCachedIndexInferior(Some(indexInferiorMap.toMap))
}