def bulkLoadThinRows[T]()

in spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala [746:878]
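
Bulk loads an RDD into HBase as "thin" rows: mapFunction converts each record into a single (ByteArrayWrapper, FamiliesQualifiersValues) pair holding the row key together with every cell of that row. The pairs are repartitioned and sorted by region start key, and each partition then writes its rows directly into staged HFiles under stagingDir, one writer per column family.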


  def bulkLoadThinRows[T](rdd: RDD[T],
                  tableName: TableName,
                  mapFunction: (T) =>
                    (ByteArrayWrapper, FamiliesQualifiersValues),
                  stagingDir: String,
                  familyHFileWriteOptionsMap:
                  util.Map[Array[Byte], FamilyHFileWriteOptions] =
                  new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
                  compactionExclude: Boolean = false,
                  maxSize: Long = HConstants.DEFAULT_MAX_FILE_SIZE):
  Unit = {
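    //The staging directory must be new; refuse to write HFiles into a
    //directory that already exists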
    val stagingPath = new Path(stagingDir)
    val fs = stagingPath.getFileSystem(config)
    if (fs.exists(stagingPath)) {
      throw new FileAlreadyExistsException("Path " + stagingDir + " already exists")
    }
    val conn = HBaseConnectionCache.getConnection(config)
    try {
      val regionLocator = conn.getRegionLocator(tableName)
      val startKeys = regionLocator.getStartKeys
      if (startKeys.length == 0) {
        logInfo("Table " + tableName.toString + " was not found")
      }
      val defaultCompressionStr = config.get("hfile.compression",
        Compression.Algorithm.NONE.getName)
      val defaultCompression = HFileWriterImpl
        .compressionByName(defaultCompressionStr)
      val nowTimeStamp = System.currentTimeMillis()
      val tableRawName = tableName.getName

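      //Re-key the per-family write options with ByteArrayWrapper so lookups
      //compare the family bytes by value rather than by array reference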
      val familyHFileWriteOptionsMapInternal =
        new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]

      val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()

      while (entrySetIt.hasNext) {
        val entry = entrySetIt.next()
        familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
      }

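      //Partition by the table's region start keys so each task receives the
      //rows for exactly one region, already grouped for its HFiles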
      val regionSplitPartitioner =
        new BulkLoadPartitioner(startKeys)

      //This is where all the magic happens
      //Here we are going to do the following things
      // 1. Map every row in the RDD to a (rowKey, familiesQualifiersValues) tuple
      // 2. Then we are going to repartition, sort, and shuffle by region start key
      // 3. Finally we are going to write out our HFiles
      rdd.map(r => mapFunction(r)).
        repartitionAndSortWithinPartitions(regionSplitPartitioner).
        hbaseForeachPartition(this, (it, conn) => {

          val conf = broadcastedConf.value.value
          val fs = new Path(stagingDir).getFileSystem(conf)
          //One open HFile writer and running byte count per column family
          val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
          //previousRow tracks the last row key written so writer rolls only
          //happen on row boundaries
          var previousRow: Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
          var rollOverRequested = false
          val localTableName = TableName.valueOf(tableRawName)

          //Here is where we finally iterate through the data in this partition of the
          //RDD that has been sorted and partitioned
          it.foreach { case (rowKey: ByteArrayWrapper,
              familiesQualifiersValues: FamiliesQualifiersValues) =>

            //All the cells of a row arrive in a single tuple, so the same row
            //key appearing twice means the input contained a duplicate row
            if (Bytes.compareTo(previousRow, rowKey.value) == 0) {
              throw new KeyAlreadyExistsException("The following key was sent to the " +
                "HFile load more than once: " + Bytes.toString(previousRow))
            }

            //The family map is a tree map so the families will be sorted
            val familyIt = familiesQualifiersValues.familyMap.entrySet().iterator()
            while (familyIt.hasNext) {
              val familyEntry = familyIt.next()

              val family = familyEntry.getKey.value

              val qualifierIt = familyEntry.getValue.entrySet().iterator()

              //The qualifier map is a tree map so the qualifiers will be sorted
              while (qualifierIt.hasNext) {

                val qualifierEntry = qualifierIt.next()
                val qualifier = qualifierEntry.getKey
                val cellValue = qualifierEntry.getValue

                writeValueToHFile(rowKey.value,
                  family,
                  qualifier.value, // qualifier
                  cellValue, // value
                  nowTimeStamp,
                  fs,
                  conn,
                  localTableName,
                  conf,
                  familyHFileWriteOptionsMapInternal,
                  defaultCompression,
                  writerMap,
                  stagingDir)

                previousRow = rowKey.value
              }

              writerMap.values.foreach( wl => {
                rollOverRequested = rollOverRequested || wl.written > maxSize

                //This will only roll if we have at least one column family file that is
                //bigger than maxSize and we have finished a given row key
                if (rollOverRequested) {
                  rollWriters(fs, writerMap,
                    regionSplitPartitioner,
                    previousRow,
                    compactionExclude)
                  rollOverRequested = false
                }
              })
            }
          }

          //All the data for this partition has been written, so roll (flush
          //and close) every remaining writer
          rollWriters(fs, writerMap,
            regionSplitPartitioner,
            previousRow,
            compactionExclude)
          rollOverRequested = false
        })
    } finally {
      if (null != conn) conn.close()
    }
  }
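
For context, a minimal caller sketch (not part of the source): it assumes an existing SparkContext sc, an HBaseContext hbaseContext built over the same config, and a pre-created table with column family cf; the completion step assumes the classic org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles API. Each input record carries a row key plus all of that row's cells, and once the HFiles are staged they are handed off to the region servers.

  import org.apache.hadoop.fs.Path
  import org.apache.hadoop.hbase.TableName
  import org.apache.hadoop.hbase.client.ConnectionFactory
  import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
  import org.apache.hadoop.hbase.spark.{ByteArrayWrapper, FamiliesQualifiersValues}
  import org.apache.hadoop.hbase.util.Bytes

  //One record per row: (rowKey, all cells of that row) -- the "thin row" shape
  val rdd = sc.parallelize(Seq(
    ("row1", Seq(("cf", "a", "foo1"), ("cf", "b", "foo2"))),
    ("row2", Seq(("cf", "a", "bar1")))))

  val tableName = TableName.valueOf("thinRowTable")  //assumed, pre-created table
  val stagingDir = "/tmp/thinRowStaging"             //must not exist yet

  hbaseContext.bulkLoadThinRows[(String, Seq[(String, String, String)])](
    rdd,
    tableName,
    t => {
      //Pack every cell of the row into one FamiliesQualifiersValues object
      val familyQualifiersValues = new FamiliesQualifiersValues
      t._2.foreach { case (family, qualifier, value) =>
        familyQualifiersValues += (Bytes.toBytes(family),
          Bytes.toBytes(qualifier), Bytes.toBytes(value))
      }
      (new ByteArrayWrapper(Bytes.toBytes(t._1)), familyQualifiersValues)
    },
    stagingDir)

  //Hand the staged HFiles to the region servers to complete the load
  val conn = ConnectionFactory.createConnection(config)
  try {
    new LoadIncrementalHFiles(config).doBulkLoad(new Path(stagingDir),
      conn.getAdmin, conn.getTable(tableName), conn.getRegionLocator(tableName))
  } finally {
    conn.close()
  }

Note that the staging directory is consumed by doBulkLoad, and it must not exist before bulkLoadThinRows runs.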