private def getDataSource()

in measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala [42:79]


  private def getDataSource(
      sparkSession: SparkSession,
      ssc: StreamingContext,
      dataSourceParam: DataSourceParam,
      index: Int): Option[DataSource] = {
    val name = dataSourceParam.getName
    val timestampStorage = TimestampStorage()

    // for streaming data cache
    val streamingCacheClientOpt = StreamingCacheClientFactory.getClientOpt(
      sparkSession,
      dataSourceParam.getCheckpointOpt,
      name,
      index,
      timestampStorage)

    val connectorParamsOpt = dataSourceParam.getConnector

    connectorParamsOpt match {
      case Some(connectorParam) =>
        val dataConnectors = DataConnectorFactory.getDataConnector(
          sparkSession,
          ssc,
          connectorParam,
          timestampStorage,
          streamingCacheClientOpt) match {
          case Success(connector) => Some(connector)
          case Failure(exception) =>
            error("Error initializing data source with name '$name'.", exception)
            throw exception
        }

        Some(DataSource(name, dataSourceParam, dataConnectors, streamingCacheClientOpt))
      case None =>
        error(s"Connector for data source with name '$name' is invalid.")
        throw new IllegalStateException()
    }
  }