in quickstart-flink/quickstart-connector/src/main/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala [27:36]
/**
 * Builds the job pipeline: obtains a stream of `Order` records from `JdbcSource` and calls
 * `print()` on it.
 *
 * The first function handed to `getDataStream` produces the SQL text from the `Order` it
 * receives; when that argument is `null`, a fixed timestamp is used as the lower bound instead.
 * The second function turns each result row into an `Order` using the row's "market_id" and
 * "timestamp" entries.
 */
override def handle(): Unit = {
  // Lower bound used when no previous Order is supplied to the SQL-building function.
  val initialOffset = "2020-10-10 23:00:00"

  val orders = JdbcSource().getDataStream[Order](
    previous => {
      // Option(null) is None, so a null `previous` falls back to the initial offset.
      val lastOffset = Option(previous).map(_.timestamp).getOrElse(initialOffset)
      s"select * from t_order where timestamp > '$lastOffset' order by timestamp asc "
    },
    rows => rows.map(row => new Order(row("market_id").toString, row("timestamp").toString)),
    null
  )

  orders.print()
}