override def handle()

in quickstart-flink/quickstart-connector/src/main/scala/org/apache/streampark/flink/quickstart/connector/ES7SinkApp.scala [32:42]


  /**
   * Job body: creates a stream of `Entity` records from `MyDataSource` and passes it to
   * `ES7Sink`, with an implicit `Entity => IndexRequest` conversion declared in local scope.
   */
  override def handle(): Unit = {
    val entities: DataStream[Entity] = context.addSource(new MyDataSource)

    // Conversion from one record to the request built by ElasticsearchUtils.indexRequest.
    // NOTE(review): the three arguments are presumably (index name, document id, JSON source),
    // and ES7Sink.sink presumably resolves this conversion implicitly — confirm both against
    // their definitions.
    implicit def entityToIndexRequest(entity: Entity): IndexRequest = {
      val documentId = s"${entity.orderId}_${entity.timestamp}"
      val payload = JsonMethods.mapper.writeValueAsString(entity)
      ElasticsearchUtils.indexRequest("flink_order", documentId, payload)
    }

    ES7Sink().sink[Entity](entities)
  }