in src/main/scala/org/apache/spark/sql/datasketches/theta/aggregate/ThetaSketchAggBuild.scala [126:147]
/**
 * Expected data types of this expression's children, in positional order.
 *
 * The first entry is the value being sketched: any numeric type or a string.
 * The remaining entries are an integer, a long and a float.
 * NOTE(review): presumably these three correspond to `lgK`, `seed` and `p` -- confirm
 * against the children ordering of this expression.
 * NOTE(review): `NumericType` also admits types such as `DecimalType`, for which `update`
 * has no dedicated case.
 */
override def inputTypes: Seq[AbstractDataType] = {
  val sketchedValueTypes = TypeCollection(NumericType, StringType)
  Seq(
    sketchedValueTypes,
    IntegerType,
    LongType,
    FloatType
  )
}
/**
 * Creates a new aggregation buffer using the no-argument `ThetaSketchWrapper` constructor.
 *
 * @return a newly allocated `ThetaSketchWrapper`
 */
override def createAggregationBuffer(): ThetaSketchWrapper = {
  val freshBuffer = new ThetaSketchWrapper()
  freshBuffer
}
/**
 * Feeds the value of one input row into the aggregation buffer.
 *
 * `inputExpr` is evaluated against `input`; a null result leaves the buffer untouched.
 * On the first non-null value the update sketch is built from `lgK`, `seed` and `p` and
 * stored in the wrapper. The value is then handed to the sketch according to
 * `inputExpr.dataType`:
 *  - Byte, Short, Integer and Long values are passed as a `Long`, so equal integers update
 *    the sketch identically regardless of the column's width;
 *  - Float and Double values are passed as a `Double`;
 *  - String values are converted from `UTF8String` to a `String`.
 *
 * @param wrapper aggregation buffer holding the optional update sketch; mutated in place
 * @param input   row against which `inputExpr` is evaluated
 * @return the same `wrapper` instance that was passed in
 * @throws IllegalArgumentException if `inputExpr.dataType` is none of the types listed above
 */
override def update(wrapper: ThetaSketchWrapper, input: InternalRow): ThetaSketchWrapper = {
  // Local import keeps this method self-contained if the file-level imports omit these types.
  import org.apache.spark.sql.types.{ByteType, ShortType}

  val value = inputExpr.eval(input)
  if (value != null) {
    // Build the sketch lazily so buffers that only ever see nulls never allocate one.
    val sketch = wrapper.updateSketch.getOrElse {
      val created =
        UpdateSketch.builder().setLogNominalEntries(lgK).setSeed(seed).setP(p).build()
      wrapper.updateSketch = Some(created)
      created
    }
    inputExpr.dataType match {
      // Narrow integral types are accepted by the declared input types (NumericType), so
      // they must be handled here instead of failing at execution time.
      case ByteType => sketch.update(value.asInstanceOf[Byte].toLong)
      case ShortType => sketch.update(value.asInstanceOf[Short].toLong)
      case IntegerType => sketch.update(value.asInstanceOf[Int].toLong)
      case LongType => sketch.update(value.asInstanceOf[Long])
      case FloatType => sketch.update(value.asInstanceOf[Float].toDouble)
      case DoubleType => sketch.update(value.asInstanceOf[Double])
      case StringType => sketch.update(value.asInstanceOf[UTF8String].toString)
      case _ => throw new IllegalArgumentException(
        s"Unsupported input type ${inputExpr.dataType.catalogString}")
    }
  }
  wrapper
}