in src/main/scala/com/aliyun/emr/example/spark/streaming/SparkRocketMQDemo.scala [99:124]
/**
 * Publishes an endless stream of test messages to an ONS topic.
 *
 * Each message body is a freshly generated random UUID string, and each
 * message key has the form `ORDERID_<partitionId>_<count>`, where `count`
 * starts at 0 and increases by one per message sent by this call. The method
 * pauses 100 ms after every send.
 *
 * The loop has no exit condition, so this method never returns normally; it
 * only terminates when an exception propagates (for example a failed send, or
 * an `InterruptedException` raised while sleeping). In that case the producer
 * is shut down before the exception is rethrown to the caller.
 *
 * @param partitionId     index embedded in every message key
 * @param accessKeyId     value stored under `PropertyKeyConst.AccessKey`
 * @param accessKeySecret value stored under `PropertyKeyConst.SecretKey`
 * @param pId             value stored under `PropertyKeyConst.ProducerId`
 * @param topic           topic every message is addressed to
 * @param tag             tag attached to every message
 */
def generate(
    partitionId: Int,
    accessKeyId: String,
    accessKeySecret: String,
    pId: String,
    topic: String,
    tag: String): Unit = {
  val properties = new Properties()
  properties.put(PropertyKeyConst.ProducerId, pId)
  properties.put(PropertyKeyConst.AccessKey, accessKeyId)
  properties.put(PropertyKeyConst.SecretKey, accessKeySecret)

  val producer = new ONSFactoryImpl().createProducer(properties)
  // Start exactly once. The producer must not be shut down before it has been
  // started; shutdown belongs in the cleanup path below.
  producer.start()

  try {
    // Mutable counter kept local to the loop; it only feeds the message key.
    var count = 0
    while (true) {
      // Encode explicitly so the payload bytes do not depend on the JVM's
      // platform default charset.
      val payload =
        UUID.randomUUID().toString.getBytes(java.nio.charset.StandardCharsets.UTF_8)
      val msg = new Message(topic, tag, payload)
      msg.setKey(s"ORDERID_${partitionId}_$count")
      producer.send(msg)
      count += 1
      Thread.sleep(100L)
    }
  } finally {
    // Reached only when the loop is aborted by an exception; release the
    // producer so the client is not leaked.
    producer.shutdown()
  }
}