override def insert()

in spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala [211:257]

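The InsertableRelation.insert implementation of the relation defined in DefaultSource.scala: every DataFrame row is converted into an HBase Put whose row key is the concatenation of the encoded catalog row key columns, each remaining column is written as a cell under its mapped column family and qualifier, and the resulting RDD of Puts is saved through TableOutputFormat. The overwrite flag is not consulted in this body.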

  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
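    // Set up a Hadoop JobConf that routes the output through TableOutputFormat to the target table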
    val jobConfig: JobConf = new JobConf(hbaseConf, this.getClass)
    jobConfig.setOutputFormat(classOf[TableOutputFormat])
    jobConfig.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    var count = 0 // incremented only inside the executor-side closure below; the driver-side value is never read
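    // Resolve each catalog-defined row key column to its index in the DataFrame schema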
    val rkFields = catalog.getRowKey
    val rkIdxedFields = rkFields.map(x => (schema.fieldIndex(x.colName), x))
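    // All remaining (non-row-key) fields are written as cells under their catalog-mapped column family/qualifier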
    val colsIdxedFields = schema.fieldNames
      .partition(x => rkFields.map(_.colName).contains(x))
      ._2
      .map(x => (schema.fieldIndex(x), catalog.getField(x)))
    val rdd = data.rdd
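    // Build an HBase Put for one Row: the row key is the concatenation of the encoded row key columns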
    def convertToPut(row: Row) = {
      // construct bytes for row key
      val rowBytes = rkIdxedFields.map {
        case (x, y) =>
          Utils.toBytes(row(x), y)
      }
      val rLen = rowBytes.foldLeft(0) {
        case (x, y) =>
          x + y.length
      }
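      // Concatenate the per-column byte arrays into a single row key buffer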
      val rBytes = new Array[Byte](rLen)
      var offset = 0
      rowBytes.foreach {
        x =>
          System.arraycopy(x, 0, rBytes, offset, x.length)
          offset += x.length
      }
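      // Honor the relation's explicit write timestamp when one was configured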
      val put = timestamp.fold(new Put(rBytes))(new Put(rBytes, _))

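      // Add one cell per non-null, non-row-key column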
      colsIdxedFields.foreach {
        case (x, y) =>
          val r = row(x)
          if (r != null) {
            val b = Utils.toBytes(r, y)
            put.addColumn(Bytes.toBytes(y.cf), Bytes.toBytes(y.col), b)
          }
      }
      count += 1
      (new ImmutableBytesWritable, put)
    }
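    // Convert every row and write the (ImmutableBytesWritable, Put) pairs through the Hadoop OutputFormat API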
    rdd.map(convertToPut(_)).saveAsHadoopDataset(jobConfig)
  }
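
For orientation, a minimal sketch of how this insert path is typically reached from user code, assuming the connector's registered format string org.apache.hadoop.hbase.spark and the HBaseTableCatalog option keys; the catalog JSON, the table name person, the column family cf1, and the field names are illustrative only, not taken from the code above.

import org.apache.spark.sql.SparkSession
import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog

object InsertSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("hbase-insert-sketch").getOrCreate()
    import spark.implicits._

    // Hypothetical catalog: table "person", row key "id", remaining columns in family "cf1".
    val catalog =
      """{
        |  "table": {"namespace": "default", "name": "person"},
        |  "rowkey": "id",
        |  "columns": {
        |    "id":   {"cf": "rowkey", "col": "id",   "type": "string"},
        |    "name": {"cf": "cf1",    "col": "name", "type": "string"},
        |    "age":  {"cf": "cf1",    "col": "age",  "type": "int"}
        |  }
        |}""".stripMargin

    val df = Seq(("row1", "alice", 30), ("row2", "bob", 25)).toDF("id", "name", "age")

    // Writing through the connector invokes insert() above: each Row becomes a Put
    // keyed by the encoded "id" value; "name" and "age" become cells in cf1.
    df.write
      .options(Map(
        HBaseTableCatalog.tableCatalog -> catalog,
        // optional: lets the connector create the table (here with 5 regions) if it is missing
        HBaseTableCatalog.newTable -> "5"))
      .format("org.apache.hadoop.hbase.spark")
      .save()

    spark.stop()
  }
}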