private def getCustomConnector()

in measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala [105:137]


  /**
   * Builds a user-supplied data connector through reflection.
   *
   * The class name is taken from the "class" entry of the connector config and loaded with
   * `Class.forName`. Depending on which connector base type the loaded class is assignable to,
   * the matching `apply` method declared on that class is looked up and invoked with a `null`
   * receiver, so it has to be callable without an instance:
   *
   *  - batch: `apply(SparkSession, DataConnectorParam, TimestampStorage)`
   *  - streaming: `apply(SparkSession, StreamingContext, DataConnectorParam, TimestampStorage,
   *    Option[StreamingCacheClient])`
   *
   * The batch check runs first, so a class assignable to both base types is built as a batch
   * connector. Failures raised by class loading, method lookup or the reflective call are not
   * caught here and propagate to the caller.
   *
   * @param sparkSession session handed to the connector's `apply`
   * @param ssc streaming context, only passed to streaming connectors
   * @param dcParam connector parameters; its config must provide the "class" entry
   * @param timestampStorage timestamp storage handed to the connector's `apply`
   * @param streamingCacheClientOpt optional cache client, only passed to streaming connectors
   * @return the connector instance returned by the reflective `apply` call
   * @throws ClassCastException if the configured class extends neither BatchDataConnector
   *                            nor StreamingDataConnector
   */
  private def getCustomConnector(
      sparkSession: SparkSession,
      ssc: StreamingContext,
      dcParam: DataConnectorParam,
      timestampStorage: TimestampStorage,
      streamingCacheClientOpt: Option[StreamingCacheClient]): DataConnector = {
    val connectorClassName = dcParam.getConfig("class").asInstanceOf[String]

    Class.forName(connectorClassName) match {
      case batchClass if classOf[BatchDataConnector].isAssignableFrom(batchClass) =>
        val batchSignature = Seq[Class[_]](
          classOf[SparkSession],
          classOf[DataConnectorParam],
          classOf[TimestampStorage])
        val factory = batchClass.getDeclaredMethod("apply", batchSignature: _*)
        // null receiver: the factory method is invoked without an instance
        val created = factory.invoke(null, sparkSession, dcParam, timestampStorage)
        created.asInstanceOf[BatchDataConnector]

      case streamingClass if classOf[StreamingDataConnector].isAssignableFrom(streamingClass) =>
        val streamingSignature = Seq[Class[_]](
          classOf[SparkSession],
          classOf[StreamingContext],
          classOf[DataConnectorParam],
          classOf[TimestampStorage],
          classOf[Option[StreamingCacheClient]])
        val factory = streamingClass.getDeclaredMethod("apply", streamingSignature: _*)
        // null receiver: the factory method is invoked without an instance
        val created =
          factory.invoke(null, sparkSession, ssc, dcParam, timestampStorage, streamingCacheClientOpt)
        created.asInstanceOf[StreamingDataConnector]

      case _ =>
        throw new ClassCastException(
          s"$connectorClassName should extend BatchDataConnector or StreamingDataConnector")
    }
  }