def bulkLoadThinRows[T]()

in spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala [737:870]


  def bulkLoadThinRows[T](
      rdd: RDD[T],
      tableName: TableName,
      mapFunction: (T) => (ByteArrayWrapper, FamiliesQualifiersValues),
      stagingDir: String,
      familyHFileWriteOptionsMap: util.Map[Array[Byte], FamilyHFileWriteOptions] =
        new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
      compactionExclude: Boolean = false,
      maxSize: Long = HConstants.DEFAULT_MAX_FILE_SIZE): Unit = {
    val stagingPath = new Path(stagingDir)
    val fs = stagingPath.getFileSystem(config)
    if (fs.exists(stagingPath)) {
      throw new FileAlreadyExistsException("Path " + stagingDir + " already exists")
    }
    val conn = HBaseConnectionCache.getConnection(config)
    try {
      val regionLocator = conn.getRegionLocator(tableName)
      val startKeys = regionLocator.getStartKeys
      if (startKeys.length == 0) {
        logInfo("Table " + tableName.toString + " was not found")
      }
      val defaultCompressionStr =
        config.get("hfile.compression", Compression.Algorithm.NONE.getName)
      val defaultCompression = HFileWriterImpl
        .compressionByName(defaultCompressionStr)
      val nowTimeStamp = System.currentTimeMillis()
      val tableRawName = tableName.getName

      val familyHFileWriteOptionsMapInternal =
        new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]

      val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()

      while (entrySetIt.hasNext) {
        val entry = entrySetIt.next()
        familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
      }

      val regionSplitPartitioner =
        new BulkLoadPartitioner(startKeys)

      // This is where all the magic happens
      // Here we are going to do the following things
      // 1. Map every row in the RDD into a (row key, families/qualifiers/values) tuple
      // 2. Then we are going to repartition, sort within partitions, and shuffle
      // 3. Finally we are going to write out our HFiles
      rdd
        .map(r => mapFunction(r))
        .repartitionAndSortWithinPartitions(regionSplitPartitioner)
        .hbaseForeachPartition(
          this,
          (it, conn) => {

            val conf = broadcastedConf.value.value
            val fs = new Path(stagingDir).getFileSystem(conf)
            val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
            var previousRow: Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
            var rollOverRequested = false
            val localTableName = TableName.valueOf(tableRawName)

            // Here is where we finally iterate through the data in this partition of the
            // RDD that has been sorted and partitioned
            it.foreach {
              case (rowKey: ByteArrayWrapper, familiesQualifiersValues: FamiliesQualifiersValues) =>
                if (Bytes.compareTo(previousRow, rowKey.value) == 0) {
                  throw new KeyAlreadyExistsException(
                    "The following key was sent to the " +
                      "HFile load more then one: " + Bytes.toString(previousRow))
                }

                // The family map is a tree map so the families will be sorted
                val familyIt = familiesQualifiersValues.familyMap.entrySet().iterator()
                while (familyIt.hasNext) {
                  val familyEntry = familyIt.next()

                  val family = familyEntry.getKey.value

                  val qualifierIt = familyEntry.getValue.entrySet().iterator()

                  // The qualifier map is a tree map so the qualifiers will be sorted
                  while (qualifierIt.hasNext) {

                    val qualifierEntry = qualifierIt.next()
                    val qualifier = qualifierEntry.getKey
                    val cellValue = qualifierEntry.getValue

                    writeValueToHFile(
                      rowKey.value,
                      family,
                      qualifier.value, // qualifier
                      cellValue, // value
                      nowTimeStamp,
                      fs,
                      conn,
                      localTableName,
                      conf,
                      familyHFileWriteOptionsMapInternal,
                      defaultCompression,
                      writerMap,
                      stagingDir)

                    previousRow = rowKey.value
                  }

                  writerMap.values.foreach(
                    wl => {
                      rollOverRequested = rollOverRequested || wl.written > maxSize

                      // This will only roll if we have at least one column family file that is
                      // bigger than maxSize and we have finished a given row key
                      if (rollOverRequested) {
                        rollWriters(
                          fs,
                          writerMap,
                          regionSplitPartitioner,
                          previousRow,
                          compactionExclude)
                        rollOverRequested = false
                      }
                    })
                }
            }

            // We have finished all the data in this partition, so let's close up
            // the remaining writers with one final roll
            rollWriters(fs, writerMap, regionSplitPartitioner, previousRow, compactionExclude)
            rollOverRequested = false
          })
    } finally {
      if (null != conn) conn.close()
    }
  }
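
Usage sketch (not from the source; a minimal example assuming a local SparkContext, an existing table named "thinRowsTable" with column family "cf", and a writable staging path -- all of these names are placeholders). It shows how the mapFunction collapses every cell of a record into a single FamiliesQualifiersValues per row key, which is the "thin row" contract this method expects:

  import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
  import org.apache.hadoop.hbase.spark.{ByteArrayWrapper, FamiliesQualifiersValues, HBaseContext}
  import org.apache.hadoop.hbase.util.Bytes
  import org.apache.spark.SparkContext

  val sc = new SparkContext("local", "bulkLoadThinRowsExample")
  val config = HBaseConfiguration.create()
  val hbaseContext = new HBaseContext(sc, config)

  // Hypothetical input: one record per row, carrying the row key plus its
  // (family, qualifier, value) cells.
  val cf = Bytes.toBytes("cf")
  val rdd = sc.parallelize(Seq(
    ("row1", Seq((cf, Bytes.toBytes("a"), Bytes.toBytes("v1")),
                 (cf, Bytes.toBytes("b"), Bytes.toBytes("v2")))),
    ("row2", Seq((cf, Bytes.toBytes("a"), Bytes.toBytes("v3"))))))

  hbaseContext.bulkLoadThinRows[(String, Seq[(Array[Byte], Array[Byte], Array[Byte])])](
    rdd,
    TableName.valueOf("thinRowsTable"),
    t => {
      // Build one FamiliesQualifiersValues for the whole row and pair it
      // with the wrapped row key.
      val familiesQualifiersValues = new FamiliesQualifiersValues
      t._2.foreach { case (family, qualifier, value) =>
        familiesQualifiersValues += (family, qualifier, value)
      }
      (new ByteArrayWrapper(Bytes.toBytes(t._1)), familiesQualifiersValues)
    },
    "/tmp/hfile-staging")

The method only writes HFiles under the staging directory; the load is typically completed afterwards with the HBase bulk-load tooling (e.g. LoadIncrementalHFiles.doBulkLoad) against the same table.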