def getDataConnector()

in measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala [61:88]


  def getDataConnector(
      sparkSession: SparkSession,
      ssc: StreamingContext,
      dcParam: DataConnectorParam,
      tmstCache: TimestampStorage,
      streamingCacheClientOpt: Option[StreamingCacheClient]): Try[DataConnector] = {
    val conType = dcParam.getType
    Try {
      conType match {
        case HiveRegex() => HiveBatchDataConnector(sparkSession, dcParam, tmstCache)
        case AvroRegex() => AvroBatchDataConnector(sparkSession, dcParam, tmstCache)
        case FileRegex() => FileBasedDataConnector(sparkSession, dcParam, tmstCache)
        case TextDirRegex() => TextDirBatchDataConnector(sparkSession, dcParam, tmstCache)
        case ElasticSearchRegex() => ElasticSearchDataConnector(sparkSession, dcParam, tmstCache)
        case CustomRegex() =>
          getCustomConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
        case KafkaRegex() =>
          getStreamingDataConnector(
            sparkSession,
            ssc,
            dcParam,
            tmstCache,
            streamingCacheClientOpt)
        case JDBCRegex() => JDBCBasedDataConnector(sparkSession, dcParam, tmstCache)
        case _ => throw new Exception("connector creation error!")
      }
    }
  }