in quickstart-flink/quickstart-connector/src/main/scala/org/apache/streampark/flink/quickstart/connector/ES7SinkApp.scala [32:42]
override def handle(): Unit = {
val source: DataStream[Entity] = context.addSource(new MyDataSource)
implicit def indexedSeq(x: Entity): IndexRequest = ElasticsearchUtils.indexRequest(
"flink_order",
s"${x.orderId}_${x.timestamp}",
JsonMethods.mapper.writeValueAsString(x)
)
ES7Sink().sink[Entity](source)
}