override def handle()

in quickstart-flink/quickstart-connector/src/main/scala/org/apache/streampark/flink/quickstart/connector/MongoSourceApp.scala [32:50]


  /**
   * Reads documents from the "shop" collection through [[MongoSource]] and prints them as
   * JSON strings.
   *
   * The query function is given the previously emitted record (`null` on the first call) and
   * uses that record's `updateTime` field as the lower bound (`$gte`) of the next query.
   */
  override def handle(): Unit = {
    implicit val prop: Properties = context.parameter.getProperties
    val source = MongoSource()
    source.getDataStream[String](
      "shop",
      (a, d) => {
        // Pause for one second before issuing each query.
        Thread.sleep(1000)
        // Take the offset from the previous record and use it as the condition for the next
        // query. A null previous record means this is the first query, so a default offset
        // has to be supplied.
        val offset =
          if (a == null) "2019-09-27 00:00:00"
          else {
            // Map#get yields an Option: unwrap it rather than stringifying the wrapper
            // (which would produce "Some(...)" / "None" and break the date parsing below).
            JsonUtils
              .read[Map[String, _]](a)
              .get("updateTime")
              .collect { case value if value != null => value.toString }
              .getOrElse(
                throw new IllegalStateException(
                  s"Previous record has no usable 'updateTime' field: $a"))
          }
        val cond = new BasicDBObject().append("updateTime", new BasicDBObject("$gte", DateUtils.parse(offset)))
        d.find(cond)
      },
      // Convert the query result into a list of JSON strings.
      // NOTE(review): the trailing null is passed through unchanged; its meaning is defined
      // by MongoSource.getDataStream and is not visible here — confirm.
      _.toList.map(_.toJson()), null
    ).print()
  }