override def handle()

in quickstart-flink/quickstart-datastream/datastream_1.14/src/main/scala/org/apache/streampark/flink/quickstart/datastream/QuickStartApp.scala [54:66]


  /**
   * Builds the job pipeline: reads records from Kafka, decodes each record value into a
   * `User` via `JsonUtils.read`, keeps users whose `age` is below 30 and writes every
   * remaining user to the `t_user` table through `JdbcSink`.
   *
   * The sink is given a function that renders one INSERT statement per user. Because the
   * statement is plain SQL text and the field values originate from Kafka messages, the
   * text columns are escaped before being embedded in single-quoted literals.
   */
  override def handle(): Unit = {
    val source = KafkaSource()
      .getDataStream[String]()
      .map(x => JsonUtils.read[User](x.value))
      .filter(_.age < 30)

    JdbcSink().sink[User](source) { user =>
      // Escapes a value for use inside a single-quoted SQL string literal: backslashes are
      // doubled first, then single quotes, so a name such as O'Brien can neither break the
      // statement nor inject SQL. A null value still renders as the text "null", as before.
      // NOTE(review): backslash doubling assumes MySQL's default sql_mode (the backtick
      // quoting suggests MySQL) -- confirm NO_BACKSLASH_ESCAPES is not enabled on the target.
      def quoted(raw: Any): String =
        s"$raw".replace("\\", "\\\\").replace("'", "''")

      // `age` and `gender` are embedded unquoted, exactly as in the original statement.
      s"""
         |insert into t_user(`name`,`age`,`gender`,`address`)
         |value('${quoted(user.name)}',${user.age},${user.gender},'${quoted(user.address)}')
         |""".stripMargin
    }
  }