in src/main/scala/org/apache/spark/sql/datasketches/kll/aggregate/KllDoublesSketchAggMerge.scala [88:122]
/**
 * Produces a copy of this aggregate that differs only in its mutable
 * aggregation-buffer offset.
 *
 * @param newMutableAggBufferOffset value to store in `mutableAggBufferOffset`
 * @return the updated copy; this instance is left unchanged
 */
override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): KllDoublesSketchAggMerge = {
  copy(mutableAggBufferOffset = newMutableAggBufferOffset)
}
/**
 * Produces a copy of this aggregate that differs only in its input
 * aggregation-buffer offset.
 *
 * @param newInputAggBufferOffset value to store in `inputAggBufferOffset`
 * @return the updated copy; this instance is left unchanged
 */
override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): KllDoublesSketchAggMerge = {
  copy(inputAggBufferOffset = newInputAggBufferOffset)
}
/**
 * Rebuilds this aggregate with replacement child expressions.
 *
 * @param newLeft  expression that becomes `sketchExpr`
 * @param newRight expression that becomes `kExpr`
 * @return a copy of this aggregate using the supplied children
 */
override protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression): KllDoublesSketchAggMerge = {
  copy(sketchExpr = newLeft, kExpr = newRight)
}
// overrides for TypedImperativeAggregate
/** Name this expression reports as its `prettyName`. */
override def prettyName: String =
  "kll_sketch_double_agg_merge"
/** Result type of this aggregate: always `KllDoublesSketchType`. */
override def dataType: DataType =
  KllDoublesSketchType
/** This aggregate declares its result as non-nullable. */
override def nullable: Boolean =
  false
/**
 * Declared input types, in order: `KllDoublesSketchType` followed by
 * `IntegerType`.
 */
override def inputTypes: Seq[AbstractDataType] =
  Seq(KllDoublesSketchType, IntegerType)
/**
 * Validates the `k` argument (`kExpr`) of this aggregate.
 *
 * Only `kExpr` is inspected here. The check fails when `kExpr` is not
 * foldable, when it evaluates to `null`, when it evaluates to something other
 * than an `Int`, or when the value lies outside `[8, KllSketch.MAX_K]`.
 *
 * @return `TypeCheckResult.TypeCheckSuccess` when `k` is a foldable `Int` in
 *         `[8, KllSketch.MAX_K]`; otherwise a `TypeCheckFailure` whose message
 *         describes the problem
 */
override def checkInputDataTypes(): TypeCheckResult = {
  if (!kExpr.foldable) {
    // k must be a constant
    TypeCheckResult.TypeCheckFailure(s"k must be foldable, but got: ${kExpr}")
  } else {
    // Check if k >= 8 and k <= MAX_K; the match is the method's result, so no
    // early `return` is needed.
    kExpr.eval() match {
      case k: Int if k >= 8 && k <= KllSketch.MAX_K =>
        // additional validations of k handled in the DataSketches library
        TypeCheckResult.TypeCheckSuccess
      case k: Int if k > KllSketch.MAX_K =>
        TypeCheckResult.TypeCheckFailure(
          s"k must be less than or equal to ${KllSketch.MAX_K}, but got: $k")
      case k: Int =>
        // Only k < 8 can reach this case.
        TypeCheckResult.TypeCheckFailure(
          s"k must be at least 8 and no greater than ${KllSketch.MAX_K}, but got: $k")
      case null =>
        // A foldable expression may still evaluate to null; report that
        // directly instead of blaming the (possibly correct) data type.
        TypeCheckResult.TypeCheckFailure("k must not be null")
      case _ =>
        TypeCheckResult.TypeCheckFailure(s"Unsupported input type ${kExpr.dataType.catalogString}")
    }
  }
}