override def handle()

in quickstart-flink/quickstart-datastream/datastream_1.16/src/main/scala/org/apache/streampark/flink/quickstart/datastream/QuickStartApp.scala [39:51]


  /**
   * Wires up the job: reads records from Kafka, converts each record value to a `User`
   * via `JsonUtils.read`, keeps only users whose `age` is below 30, and hands the
   * resulting stream to the JDBC sink, which receives one `insert` statement per user.
   */
  override def handle(): Unit = {
    // 1) Read from Kafka, parse each record value into a User, and keep age < 30.
    val source = KafkaSource()
      .getDataStream[String]()
      .map(x => JsonUtils.read[User](x.value))
      .filter(_.age < 30)

    // 2) Write to MySQL.
    JdbcSink().sink[User](source)(user => {
      // `name` originates from the Kafka payload and is embedded in a quoted SQL literal,
      // so it must be escaped: an unescaped `'` would terminate the literal early and
      // allow broken or injected SQL. Backslashes are doubled first so that a trailing
      // `\` cannot swallow the closing quote.
      // NOTE(review): assumes MySQL's default sql_mode, where `\` is an escape character
      // inside string literals — confirm NO_BACKSLASH_ESCAPES is not enabled.
      // Interpolating first keeps the original rendering of a null name (the text "null").
      val safeName = s"${user.name}"
        .replace("\\", "\\\\")
        .replace("'", "''")
      s"insert into t_user(`name`,`age`) value('$safeName',${user.age})"
    })

  }