override def cancel()

in quickstart-flink/quickstart-connector/src/main/scala/org/apache/streampark/flink/quickstart/connector/MyDataSource.scala [29:51]


  /**
   * Requests that the source stop producing records.
   *
   * Sets the `isRunning` flag to `false`; the loop in `run` checks that flag
   * before every iteration and exits once it is cleared.
   */
  override def cancel(): Unit = {
    isRunning = false
  }

  /** Pseudo-random generator shared by `run` when filling in entity fields. */
  val random: Random = new Random()

  /** Number of loop iterations `run` has started so far; begins at zero. */
  var index: Int = 0

  /**
   * Generates randomly populated [[Entity]] records and hands each one to `ctx`.
   *
   * The loop keeps going while `isRunning` is true and `index` is at most 1000001.
   * Because `index` is incremented at the top of every iteration and starts at 0,
   * a fresh instance emits up to 1,000,002 records. `index` is a member, so the
   * count carries over if `run` is invoked again on the same instance.
   *
   * @param ctx source context that receives every generated entity via `collect`
   */
  override def run(ctx: SourceFunction.SourceContext[Entity]): Unit = {
    while (isRunning && index <= 1000001) {
      index += 1

      val entity = new Entity()
      entity.userId = System.currentTimeMillis()
      entity.orderId = random.nextInt(100)
      // NOTE(review): nextInt(1) can only return 0, so orderStatus is always 0.
      // If both 0 and 1 were intended this should be nextInt(2) — confirm before changing.
      entity.orderStatus = random.nextInt(1)
      entity.price = random.nextDouble()
      // Reuse the shared generator rather than allocating a new Random per record.
      entity.quantity = random.nextInt(10)
      entity.cityId = 1
      entity.siteId = random.nextInt(20)
      entity.timestamp = System.currentTimeMillis()

      ctx.collect(entity)
    }
  }