def main()

in spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/example/datasources/AvroSource.scala [102:167]


  /**
   * Runs the Avro data source example end to end against the
   * `org.apache.hadoop.hbase.spark` data source.
   *
   * Steps, in order:
   *  1. builds 256 `AvroHBaseRecord` rows and writes them using `catalog`;
   *  2. reads them back using `avroCatalog`, prints the rows and schema, registers the
   *     result as the temp table `ExampleAvrotable` and prints its row count;
   *  3. selects the row whose `col0` is `name001` and verifies the first two entries of
   *     `col1.favorite_array`;
   *  4. writes the DataFrame out again using `avroCatalogInsert`, reads that table back
   *     and verifies it holds 256 rows;
   *  5. shows two filtered projections of the original DataFrame.
   *
   * @param args command-line arguments; not used
   * @throws UserCustomizedSampleException if any of the verifications fails
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("AvroSourceExample")
    val sc = new SparkContext(sparkConf)

    // Stop the context even when a verification below throws.
    try {
      val sqlContext = new SQLContext(sc)

      import sqlContext.implicits._

      // Loads the table described by `cat`, passing the Avro schema alongside it.
      def withCatalog(cat: String): DataFrame = {
        sqlContext.read
          .options(
            Map(
              "avroSchema" -> AvroHBaseRecord.schemaString,
              HBaseTableCatalog.tableCatalog -> cat))
          .format("org.apache.hadoop.hbase.spark")
          .load()
      }

      val data = (0 to 255).map { i => AvroHBaseRecord(i) }

      // NOTE(review): newTable -> "5" is presumably the region count used when the table
      // has to be created — confirm against HBaseTableCatalog.
      sc.parallelize(data)
        .toDF
        .write
        .options(Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
        .format("org.apache.hadoop.hbase.spark")
        .save()

      // Read with the Avro catalog: the nested `col1.*` columns used below need it.
      val df = withCatalog(avroCatalog)
      df.show()
      df.printSchema()
      df.registerTempTable("ExampleAvrotable")
      val c = sqlContext.sql("select count(1) from ExampleAvrotable")
      c.show()

      val filtered = df.select($"col0", $"col1.favorite_array").where($"col0" === "name001")
      filtered.show()
      // A missing row is reported as a verification failure rather than an index error.
      val favorites = filtered.collect().headOption.map(_.getSeq[String](1))
      if (!favorites.exists(_.take(2) == Seq("number1", "number2"))) {
        throw new UserCustomizedSampleException("value invalid")
      }

      df.write
        .options(
          Map(
            "avroSchema" -> AvroHBaseRecord.schemaString,
            HBaseTableCatalog.tableCatalog -> avroCatalogInsert,
            HBaseTableCatalog.newTable -> "5"))
        .format("org.apache.hadoop.hbase.spark")
        .save()
      // Reads the table that was just written, so the count check covers the insert.
      val newDF = withCatalog(avroCatalogInsert)
      newDF.show()
      newDF.printSchema()
      if (newDF.count() != 256) {
        throw new UserCustomizedSampleException("value invalid")
      }

      df.filter($"col1.name" === "name005" || $"col1.name" <= "name005")
        .select("col0", "col1.favorite_color", "col1.favorite_number")
        .show()

      df.filter($"col1.name" <= "name005" || $"col1.name".contains("name007"))
        .select("col0", "col1.favorite_color", "col1.favorite_number")
        .show()
    } finally {
      sc.stop()
    }
  }