in quickstart-flink/quickstart-connector/src/main/scala/org/apache/streampark/flink/quickstart/connector/HBaseSourceApp.scala [35:71]
override def handle(): Unit = {
implicit val conf = ConfigUtils.getHBaseConfig(context.parameter.toMap)
val id = HBaseSource().getDataStream[String](query => {
Thread.sleep(10)
if (query == null) {
new HBaseQuery("person", new Scan())
} else {
//TODO 从上一条记录中获取便宜量,决定下次查询的条件...
new HBaseQuery("person", new Scan())
}
}, r => new String(r.getRow), null)
HBaseRequest(id).requestOrdered(x => {
new HBaseQuery("person", new Get(x.getBytes()))
}, (a, r) => {
val map = new util.HashMap[String, String]()
val cellScanner = r.cellScanner()
while (cellScanner.advance()) {
val cell = cellScanner.current()
val q = Bytes.toString(CellUtil.cloneQualifier(cell))
val (name, v) = q.split("_") match {
case Array(_type, name) =>
_type match {
case "i" => name -> Bytes.toInt(CellUtil.cloneValue(cell))
case "s" => name -> Bytes.toString(CellUtil.cloneValue(cell))
case "d" => name -> Bytes.toDouble(CellUtil.cloneValue(cell))
case "f" => name -> Bytes.toFloat(CellUtil.cloneValue(cell))
}
case _ =>
}
map.put(name.toString, v.toString)
}
map.toString
}).print("Async")
}