in quickstart-flink/quickstart-connector/src/main/scala/org/apache/streampark/flink/quickstart/connector/KafkaSinkApp.scala [64:80]
override def cancel(): Unit = this.isRunning = false
val random = new Random()
var index = 0
override def run(ctx: SourceFunction.SourceContext[Behavior]): Unit = {
val seq = Seq("view", "click", "search", "buy", "share")
while (isRunning && index <= 10000) {
index += 1
val user_id = random.nextInt(1000)
val item_id = random.nextInt(100)
val category_id = random.nextInt(20)
val behavior = seq(random.nextInt(5))
val order = Behavior(user_id.toString, item_id, category_id, behavior, System.currentTimeMillis())
ctx.collect(order)
}
}