override def handle()

in quickstart-flink/quickstart-connector/src/main/scala/org/apache/streampark/flink/quickstart/connector/SideOutApp.scala [35:57]


  /**
   * Builds the side-output demo pipeline.
   *
   * Reads [[SideEntry]] records from a [[SideSource]], diverts every record whose
   * `userId` is below 100 to the side output tagged "flink", forwards all other
   * records on the main output, and prints the side output with the prefix
   * "flink:========>".
   */
  override def handle(): Unit = {
    val entries = context.addSource(new SideSource())

    // Official Flink approach for emitting a side output: write to an OutputTag
    // through the ProcessFunction context instead of the regular collector.
    val routed = entries.process(new ProcessFunction[SideEntry, SideEntry] {
      // Tag identifying the side output; the lookup below uses the same id.
      val lowIdTag = new OutputTag[SideEntry]("flink")

      override def processElement(
          value: SideEntry,
          ctx: ProcessFunction[SideEntry, SideEntry]#Context,
          out: Collector[SideEntry]): Unit = {
        val goesToSideOutput = value.userId < 100
        if (goesToSideOutput) ctx.output(lowIdTag, value) else out.collect(value)
      }
    })

    // Official Flink approach for reading a side output: look it up with a tag
    // that carries the same id and element type as the one used when emitting.
    val lowIdEntries = routed.getSideOutput(new OutputTag[SideEntry]("flink"))
    lowIdEntries.print("flink:========>")

  }