in DataProcessing/datax-host/src/main/scala/datax/host/SparkJarLoader.scala [82:134]
def registerJavaUDF(udfReg: UDFRegistration, name: String, className: String, returnDataType: DataType): Unit = {
try {
val clazz = ClassLoaderHost.classForName(className)
val udfInterfaces = clazz.getGenericInterfaces
.filter(_.isInstanceOf[ParameterizedType])
.map(_.asInstanceOf[ParameterizedType])
.filter(e => e.getRawType.isInstanceOf[Class[_]] && e.getRawType.asInstanceOf[Class[_]].getCanonicalName.startsWith("org.apache.spark.sql.api.java.UDF"))
if (udfInterfaces.length == 0) {
throw new EngineException(s"UDF class $className doesn't implement any UDF interface")
} else if (udfInterfaces.length > 1) {
throw new EngineException(s"It is invalid to implement multiple UDF interfaces, UDF class $className")
} else {
try {
val udf = clazz.newInstance()
//val udfReturnType = udfInterfaces(0).getActualTypeArguments.last
val returnType = if(returnDataType==null) getJavaUDFReturnDataType(clazz) else returnDataType
udfInterfaces(0).getActualTypeArguments.length match {
case 1 => udfReg.register(name, udf.asInstanceOf[UDF0[_]], returnType)
case 2 => udfReg.register(name, udf.asInstanceOf[UDF1[_, _]], returnType)
case 3 => udfReg.register(name, udf.asInstanceOf[UDF2[_, _, _]], returnType)
case 4 => udfReg.register(name, udf.asInstanceOf[UDF3[_, _, _, _]], returnType)
case 5 => udfReg.register(name, udf.asInstanceOf[UDF4[_, _, _, _, _]], returnType)
case 6 => udfReg.register(name, udf.asInstanceOf[UDF5[_, _, _, _, _, _]], returnType)
case 7 => udfReg.register(name, udf.asInstanceOf[UDF6[_, _, _, _, _, _, _]], returnType)
case 8 => udfReg.register(name, udf.asInstanceOf[UDF7[_, _, _, _, _, _, _, _]], returnType)
case 9 => udfReg.register(name, udf.asInstanceOf[UDF8[_, _, _, _, _, _, _, _, _]], returnType)
case 10 => udfReg.register(name, udf.asInstanceOf[UDF9[_, _, _, _, _, _, _, _, _, _]], returnType)
case 11 => udfReg.register(name, udf.asInstanceOf[UDF10[_, _, _, _, _, _, _, _, _, _, _]], returnType)
case 12 => udfReg.register(name, udf.asInstanceOf[UDF11[_, _, _, _, _, _, _, _, _, _, _, _]], returnType)
case 13 => udfReg.register(name, udf.asInstanceOf[UDF12[_, _, _, _, _, _, _, _, _, _, _, _, _]], returnType)
case 14 => udfReg.register(name, udf.asInstanceOf[UDF13[_, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType)
case 15 => udfReg.register(name, udf.asInstanceOf[UDF14[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType)
case 16 => udfReg.register(name, udf.asInstanceOf[UDF15[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType)
case 17 => udfReg.register(name, udf.asInstanceOf[UDF16[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType)
case 18 => udfReg.register(name, udf.asInstanceOf[UDF17[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType)
case 19 => udfReg.register(name, udf.asInstanceOf[UDF18[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType)
case 20 => udfReg.register(name, udf.asInstanceOf[UDF19[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType)
case 21 => udfReg.register(name, udf.asInstanceOf[UDF20[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType)
case 22 => udfReg.register(name, udf.asInstanceOf[UDF21[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType)
case 23 => udfReg.register(name, udf.asInstanceOf[UDF22[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _]], returnType)
case n =>
throw new EngineException(s"UDF class with $n type arguments is not supported.")
}
} catch {
case e @ (_: InstantiationException | _: IllegalArgumentException) =>
throw new EngineException(s"Can not instantiate class $className, please make sure it has public non argument constructor")
}
}
} catch {
case e: ClassNotFoundException => throw new EngineException(s"Can not load class $className, please make sure it is on the classpath")
}
}