in quickstart-flink/quickstart-datastream/datastream_1.12/src/main/scala/org/apache/streampark/flink/quickstart/datastream/QuickStartApp.scala [30:42]
/**
 * Builds the job pipeline: reads string records from the Kafka source, parses
 * each record value into a `User`, keeps users whose `age` is below 30 and
 * hands every remaining user to the JDBC sink as an `insert into t_user`
 * statement.
 *
 * The sink is driven by a plain SQL string, so bind parameters are not
 * available at this call site. Text columns are therefore escaped before they
 * are embedded in the statement; otherwise a value such as `O'Brien` would
 * produce malformed SQL, and a crafted Kafka message could inject SQL.
 */
override def handle(): Unit = {
  val source = KafkaSource()
    .getDataStream[String]()
    .map(x => JsonUtils.read[User](x.value))
    .filter(_.age < 30)

  JdbcSink().sink[User](source)(user => {
    // Renders a value as a single-quoted SQL string literal. Backslashes are
    // doubled first and single quotes second, so the escaping added for
    // quotes is not itself re-escaped.
    // NOTE(review): backslash doubling assumes MySQL's default sql_mode
    // (the backtick-quoted identifiers suggest MySQL) -- confirm that
    // NO_BACKSLASH_ESCAPES is not enabled on the target database.
    // Kept inside the lambda so the function shipped to the sink does not
    // capture anything from the enclosing scope.
    val sqlLiteral: Any => String = value => {
      val escaped = s"$value".replace("\\", "\\\\").replace("'", "''")
      s"'$escaped'"
    }

    // `age` and `gender` stay unquoted, exactly as before.
    // NOTE(review): presumably both are numeric fields of `User` -- verify.
    s"""
       |insert into t_user(`name`,`age`,`gender`,`address`)
       |value(${sqlLiteral(user.name)},${user.age},${user.gender},${sqlLiteral(user.address)})
       |""".stripMargin
  })
}