/** Declares no expected input types: the returned sequence is empty. */
override def inputTypes: Seq[AbstractDataType] = Seq.empty

in src/main/scala/org/apache/spark/sql/datasketches/theta/aggregate/ThetaSketchAggBuild.scala [126:147]


  /**
   * Expected input types, one entry per argument position.
   *
   * The first position accepts any `NumericType` or `StringType`; the remaining three are
   * `IntegerType`, `LongType` and `FloatType` (presumably lgK, seed and p, in that order —
   * TODO confirm against the order of the expression's children).
   *
   * NOTE(review): `update` in this file only dispatches on Double, Float, Integer, Long and
   * String, so any other numeric type admitted here would be rejected per row at execution
   * time — confirm whether the first entry should be narrowed.
   */
  override def inputTypes: Seq[AbstractDataType] = {
    val sketchedValueTypes = TypeCollection(NumericType, StringType)
    Seq(sketchedValueTypes, IntegerType, LongType, FloatType)
  }

  /**
   * Creates a fresh aggregation buffer.
   *
   * @return a new `ThetaSketchWrapper` built with its no-argument constructor
   */
  override def createAggregationBuffer(): ThetaSketchWrapper = {
    // presumably starts without an update sketch, since `update` creates one on demand —
    // verify against ThetaSketchWrapper's defaults
    new ThetaSketchWrapper()
  }

  /**
   * Feeds the value of `inputExpr` for one input row into the buffer's update sketch.
   *
   * Rows for which `inputExpr` evaluates to null leave the buffer untouched. The update sketch
   * is created lazily from `lgK`, `seed` and `p` the first time a non-null value is seen; this
   * happens before the type dispatch, so an unsupported type still leaves a sketch in the buffer.
   *
   * NOTE(review): only Double, Float, Integer, Long and String inputs are dispatched below; any
   * other type reaching this method fails on every non-null row — confirm that the declared
   * `inputTypes` cannot let such a type through.
   *
   * @param wrapper aggregation buffer holding the optional update sketch; mutated in place
   * @param input   row against which `inputExpr` is evaluated
   * @return the same `wrapper` instance that was passed in
   * @throws IllegalArgumentException if `inputExpr.dataType` is not one of the handled types
   */
  override def update(wrapper: ThetaSketchWrapper, input: InternalRow): ThetaSketchWrapper = {
    inputExpr.eval(input) match {
      case null =>
        // Null inputs are skipped and never force creation of the sketch.
        ()
      case datum =>
        // Reuse the buffer's sketch, or build and store one on first use.
        val sketch = wrapper.updateSketch.getOrElse {
          val created = UpdateSketch.builder()
            .setLogNominalEntries(lgK)
            .setSeed(seed)
            .setP(p)
            .build()
          wrapper.updateSketch = Some(created)
          created
        }
        inputExpr.dataType match {
          case IntegerType => sketch.update(datum.asInstanceOf[Int])
          case LongType => sketch.update(datum.asInstanceOf[Long])
          case FloatType => sketch.update(datum.asInstanceOf[Float])
          case DoubleType => sketch.update(datum.asInstanceOf[Double])
          case StringType => sketch.update(datum.asInstanceOf[UTF8String].toString)
          case _ => throw new IllegalArgumentException(
            s"Unsupported input type ${inputExpr.dataType.catalogString}")
        }
    }
    wrapper
  }