def getSource[T: TypeInformation]()

in streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/source/KafkaSource.scala [46:161]
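
In outline, the method merges the explicitly passed Kafka properties with those read from the application configuration, resolves the subscription (a topic list or a regex pattern), ties offset committing to Flink checkpointing, optionally applies a watermark strategy, and finally sets the configured start position (a global timestamp or per-partition offsets).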


  def getSource[T: TypeInformation](
      ctx: StreamingContext,
      property: Properties = new Properties(),
      topic: io.Serializable,
      alias: String,
      deserializer: KafkaDeserializationSchema[T],
      strategy: WatermarkStrategy[KafkaRecord[T]]): FlinkKafkaConsumer[KafkaRecord[T]] = {

    val prop = ConfigUtils.getConf(ctx.parameter.toMap, KAFKA_SOURCE_PREFIX + alias)
    Utils.copyProperties(property, prop)
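    // prop now holds the connector configuration (keys under KAFKA_SOURCE_PREFIX + alias)
    // merged with the explicitly passed Properties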
    require(
      prop != null && prop.nonEmpty && prop.exists(
        x => x._1 == KEY_KAFKA_TOPIC || x._1 == KEY_KAFKA_PATTERN),
      s"[StreamPark] kafka source must be configured with either $KEY_KAFKA_TOPIC or $KEY_KAFKA_PATTERN")

    // start.from parameter...
    val timestamp =
      Try(prop(s"$KEY_KAFKA_START_FROM.$KEY_KAFKA_START_FROM_TIMESTAMP").toLong).toOption
    val startFrom = StartFrom.startForm(prop)
    require(
      !(timestamp.nonEmpty && startFrom != null),
      "[StreamPark] start.from timestamp and offset cannot be defined at the same time")

    // topic parameter
    val topicOpt = Option(prop.remove(KEY_KAFKA_TOPIC)).map(_.toString)
    val regexOpt = Option(prop.remove(KEY_KAFKA_PATTERN)).map(_.toString)

    val kfkDeserializer = new KafkaDeserializer[T](deserializer)
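    // (wraps the user-supplied schema so elements are emitted as KafkaRecord[T])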

    val consumer = (topicOpt, regexOpt) match {
      case (Some(_), Some(_)) =>
        throw new IllegalArgumentException(
          "[StreamPark] topic and regex cannot be defined at the same time")
      case (Some(top), _) =>
        val topics = top.split(",|\\s+")
        val topicList = topic match {
          case null => topics.toList
          case x: String => List(x)
          case x: Array[String] => x.toList
          case x: List[String] => x
          case _ =>
            throw new IllegalArgumentException(
              "[StreamPark] topic type must be String (single topic), Array[String] or List[String] (multiple topics)")
        }
        new FlinkKafkaConsumer(topicList, kfkDeserializer, prop)
      case (_, Some(reg)) =>
        val pattern: Pattern = topic match {
          case null => reg.r.pattern
          case x: String => x.r.pattern
          case _ =>
            throw new IllegalArgumentException(
              "[StreamPark] subscriptionPattern type must be String(regex)")
        }
        new FlinkKafkaConsumer(pattern, kfkDeserializer, prop)
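      // unreachable: the require above guarantees that topic or pattern is configured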
      case _ => null
    }

    val autoCommit = prop.getOrElse(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true").toBoolean
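    // With checkpointing enabled, commit offsets on checkpoints; otherwise Kafka's own
    // auto-commit must be on, or consumed offsets would never be committed.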
    (ctx.getCheckpointConfig.isCheckpointingEnabled, autoCommit) match {
      case (true, _) => consumer.setCommitOffsetsOnCheckpoints(true)
      case (_, false) =>
        throw new IllegalArgumentException(
          "[StreamPark] error: Flink checkpointing is disabled and Kafka enable.auto.commit is false. Enable checkpointing or Kafka auto-commit.")
      case _ =>
    }

    if (strategy != null) {
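      // resolve assignTimestampsAndWatermarks(WatermarkStrategy) reflectively
      // (FlinkKafkaConsumerBase also declares deprecated assigner overloads)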
      val assignMethod =
        consumer.getClass.getMethod("assignTimestampsAndWatermarks", classOf[WatermarkStrategy[T]])
      assignMethod.setAccessible(true)
      assignMethod.invoke(consumer, strategy)
    }

    (timestamp, startFrom) match {
      // Globally set timestamp, effective for all topics.
      case (Some(t), _) => consumer.setStartFromTimestamp(t)
      // Neither a timestamp nor explicit offsets: keep the consumer's default start position.
      case (_, null) =>
      // Specify the offset for each topic and partition.
      case _ =>
        val startFroms = (topicOpt, regexOpt) match {
          case (Some(top), _) =>
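            // keep only the StartFrom entries whose topics are actually subscribed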
            topic match {
              case null => startFrom.toList
              case x: String => startFrom.filter(_.topic == x).toList
              case x: Array[_] =>
                startFrom.filter(s => x.contains(s.topic)).toList
              case x: List[_] =>
                startFrom.filter(s => x.contains(s.topic)).toList
              case _ => List.empty[StartFrom]
            }
          case (_, Some(reg)) =>
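            // with a pattern subscription, keep entries whose topic matches the regex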
            topic match {
              case null => startFrom.filter(s => reg.r.findFirstIn(s.topic).nonEmpty).toList
              case x: String => startFrom.filter(s => x.r.findFirstIn(s.topic).nonEmpty).toList
              case _ => List.empty[StartFrom]
            }
          case _ => List.empty[StartFrom]
        }

        // Collect the explicitly configured per-partition start offsets.
        val startOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
        startFroms
          .filter(x => x != null && x.partitionOffset != null)
          .foreach(
            start =>
              start.partitionOffset.foreach {
                case (partition, offset) =>
                  startOffsets.put(new KafkaTopicPartition(start.topic, partition), offset)
              })

        if (!startOffsets.isEmpty) {
          consumer.setStartFromSpecificOffsets(startOffsets)
        }
    }
    consumer
  }
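
For reference, a minimal usage sketch (not part of the source): it assumes getSource is reachable on the KafkaSource companion object, that KEY_KAFKA_TOPIC resolves to the property key "topic", and that the broker address, group id, and topic name below are illustrative placeholders.

  import java.nio.charset.StandardCharsets
  import java.util.Properties

  import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
  import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
  import org.apache.kafka.clients.consumer.ConsumerRecord

  implicit val stringInfo: TypeInformation[String] = BasicTypeInfo.STRING_TYPE_INFO

  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092") // illustrative broker
  props.put("group.id", "demo-group")              // illustrative group id
  props.put("topic", "demo-topic")                 // satisfies the topic/pattern require

  // A plain UTF-8 string deserializer implementing Flink's KafkaDeserializationSchema.
  val schema = new KafkaDeserializationSchema[String] {
    override def isEndOfStream(nextElement: String): Boolean = false
    override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): String =
      new String(record.value(), StandardCharsets.UTF_8)
    override def getProducedType: TypeInformation[String] = BasicTypeInfo.STRING_TYPE_INFO
  }

  // ctx is an existing StreamingContext; strategy = null skips watermark assignment.
  val consumer = KafkaSource.getSource[String](
    ctx,
    props,
    topic = "demo-topic",
    alias = "",
    deserializer = schema,
    strategy = null)

The returned FlinkKafkaConsumer[KafkaRecord[String]] can then be attached to the environment via addSource, given an implicit TypeInformation[KafkaRecord[String]] in scope.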