in measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala [105:137]
/**
 * Builds a user-supplied data connector through reflection.
 *
 * The fully qualified class name is read from the `class` entry of the connector config
 * and loaded with `Class.forName`. Depending on which connector base type the loaded class
 * is assignable to, a declared `apply` method with the matching parameter list is looked up
 * and invoked:
 *
 *  - batch: `apply(SparkSession, DataConnectorParam, TimestampStorage)`
 *  - streaming: `apply(SparkSession, StreamingContext, DataConnectorParam,
 *    TimestampStorage, Option)`
 *
 * The batch check runs first, so a class assignable to both types takes the batch path.
 *
 * @param sparkSession spark session handed to the connector factory method
 * @param ssc streaming context; only passed on the streaming path
 * @param dcParam connector parameters; its config must contain the `class` entry
 * @param timestampStorage timestamp storage handed to the connector factory method
 * @param streamingCacheClientOpt optional cache client; only passed on the streaming path
 * @return the connector instance returned by the reflective `apply` call
 * @throws ClassCastException if the configured class extends neither
 *                            BatchDataConnector nor StreamingDataConnector
 */
private def getCustomConnector(
    sparkSession: SparkSession,
    ssc: StreamingContext,
    dcParam: DataConnectorParam,
    timestampStorage: TimestampStorage,
    streamingCacheClientOpt: Option[StreamingCacheClient]): DataConnector = {
  val className = dcParam.getConfig("class").asInstanceOf[String]

  // `invoke` is called with a null receiver below, so `apply` has to be callable as a
  // static method on the loaded class.
  // NOTE(review): presumably the static forwarder for a companion object's `apply` — confirm.
  Class.forName(className) match {
    case batchClass if classOf[BatchDataConnector].isAssignableFrom(batchClass) =>
      val batchSignature: Seq[Class[_]] =
        Seq(classOf[SparkSession], classOf[DataConnectorParam], classOf[TimestampStorage])
      val batchFactory = batchClass.getDeclaredMethod("apply", batchSignature: _*)
      val created = batchFactory.invoke(null, sparkSession, dcParam, timestampStorage)
      created.asInstanceOf[BatchDataConnector]

    case streamingClass if classOf[StreamingDataConnector].isAssignableFrom(streamingClass) =>
      val streamingSignature: Seq[Class[_]] = Seq(
        classOf[SparkSession],
        classOf[StreamingContext],
        classOf[DataConnectorParam],
        classOf[TimestampStorage],
        classOf[Option[StreamingCacheClient]])
      val streamingFactory = streamingClass.getDeclaredMethod("apply", streamingSignature: _*)
      val created = streamingFactory.invoke(
        null,
        sparkSession,
        ssc,
        dcParam,
        timestampStorage,
        streamingCacheClientOpt)
      created.asInstanceOf[StreamingDataConnector]

    case _ =>
      throw new ClassCastException(
        s"$className should extend BatchDataConnector or StreamingDataConnector")
  }
}