override def cancel()

in quickstart-flink/quickstart-connector/src/main/scala/org/apache/streampark/flink/quickstart/connector/KafkaSinkApp.scala [64:80]


  /** Clears the running flag so the emit loop in `run` stops at its next condition check. */
  override def cancel(): Unit = {
    isRunning = false
  }

  /** Generator that `run` draws every randomized record value from. */
  val random: Random = new Random()

  /** Iteration counter advanced by `run` once per loop pass; starts at zero. */
  var index: Int = 0

  /**
   * Emits randomly generated `Behavior` records until the running flag is
   * cleared or the iteration counter passes 10000.
   *
   * The values handed to `Behavior`, in order, are: a random integer in
   * [0, 1000) rendered as a string, a random integer in [0, 100), a random
   * integer in [0, 20), one of the five behavior names, and the current
   * `System.currentTimeMillis()` reading.
   *
   * @param ctx context through which each generated record is collected
   */
  override def run(ctx: SourceFunction.SourceContext[Behavior]): Unit = {
    val behaviors = Seq("view", "click", "search", "buy", "share")
    // The counter is bumped before emitting and the bound is inclusive, so a
    // counter that starts at 0 passes the check for values 0 through 10000.
    while (isRunning && index <= 10000) {
      index += 1
      // Draw order is kept fixed: user, item, category, then behavior name.
      val userId = random.nextInt(1000).toString
      val itemId = random.nextInt(100)
      val categoryId = random.nextInt(20)
      val action = behaviors(random.nextInt(behaviors.length))
      ctx.collect(Behavior(userId, itemId, categoryId, action, System.currentTimeMillis()))
    }
  }