in quickstart-flink/quickstart-connector/src/main/scala/org/apache/streampark/flink/quickstart/connector/SideOutApp.scala [35:57]
override def handle(): Unit = {
val source = context.addSource(new SideSource())
/**
* 侧输出流。。。
* 官方写法:设置侧输出流
*/
val side1 = source.process(new ProcessFunction[SideEntry, SideEntry] {
val tag = new OutputTag[SideEntry]("flink")
override def processElement(value: SideEntry, ctx: ProcessFunction[SideEntry, SideEntry]#Context, out: Collector[SideEntry]): Unit = {
if (value.userId < 100) {
ctx.output(tag, value)
} else {
out.collect(value)
}
}
})
//官方写法,获取侧输出流
side1.getSideOutput(new OutputTag[SideEntry]("flink")).print("flink:========>")
}