in src/main/scala/org/apache/spark/sql/datasketches/theta/aggregate/ThetaSketchAggUnion.scala [110:126]
/**
 * Expected data types of this expression's inputs, in positional order:
 * a theta sketch, an integer and a long.
 *
 * NOTE(review): the integer and long presumably correspond to `lgK` and `seed`
 * — confirm against the constructor, which is not visible here.
 */
override def inputTypes: Seq[AbstractDataType] =
  List[AbstractDataType](ThetaSketchType, IntegerType, LongType)
/**
 * Creates a new aggregation buffer holding a freshly built theta union.
 *
 * The union is configured with this aggregate's `lgK` (log2 of nominal entries)
 * and `seed`.
 *
 * @return a [[ThetaSketchWrapper]] whose `union` is defined
 */
override def createAggregationBuffer(): ThetaSketchWrapper = {
  val freshUnion = SetOperation.builder()
    .setLogNominalEntries(lgK)
    .setSeed(seed)
    .buildUnion()
  new ThetaSketchWrapper(union = Some(freshUnion))
}
/**
 * Folds the sketch found in one input row into the aggregation buffer.
 *
 * Rows whose input expression evaluates to `null` are skipped. A non-null value
 * is wrapped as a theta sketch (without copying the bytes) and merged into the
 * buffer's union.
 *
 * @param wrapper the aggregation buffer; its `union` must be defined
 * @param input   the row the input expression is evaluated against
 * @return the same `wrapper` instance, updated in place
 * @throws IllegalArgumentException if a non-null value is produced by an input
 *                                  expression whose type is not `ThetaSketchType`
 * @throws IllegalStateException    if the buffer has no union to update
 */
override def update(wrapper: ThetaSketchWrapper, input: InternalRow): ThetaSketchWrapper = {
  val value = inputExpr.eval(input)
  // Null inputs contribute nothing to the union, so they are ignored.
  if (value != null) {
    inputExpr.dataType match {
      case ThetaSketchType =>
        // Fail with an explicit message instead of an opaque `None.get` if the
        // buffer was handed over without a live union.
        val target = wrapper.union.getOrElse(
          throw new IllegalStateException(
            "Aggregation buffer has no theta union to update"))
        // Values of ThetaSketchType are expected to be the serialized sketch bytes.
        target.union(Sketch.wrap(Memory.wrap(value.asInstanceOf[Array[Byte]])))
      case unsupported =>
        throw new IllegalArgumentException(
          s"Unsupported input type ${unsupported.catalogString}")
    }
  }
  wrapper
}