def apply()

in mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala [115:253]


  /**
   * Rewrites `GroupBy`-over-`Select` nodes of `plan` so that the first child of the `Select`
   * (the fact relation) is aggregated on its own before being combined with the remaining
   * children (the dimensions).
   *
   * A node is rewritten only when all of the following hold:
   *  - the `Select`'s adjacency list has at most one key and every join edge is `Inner`;
   *  - the first child is a `ModularRelation`, there is at least one further child, and all
   *    children are `modular.LeafNode`s;
   *  - `findPushThroughAggregates` returns a non-empty map for the `GroupBy` output list;
   *  - every attribute referenced by the evaluable conditions of the dimensions is also
   *    referenced by the `GroupBy` predicate list.
   *
   * In the rewritten node, child 0 of the `Select` is a new `GroupBy(Select(fact))` registered
   * in the alias map as `gen_harmonized_<databaseName>_<tableName>`. Attributes contained in
   * the output set of that new child are re-created (nullable, empty metadata, same name,
   * data type and expression id) with this alias as qualifier.
   *
   * @param plan the modular plan to transform
   * @return the transformed plan; a matched node that fails the last two checks is returned
   *         unchanged
   */
  def apply(plan: ModularPlan): ModularPlan = {
    plan transform {
      case g@GroupBy(_, _, _, _,
        s@Select(_, _, _, aliasm, jedges, fact :: dims, _, _, _, _), _, _, _)
        if s.adjacencyList.keySet.size <= 1 &&
           jedges.forall(edge => edge.joinType == Inner) && // !s.flags.hasFlag(DISTINCT) &&
           fact.isInstanceOf[ModularRelation] &&
           (fact :: dims).forall(_.isInstanceOf[modular.LeafNode]) &&
           dims.nonEmpty =>
        // The guard above guarantees this cast; perform it once for all later uses.
        val factRelation = fact.asInstanceOf[ModularRelation]

        // Union of all given attribute sets.
        def unionOf(sets: Seq[AttributeSet]) = sets.foldLeft(AttributeSet.empty)(_ ++ _)

        // Output alias -> underlying attribute, for aliases that directly wrap an attribute.
        val selAliasMap = AttributeMap(s.outputList.collect {
          case alias@Alias(child: Attribute, _) => (alias.toAttribute, child)
        })
        val aggTransMap = findPushThroughAggregates(g.outputList, selAliasMap, factRelation)

        val dimConditions = dims.flatMap(s.extractEvaluableConditions)
        val constraintsAttributeSet = unionOf(dimConditions.map(_.references))
        val groupingAttributeSet = unionOf(g.predicateList.map(_.references))

        // TODO: the subset condition below is too pessimistic, more work needed using methods
        // similar to those in trait
        //      QueryPlanConstraints
        if (aggTransMap.nonEmpty && constraintsAttributeSet.subsetOf(groupingAttributeSet)) {
          val factOutputSet = fact.outputSet

          // Keeps only the attributes that belong to the output of the fact relation.
          def onFact(attrs: AttributeSet) = attrs.filter(factOutputSet.contains(_))

          // Fact attributes needed by the joins, by the fact-only conditions and by grouping.
          val starJExprs = dims.flatMap(dim => s.extractJoinConditions(fact, dim))
          val gJAttributes = onFact(unionOf(starJExprs.map(_.references)))
          val fExprs = s.extractEvaluableConditions(fact)
          val gFAttributes = onFact(unionOf(fExprs.map(_.references)))
          val gGAttributes = onFact(groupingAttributeSet)
          val gAttributes = (gJAttributes ++ gFAttributes ++ gGAttributes).toSeq

          val oAggregates = aggTransMap.values.flatMap(_._2).toSeq

          // Alias map of the inner select: only the entry for child 0, if there is one.
          val tAliasMap = aliasm.get(0).toList.map(name => 0 -> name).toMap
          val aggregateInputs = unionOf(oAggregates.map(_.references))
          val sOutput = (aggregateInputs ++ AttributeSet(gAttributes)).toSeq

          val hFactSel = plans.modular.Select(
            sOutput,
            fact.output,
            Seq.empty,
            tAliasMap,
            Seq.empty,
            fact :: Nil,
            NoFlags,
            Seq.empty,
            Seq.empty)
          val hFact = plans.modular.GroupBy(
            gAttributes ++ oAggregates,
            sOutput,
            gAttributes,
            None,
            hFactSel,
            NoFlags,
            Seq.empty)

          val hFactName =
            s"gen_harmonized_${ factRelation.databaseName }_${ factRelation.tableName }"

          // Re-creates `ref` as a reference qualified with the harmonized fact alias.
          def qualifiedWithHFact(ref: NamedExpression) =
            ExpressionHelper.createReference(
              ref.name,
              ref.dataType,
              nullable = true,
              Metadata.empty,
              ref.exprId,
              Some(hFactName))

          val hAliasMap = (aliasm - 0) + (0 -> hFactName)
          val hInputList = gAttributes ++ oAggregates.map(_.toAttribute) ++
                           dims.flatMap(dim => dim.asInstanceOf[modular.LeafNode].output)

          // Plain attributes and aliases of plain attributes from the original select output.
          val attrOutputList = s.outputList.filter {
            case _: Attribute => true
            case Alias(_: Attribute, _) => true
            case _ => false
          }
          val aggOutputList = oAggregates.map(agg => qualifiedWithHFact(agg))
          val hFactOutputSet = hFact.outputSet

          // Update the outputlist qualifier
          val hOutputList = (attrOutputList ++ aggOutputList).map { out =>
            out.transform {
              case ref: Attribute if hFactOutputSet.contains(ref) => qualifiedWithHFact(ref)
            }
          }.asInstanceOf[Seq[NamedExpression]]

          // Update the predicate qualifier
          val hPredList = s.predicateList.map { pred =>
            pred.transform {
              case ref: Attribute if hFactOutputSet.contains(ref) => qualifiedWithHFact(ref)
            }
          }

          val hSel = s.copy(
            outputList = hOutputList,
            inputList = hInputList,
            aliasMap = hAliasMap,
            predicateList = hPredList,
            children = hFact :: dims)

          // Swap in the replacement expression for every output position present in the map.
          val gOutputList = g.outputList.zipWithIndex.map { case (expr, index) =>
            aggTransMap.get(index) match {
              case Some((replacement, _)) => replacement
              case None => expr
            }
          }

          val rewritten = g.copy(outputList = gOutputList, inputList = hInputList, child = hSel)
          rewritten.transformExpressions {
            case ref: Attribute if hFactOutputSet.contains(ref) => qualifiedWithHFact(ref)
          }
        } else {
          g
        }
    }
  }