in measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala [61:88]
def getDataConnector(
sparkSession: SparkSession,
ssc: StreamingContext,
dcParam: DataConnectorParam,
tmstCache: TimestampStorage,
streamingCacheClientOpt: Option[StreamingCacheClient]): Try[DataConnector] = {
val conType = dcParam.getType
Try {
conType match {
case HiveRegex() => HiveBatchDataConnector(sparkSession, dcParam, tmstCache)
case AvroRegex() => AvroBatchDataConnector(sparkSession, dcParam, tmstCache)
case FileRegex() => FileBasedDataConnector(sparkSession, dcParam, tmstCache)
case TextDirRegex() => TextDirBatchDataConnector(sparkSession, dcParam, tmstCache)
case ElasticSearchRegex() => ElasticSearchDataConnector(sparkSession, dcParam, tmstCache)
case CustomRegex() =>
getCustomConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
case KafkaRegex() =>
getStreamingDataConnector(
sparkSession,
ssc,
dcParam,
tmstCache,
streamingCacheClientOpt)
case JDBCRegex() => JDBCBasedDataConnector(sparkSession, dcParam, tmstCache)
case _ => throw new Exception("connector creation error!")
}
}
}