in spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/datasources/AvroSource.scala [102:167]
/**
 * Runs the Avro data source example end to end against HBase.
 *
 * Steps:
 *  1. Writes `recordCount` `AvroHBaseRecord`s through the `catalog` mapping (creating the table).
 *  2. Reads them back through the Avro-aware `avroCatalog` mapping and checks the
 *     `favorite_array` of the record keyed `name001`.
 *  3. Copies that DataFrame into the table described by `avroCatalogInsert`, reads the copy
 *     back through that same catalog and verifies its row count.
 *  4. Shows a couple of filtered projections.
 *
 * The SparkContext is always stopped, even when a validation fails.
 *
 * @param args command-line arguments (unused)
 * @throws UserCustomizedSampleException if a read-back value or the copied row count does
 *                                       not match what was written
 */
def main(args: Array[String]): Unit = {
  val sparkConf = new SparkConf().setAppName("AvroSourceExample")
  val sc = new SparkContext(sparkConf)
  try {
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Loads the HBase table described by the catalog `cat`, decoding Avro-typed columns
    // with AvroHBaseRecord.schemaString.
    def withCatalog(cat: String): DataFrame = {
      sqlContext.read
        .options(
          Map(
            "avroSchema" -> AvroHBaseRecord.schemaString,
            HBaseTableCatalog.tableCatalog -> cat))
        .format("org.apache.hadoop.hbase.spark")
        .load()
    }

    // Number of sample records written; also the expected size of the copied table.
    val recordCount = 256
    val data = (0 until recordCount).map { i => AvroHBaseRecord(i) }
    sc.parallelize(data)
      .toDF
      .write
      .options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
      .format("org.apache.hadoop.hbase.spark")
      .save()

    // Read back through the Avro-aware catalog so that col1's Avro fields
    // (e.g. col1.favorite_array) can be selected below.
    val df = withCatalog(avroCatalog)
    df.show()
    df.printSchema()
    df.createOrReplaceTempView("ExampleAvrotable")
    val c = sqlContext.sql("select count(1) from ExampleAvrotable")
    c.show()

    val filtered = df.select($"col0", $"col1.favorite_array").where($"col0" === "name001")
    filtered.show()
    // An empty result, a null array or a too-short array is reported as a validation
    // failure instead of surfacing as an index exception.
    val favoriteArray: Seq[String] = filtered.collect().headOption
      .flatMap(row => Option(row.getSeq[String](1)))
      .getOrElse(Seq.empty)
    if (favoriteArray.take(2) != Seq("number1", "number2")) {
      throw new UserCustomizedSampleException("value invalid")
    }

    // Copy the decoded rows into a second table, then read that table (not the source one)
    // to verify that every row was inserted.
    df.write
      .options(
        Map(
          "avroSchema" -> AvroHBaseRecord.schemaString,
          HBaseTableCatalog.tableCatalog -> avroCatalogInsert,
          HBaseTableCatalog.newTable -> "5"))
      .format("org.apache.hadoop.hbase.spark")
      .save()
    val newDF = withCatalog(avroCatalogInsert)
    newDF.show()
    newDF.printSchema()
    if (newDF.count() != recordCount) {
      throw new UserCustomizedSampleException("value invalid")
    }

    // Filters on fields nested inside the Avro-decoded col1 struct.
    df.filter($"col1.name" === "name005" || $"col1.name" <= "name005")
      .select("col0", "col1.favorite_color", "col1.favorite_number")
      .show()
    df.filter($"col1.name" <= "name005" || $"col1.name".contains("name007"))
      .select("col0", "col1.favorite_color", "col1.favorite_number")
      .show()
  } finally {
    sc.stop()
  }
}