in modules/spark-ext/spark/src/main/scala/org/apache/ignite/spark/impl/optimization/package.scala [91:174]
def exprsAllowed(exprs: Seq[Expression]): Boolean =
exprs.forall(exprsAllowed)
/**
* @param expr Expression to check.
* @return True if `expr` allowed(i.e. can be pushed down to Ignite) false otherwise.
*
*/
def exprsAllowed(expr: Expression): Boolean =
SUPPORTED_EXPRESSIONS.exists(_(expr, exprsAllowed))
/**
* Converts `input` into `AttributeReference`.
*
* @param input Expression to convert.
* @param existingOutput Existing output.
* @param exprId Optional expression ID to use.
* @param alias Optional alias for a result.
* @return Converted expression.
*/
def toAttributeReference(input: Expression, existingOutput: Seq[NamedExpression], exprId: Option[ExprId] = None,
alias: Option[String] = None): AttributeReference = {
input match {
case attr: AttributeReference ⇒
val toCopy = existingOutput.find(_.exprId == attr.exprId).getOrElse(attr)
AttributeReference(
name = toCopy.name,
dataType = toCopy.dataType,
metadata = alias
.map(new MetadataBuilder().withMetadata(toCopy.metadata).putString(ALIAS, _).build())
.getOrElse(toCopy.metadata)
)(exprId = exprId.getOrElse(toCopy.exprId), qualifier = toCopy.qualifier)
case a: Alias ⇒
toAttributeReference(a.child, existingOutput, Some(a.exprId), Some(alias.getOrElse(a.name)))
case agg: AggregateExpression ⇒
agg.aggregateFunction match {
case c: Count ⇒
if (agg.isDistinct)
AttributeReference(
name = s"COUNT(DISTINCT ${c.children.map(exprToString(_)).mkString(" ")})",
dataType = LongType,
metadata = alias
.map(new MetadataBuilder().putString(ALIAS, _).build())
.getOrElse(Metadata.empty)
)(exprId = exprId.getOrElse(agg.resultId))
else
AttributeReference(
name = s"COUNT(${c.children.map(exprToString(_)).mkString(" ")})",
dataType = LongType,
metadata = alias
.map(new MetadataBuilder().putString(ALIAS, _).build())
.getOrElse(Metadata.empty)
)(exprId = exprId.getOrElse(agg.resultId))
case _ ⇒
toAttributeReference(agg.aggregateFunction, existingOutput, Some(exprId.getOrElse(agg.resultId)), alias)
}
case ne: NamedExpression ⇒
AttributeReference(
name = exprToString(input),
dataType = input.dataType,
metadata = alias
.map(new MetadataBuilder().withMetadata(ne.metadata).putString(ALIAS, _).build())
.getOrElse(Metadata.empty)
)(exprId = exprId.getOrElse(ne.exprId))
case _ if exprsAllowed(input) ⇒
AttributeReference(
name = exprToString(input),
dataType = input.dataType,
metadata = alias
.map(new MetadataBuilder().putString(ALIAS, _).build())
.getOrElse(Metadata.empty)
)(exprId = exprId.getOrElse(NamedExpression.newExprId))
case _ ⇒
throw new IgniteException(s"Unsupported column expression $input")
}
}