in quickstart-flink/quickstart-datastream/datastream_1.16/src/main/scala/org/apache/streampark/flink/quickstart/datastream/QuickStartApp.scala [39:51]
override def handle(): Unit = {
//1) 从 KAFKA 中读取数据,并过滤
val source = KafkaSource()
.getDataStream[String]()
.map(x => JsonUtils.read[User](x.value))
.filter(_.age < 30)
//2) 写入到 Mysql 中
JdbcSink().sink[User](source)(user => {
s"insert into t_user(`name`,`age`) value('${user.name}',${user.age})"
})
}