in flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RelTreeWriterImpl.scala [68:255]
override def explain_(rel: RelNode, values: util.List[Pair[String, AnyRef]]): Unit = {
val inputs = rel.getInputs
val mq = rel.getCluster.getMetadataQuery
if (!mq.isVisibleInExplain(rel, explainLevel)) {
// render children in place of this, at same level
inputs.toSeq.foreach(_.explain(this))
return
}
val s = new StringBuilder
if (withTreeStyle) {
if (depth > 0) {
lastChildren.init.foreach(isLast => s.append(if (isLast) " " else ": "))
s.append(if (lastChildren.last) "+- " else ":- ")
}
}
if (withIdPrefix) {
s.append(rel.getId).append(":")
}
rel.getRelTypeName match {
case name if name.startsWith("BatchExec") => s.append(name.substring(9))
case name if name.startsWith("BatchPhysical") => s.append(name.substring(13))
case name if name.startsWith("StreamExec") => s.append(name.substring(10))
case name if name.startsWith("StreamPhysical") => s.append(name.substring(14))
case name => s.append(name)
}
val printValues = new util.ArrayList[Pair[String, AnyRef]]()
if (explainLevel != SqlExplainLevel.NO_ATTRIBUTES) {
if (withAdvice) {
if (depth == 0) {
applyAdvice(rel)
}
val adviceToRel = NODE_LEVEL_ADVICE.get(rel.getId)
if (adviceToRel != null) {
val adviceIds = new util.LinkedHashSet[Integer]()
adviceToRel.foreach {
advice =>
ADVICE_IDS.computeIfAbsent(advice, _ => NEXT_ADVICE_ID.getAndIncrement())
adviceIds.add(ADVICE_IDS.get(advice))
}
printValues.add(Pair.of("advice", adviceIds.mkString(", ")))
}
}
printValues.addAll(values)
}
if (withChangelogTraits) rel match {
case streamRel: StreamPhysicalRel =>
val changelogMode = ChangelogPlanUtils.getChangelogMode(streamRel)
printValues.add(
Pair.of("changelogMode", ChangelogPlanUtils.stringifyChangelogMode(changelogMode)))
case _ => // ignore
}
if (withUpsertKey) rel match {
case streamRel: StreamPhysicalRel =>
val fmq = FlinkRelMetadataQuery.reuseOrCreate(rel.getCluster.getMetadataQuery)
val upsertKeys = fmq.getUpsertKeys(streamRel)
if (null != upsertKeys && !upsertKeys.isEmpty) {
val fieldNames = streamRel.getRowType.getFieldNames
printValues.add(
Pair.of(
"upsertKeys",
upsertKeys
.map(bitset => bitset.toArray.map(fieldNames).mkString("[", ", ", "]"))
.mkString(", ")))
}
case _ => // ignore
}
if (withQueryHint) {
rel match {
case _: Join | _: Correlate =>
val joinHints = FlinkHints.getAllJoinHints(rel.asInstanceOf[Hintable].getHints)
if (joinHints.nonEmpty) {
printValues.add(Pair.of("joinHints", RelExplainUtil.hintsToString(joinHints)))
}
val stateTtlHints = FlinkHints.getAllStateTtlHints(rel.asInstanceOf[Hintable].getHints)
if (stateTtlHints.nonEmpty) {
printValues.add(Pair.of("stateTtlHints", RelExplainUtil.hintsToString(stateTtlHints)))
}
case _: Aggregate | _: StreamPhysicalGroupAggregateBase =>
val aggHints =
rel match {
case aggregate: Aggregate => aggregate.getHints
case _ => rel.asInstanceOf[StreamPhysicalGroupAggregateBase].hints
}
val stateTtlHints = FlinkHints.getAllStateTtlHints(aggHints)
if (stateTtlHints.nonEmpty) {
printValues.add(Pair.of("stateTtlHints", RelExplainUtil.hintsToString(stateTtlHints)))
}
case _ => // ignore
}
}
if (withQueryBlockAlias) {
rel match {
case node: Hintable =>
node match {
case _: TableScan =>
// We don't need to pint hints about TableScan because TableScan will always
// print hints if exist. See more in such as LogicalTableScan#explainTerms
case _ =>
val queryBlockAliasHints = FlinkHints.getQueryBlockAliasHints(node.getHints)
if (queryBlockAliasHints.nonEmpty) {
printValues.add(
Pair.of("hints", RelExplainUtil.hintsToString(queryBlockAliasHints)))
}
}
case _ => // ignore
}
}
if (!printValues.isEmpty) {
var j = 0
printValues.toSeq.foreach {
case value if value.right.isInstanceOf[RelNode] => // do nothing
case value =>
if (j == 0) s.append("(") else s.append(", ")
j = j + 1
s.append(value.left).append("=[").append(value.right).append("]")
}
if (j > 0) s.append(")")
}
if (withRowType) {
s.append(", rowType=[").append(rel.getRowType.toString).append("]")
}
if (explainLevel == SqlExplainLevel.ALL_ATTRIBUTES) {
s.append(": rowcount = ")
.append(mq.getRowCount(rel))
.append(", cumulative cost = ")
.append(mq.getCumulativeCost(rel))
}
pw.println(s)
if (inputs.length > 1) inputs.toSeq.init.foreach {
rel =>
if (withTreeStyle) {
depth = depth + 1
lastChildren = lastChildren :+ false
}
rel.explain(this)
if (withTreeStyle) {
depth = depth - 1
lastChildren = lastChildren.init
}
}
if (!inputs.isEmpty) {
if (withTreeStyle) {
depth = depth + 1
lastChildren = lastChildren :+ true
}
inputs.toSeq.last.explain(this)
if (withTreeStyle) {
depth = depth - 1
lastChildren = lastChildren.init
}
}
/*
* check depth (zero means recursion finishes) to ensure sub-plan reuse condition is covered
* check statementCnt to ensure statement set condition is covered
*/
if (withAdvice && depth == 0 && statementNum == statementCnt) {
pw.println()
ADVICE_IDS.foreach(
advice =>
pw.println(s"advice[${advice._2}]: [${advice._1.getKind}] ${advice._1.getContent}"))
QUERY_LEVEL_ADVICE.forEach(
advice =>
pw.println(
s"advice[${NEXT_ADVICE_ID.getAndIncrement()}]: " +
s"[${advice.getKind}] ${advice.getContent}"))
if (ADVICE_IDS.isEmpty && QUERY_LEVEL_ADVICE.isEmpty) {
pw.println("No available advice...")
}
}
}