in quickstart-flink/quickstart-connector/src/main/scala/org/apache/streampark/flink/quickstart/connector/HBaseSinkApp.scala [35:61]
/**
 * Builds the demo pipeline: reads `Entity` records from `MyDataSource` and attaches
 * two HBase writers to that same stream, both targeting the table "order".
 *
 * The implicit conversion declared inside this method turns each `Entity` into the
 * HBase mutations that the two writers consume.
 */
override def handle(): Unit = {
  val tableName = "order"
  val random = new Random()
  val source = context.addSource(new MyDataSource)

  // Conversion rule: one Entity becomes a single Put, wrapped in a one-element collection.
  implicit def entry2Put(entity: Entity): java.lang.Iterable[Mutation] = {
    // Row key = current nanoTime plus a random int in [0, 1000000).
    // NOTE(review): this sum is not guaranteed to be unique across records or
    // parallel subtasks — confirm that is acceptable for this quickstart.
    val rowKey = Bytes.toBytes(System.nanoTime() + random.nextInt(1000000))
    val family = Bytes.toBytes("cf")

    // Qualifier -> encoded cell value, in the order the columns are added.
    val cells: Seq[(String, Array[Byte])] = Seq(
      "cid" -> Bytes.toBytes(entity.cityId),
      "oid" -> Bytes.toBytes(entity.orderId),
      "os" -> Bytes.toBytes(entity.orderStatus),
      "oq" -> Bytes.toBytes(entity.quantity),
      "sid" -> Bytes.toBytes(entity.siteId)
    )

    // entity.timestamp is handed to the Put constructor as its timestamp argument.
    val mutation = new Put(rowKey, entity.timestamp)
    cells.foreach { case (qualifier, value) =>
      mutation.addColumn(family, Bytes.toBytes(qualifier), value)
    }
    Collections.singleton(mutation)
  }

  // source ===> trans ===> sink

  // Approach 1: write through the StreamPark HBaseSink.
  HBaseSink().sink[Entity](source, tableName)

  // Approach 2: write through an HBaseOutputFormat, configured with the HBase
  // settings extracted from the job parameters.
  source.writeUsingOutputFormat(
    new HBaseOutputFormat[Entity](
      tableName,
      ConfigUtils.getHBaseConfig(context.parameter.toMap)
    )
  )
}