in quickstart-flink/quickstart-connector/src/main/scala/org/apache/streampark/flink/quickstart/connector/MongoSourceApp.scala [32:50]
override def handle(): Unit = {
implicit val prop: Properties = context.parameter.getProperties
val source = MongoSource()
source.getDataStream[String](
"shop",
(a, d) => {
Thread.sleep(1000)
/**
* 从上一条记录提前offset数据,作为下一条数据查询的条件,如果offset为Null,则表明是第一次查询,需要指定默认offset
*/
val offset = if (a == null) "2019-09-27 00:00:00" else {
JsonUtils.read[Map[String, _]](a).get("updateTime").toString
}
val cond = new BasicDBObject().append("updateTime", new BasicDBObject("$gte", DateUtils.parse(offset)))
d.find(cond)
},
_.toList.map(_.toJson()), null
).print()
}