in streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/source/KafkaSource.scala [46:161]
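/**
 * Creates a FlinkKafkaConsumer from StreamPark-style configuration.
 *
 * Topics can be supplied either through the `topic` argument (a String, an Array[String] or a
 * List[String]) or through the properties under the kafka source prefix; a regex subscription
 * pattern is supported as an alternative to an explicit topic list. The start position may be
 * given as a global timestamp or as per-partition offsets, but not both.
 *
 * A minimal usage sketch (assuming this method is exposed on the KafkaSource companion object
 * and that `deserializer` is a KafkaDeserializationSchema[String]):
 * {{{
 *   val prop = new Properties()
 *   prop.put("bootstrap.servers", "localhost:9092")
 *   prop.put("group.id", "my-group")
 *   val source = KafkaSource.getSource[String](ctx, prop, "my-topic", "", deserializer, null)
 * }}}
 */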
def getSource[T: TypeInformation](
ctx: StreamingContext,
property: Properties = new Properties(),
topic: io.Serializable,
alias: String,
deserializer: KafkaDeserializationSchema[T],
strategy: WatermarkStrategy[KafkaRecord[T]]): FlinkKafkaConsumer[KafkaRecord[T]] = {
val prop = ConfigUtils.getConf(ctx.parameter.toMap, KAFKA_SOURCE_PREFIX + alias)
Utils.copyProperties(property, prop)
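// merge the explicitly passed properties with the per-alias properties resolved from the job configuration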
require(
prop != null && prop.nonEmpty && prop.exists(
x => x._1 == KEY_KAFKA_TOPIC || x._1 == KEY_KAFKA_PATTERN),
s"[StreamPark] either $KEY_KAFKA_TOPIC or $KEY_KAFKA_PATTERN must be defined")
// start.from parameter...
val timestamp = Try(Some(prop(s"$KEY_KAFKA_START_FROM.$KEY_KAFKA_START_FROM_TIMESTAMP").toLong))
.getOrElse(None)
val startFrom = StartFrom.startForm(prop)
require(
!(timestamp.nonEmpty && startFrom != null),
s"[StreamPark] start.form timestamp and offset cannot be defined at the same time")
// topic parameter
val topicOpt = Try(Some(prop.remove(KEY_KAFKA_TOPIC).toString)).getOrElse(None)
val regexOpt = Try(Some(prop.remove(KEY_KAFKA_PATTERN).toString)).getOrElse(None)
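// topics may be configured in either of two mutually exclusive styles (key names here are a
// sketch, assuming KAFKA_SOURCE_PREFIX resolves to "kafka.source." and an empty alias):
//   kafka.source.topic   = topic-a,topic-b   // explicit topic list
//   kafka.source.pattern = topic-.*          // regex subscription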
val kfkDeserializer = new KafkaDeserializer[T](deserializer)
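// wrap the user deserializer so elements are emitted as KafkaRecord[T] carrying their Kafka metadata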
val consumer = (topicOpt, regexOpt) match {
case (Some(_), Some(_)) =>
throw new IllegalArgumentException(
"[StreamPark] topic and regex cannot be defined at the same time")
case (Some(top), _) =>
val topics = top.split("[,\\s]+")
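// the topic argument, if provided, overrides the configured topic list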
val topicList = topic match {
case null => topics.toList
case x: String => List(x)
case x: Array[String] => x.toList
case x: List[String] => x
case _ =>
throw new IllegalArgumentException(
"[StreamPark] topic type must be String (single topic) or List[String] (multiple topics)")
}
new FlinkKafkaConsumer(topicList, kfkDeserializer, prop)
case (_, Some(reg)) =>
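// likewise, an explicit String argument overrides the configured pattern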
val pattern: Pattern = topic match {
case null => reg.r.pattern
case x: String => x.r.pattern
case _ =>
throw new IllegalArgumentException(
"[StreamPark] subscriptionPattern type must be String(regex)")
}
new FlinkKafkaConsumer(pattern, kfkDeserializer, prop)
case _ =>
// unreachable: the require above guarantees that topic or pattern is defined
throw new IllegalArgumentException(
s"[StreamPark] either $KEY_KAFKA_TOPIC or $KEY_KAFKA_PATTERN must be defined")
}
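// offsets must be tracked for restarts: prefer committing them on Flink checkpoints, otherwise
// fall back to kafka's own auto-commit; if neither is available, fail fast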
val autoCommit = prop.getOrElse(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true").toBoolean
(ctx.getCheckpointConfig.isCheckpointingEnabled, autoCommit) match {
case (true, _) => consumer.setCommitOffsetsOnCheckpoints(true)
case (_, false) =>
throw new IllegalArgumentException(
"[StreamPark] error: Flink checkpointing is disabled and kafka enable.auto.commit is false. Enable checkpointing or enable kafka auto-commit...")
case _ =>
}
if (strategy != null) {
// the consumer already carries KafkaRecord[T] elements, so the strategy can be applied directly
consumer.assignTimestampsAndWatermarks(strategy)
}
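// a minimal strategy sketch (assumes KafkaRecord exposes the record timestamp):
//   val strategy = WatermarkStrategy
//     .forBoundedOutOfOrderness[KafkaRecord[String]](Duration.ofSeconds(5))
//     .withTimestampAssigner(new SerializableTimestampAssigner[KafkaRecord[String]] {
//       override def extractTimestamp(r: KafkaRecord[String], ts: Long): Long = r.timestamp
//     })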
(timestamp, startFrom) match {
// A globally set timestamp takes effect for all topics.
case (Some(t), _) => consumer.setStartFromTimestamp(t)
// Otherwise apply the offsets configured per topic and partition.
case _ if startFrom != null =>
val startFroms = (topicOpt, regexOpt) match {
case (Some(top), _) =>
topic match {
case null => startFrom.toList
case x: String => startFrom.filter(_.topic == x).toList
case x: Array[_] =>
// topic is non-null in this branch, so the topics always come from the argument
startFrom.filter(s => x.toList.contains(s.topic)).toList
case x: List[_] =>
startFrom.filter(s => x.contains(s.topic)).toList
case _ => List.empty[StartFrom]
}
case (_, Some(reg)) =>
topic match {
case null => startFrom.filter(s => reg.r.findFirstIn(s.topic).nonEmpty).toList
case x: String => startFrom.filter(s => x.r.findFirstIn(s.topic).nonEmpty).toList
case _ => List.empty[StartFrom]
}
case _ => List.empty[StartFrom]
}
// collect the explicit (topic, partition) -> offset pairs into the map Flink expects
val startOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
startFroms
.filter(x => x != null && x.partitionOffset != null)
.foreach(
start => {
start.partitionOffset.foreach(
x => startOffsets.put(new KafkaTopicPartition(start.topic, x._1), x._2))
})
if (startOffsets.nonEmpty) {
consumer.setStartFromSpecificOffsets(startOffsets)
}
// neither a timestamp nor explicit offsets were configured: keep the default startup mode
case _ =>
}
consumer
}