def registerJavaUDF()

in DataProcessing/datax-host/src/main/scala/datax/host/SparkJarLoader.scala [82:134]


  /**
    * Registers a Java UDF class with Spark under the given function name.
    *
    * The class is loaded through `ClassLoaderHost.classForName`, must directly implement exactly one
    * parameterized interface whose canonical name starts with `org.apache.spark.sql.api.java.UDF`,
    * and is instantiated through its no-argument constructor. The number of type arguments declared
    * on that interface selects which `UDFn` overload of `udfReg.register` is invoked.
    *
    * @param udfReg         registry the function is registered with
    * @param name           name the function is registered under
    * @param className      name of the UDF class to load
    * @param returnDataType return type handed to the registration; when `null` the type is obtained
    *                       from `getJavaUDFReturnDataType` instead
    * @throws EngineException when the class cannot be loaded, implements no or several UDF
    *                         interfaces, cannot be instantiated, or declares an unsupported number
    *                         of type arguments
    */
  def registerJavaUDF(udfReg: UDFRegistration, name: String, className: String, returnDataType: DataType): Unit = {
    val udfInterfacePrefix = "org.apache.spark.sql.api.java.UDF"

    // A candidate qualifies when its raw type is a class whose canonical name carries the UDF prefix.
    // getCanonicalName may return null (e.g. for local classes), hence the Option wrapper.
    def isUdfInterface(candidate: ParameterizedType): Boolean = candidate.getRawType match {
      case raw: Class[_] => Option(raw.getCanonicalName).exists(_.startsWith(udfInterfacePrefix))
      case _ => false
    }

    try {
      val clazz = ClassLoaderHost.classForName(className)
      val udfInterfaces = clazz.getGenericInterfaces.collect {
        case parameterized: ParameterizedType if isUdfInterface(parameterized) => parameterized
      }

      if (udfInterfaces.isEmpty) {
        throw new EngineException(s"UDF class $className doesn't implement any UDF interface")
      } else if (udfInterfaces.length > 1) {
        throw new EngineException(s"It is invalid to implement multiple UDF interfaces, UDF class $className")
      } else {
        try {
          val udf = clazz.newInstance()
          // An explicitly supplied return type wins; otherwise it is derived from the class.
          val returnType = Option(returnDataType).getOrElse(getJavaUDFReturnDataType(clazz))

          // UDFn has n input type arguments plus one for the result, so k type arguments map to UDF(k-1).
          // The casts are required because `register` is overloaded on the static UDFn type.
          udfInterfaces.head.getActualTypeArguments.length match {
            case 1 => udfReg.register(name, udf.asInstanceOf[UDF0[_]], returnType)
            case 2 => udfReg.register(name, udf.asInstanceOf[UDF1[_, _]], returnType)
            case 3 => udfReg.register(name, udf.asInstanceOf[UDF2[_, _, _]], returnType)
            case 4 => udfReg.register(name, udf.asInstanceOf[UDF3[_, _, _, _]], returnType)
            case 5 => udfReg.register(name, udf.asInstanceOf[UDF4[_, _, _, _, _]], returnType)
            case 6 => udfReg.register(name, udf.asInstanceOf[UDF5[_, _, _, _, _, _]], returnType)
            case 7 => udfReg.register(name, udf.asInstanceOf[UDF6[_, _, _, _, _, _, _]], returnType)
            case 8 => udfReg.register(name, udf.asInstanceOf[UDF7[_, _, _, _, _, _, _, _]], returnType)
            case 9 => udfReg.register(name, udf.asInstanceOf[UDF8[_, _, _, _, _, _, _, _, _]], returnType)
            case 10 => udfReg.register(name, udf.asInstanceOf[UDF9[_, _, _, _, _, _, _, _, _, _]], returnType)
            case 11 => udfReg.register(name, udf.asInstanceOf[UDF10[_, _, _, _, _, _, _, _, _, _, _]], returnType)
            case 12 => udfReg.register(name, udf.asInstanceOf[UDF11[_, _, _, _, _, _, _, _, _, _, _, _]], returnType)
            case 13 => udfReg.register(name, udf.asInstanceOf[UDF12[_, _, _, _, _, _, _, _, _, _, _, _, _]], returnType)
            case 14 => udfReg.register(name, udf.asInstanceOf[UDF13[_, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType)
            case 15 => udfReg.register(name, udf.asInstanceOf[UDF14[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType)
            case 16 => udfReg.register(name, udf.asInstanceOf[UDF15[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType)
            case 17 => udfReg.register(name, udf.asInstanceOf[UDF16[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType)
            case 18 => udfReg.register(name, udf.asInstanceOf[UDF17[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType)
            case 19 => udfReg.register(name, udf.asInstanceOf[UDF18[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType)
            case 20 => udfReg.register(name, udf.asInstanceOf[UDF19[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType)
            case 21 => udfReg.register(name, udf.asInstanceOf[UDF20[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType)
            case 22 => udfReg.register(name, udf.asInstanceOf[UDF21[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType)
            case 23 => udfReg.register(name, udf.asInstanceOf[UDF22[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType)
            case n =>
              throw new EngineException(s"UDF class with $n type arguments is not supported.")
          }
        } catch {
          // IllegalAccessException is what newInstance() raises for a non-public class or constructor,
          // which is precisely the situation the message below describes.
          // IllegalArgumentException is kept so callers see the same exception type as before.
          case _: InstantiationException | _: IllegalAccessException | _: IllegalArgumentException =>
            throw new EngineException(s"Can not instantiate class $className, please make sure it has public non argument constructor")
        }
      }
    } catch {
      case _: ClassNotFoundException =>
        throw new EngineException(s"Can not load class $className, please make sure it is on the classpath")
    }
  }