override def handle()

in quickstart-flink/quickstart-connector/src/main/scala/org/apache/streampark/flink/quickstart/connector/HBaseSinkApp.scala [35:61]


  override def handle(): Unit = {
    val source = context.addSource(new MyDataSource)
    val random = new Random()

    //定义转换规则...
    implicit def entry2Put(entity: Entity): java.lang.Iterable[Mutation] = {
      val put = new Put(Bytes.toBytes(System.nanoTime() + random.nextInt(1000000)), entity.timestamp)
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cid"), Bytes.toBytes(entity.cityId))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("oid"), Bytes.toBytes(entity.orderId))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("os"), Bytes.toBytes(entity.orderStatus))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("oq"), Bytes.toBytes(entity.quantity))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("sid"), Bytes.toBytes(entity.siteId))
      Collections.singleton(put)
    }
    //source ===> trans ===> sink

    //1)插入方式1
    HBaseSink().sink[Entity](source, "order")

    //2) 插入方式2
    //1.指定HBase 配置文件
    val prop = ConfigUtils.getHBaseConfig(context.parameter.toMap)
    //2.插入...
    source.writeUsingOutputFormat(new HBaseOutputFormat[Entity]("order", prop))


  }