def bulkLoad[T]()

in spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala [605:703]


  /**
   * Bulk load an RDD into HBase by writing sorted HFiles into a staging directory.
   *
   * Each RDD element is flat-mapped into (KeyFamilyQualifier, value) tuples, the tuples are
   * repartitioned by region and sorted within each partition, and the result is written out
   * as HFiles under stagingDir, which must not already exist. familyHFileWriteOptionsMap
   * supplies optional per-column-family HFile write options, compactionExclude marks the
   * generated HFiles as excluded from minor compaction, maxSize is the file size at which a
   * writer is rolled once the current row key is finished, and nowTimeStamp is the timestamp
   * applied to every cell written.
   */
  def bulkLoad[T](
      rdd: RDD[T],
      tableName: TableName,
      flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])],
      stagingDir: String,
      familyHFileWriteOptionsMap: util.Map[Array[Byte], FamilyHFileWriteOptions] =
        new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
      compactionExclude: Boolean = false,
      maxSize: Long = HConstants.DEFAULT_MAX_FILE_SIZE,
      nowTimeStamp: Long = System.currentTimeMillis()): Unit = {
    val stagingPath = new Path(stagingDir)
    val fs = stagingPath.getFileSystem(config)
    if (fs.exists(stagingPath)) {
      throw new FileAlreadyExistsException("Path " + stagingDir + " already exists")
    }
    val conn = HBaseConnectionCache.getConnection(config)
    try {
      val regionLocator = conn.getRegionLocator(tableName)
      val startKeys = regionLocator.getStartKeys
      if (startKeys.length == 0) {
        logInfo("Table " + tableName.toString + " was not found")
      }
      val defaultCompressionStr =
        config.get("hfile.compression", Compression.Algorithm.NONE.getName)
      val hfileCompression = HFileWriterImpl
        .compressionByName(defaultCompressionStr)
      val tableRawName = tableName.getName

      // Re-key the user-supplied options with ByteArrayWrapper so the column family
      // byte arrays are hashed and compared by content rather than by object identity
      val familyHFileWriteOptionsMapInternal =
        new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]

      val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()

      while (entrySetIt.hasNext) {
        val entry = entrySetIt.next()
        familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
      }

      // Partitioner that routes each key to the partition of the region whose start key
      // range contains its row key, so each output partition corresponds to one region
      val regionSplitPartitioner =
        new BulkLoadPartitioner(startKeys)

      // This is where all the magic happens
      // Here we are going to do the following things
      // 1. FlatMap every row in the RDD into key column value tuples
      // 2. Then we are going to repartition, sort, and shuffle
      // 3. Finally we are going to write out our HFiles
      rdd
        .flatMap(r => flatMap(r))
        .repartitionAndSortWithinPartitions(regionSplitPartitioner)
        .hbaseForeachPartition(
          this,
          (it, conn) => {

            val conf = broadcastedConf.value.value
            val fs = new Path(stagingDir).getFileSystem(conf)
            // Per-column-family writer state; a writer is rolled once it exceeds maxSize
            // and the current row key has been fully written
            val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
            var previousRow: Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
            var rollOverRequested = false
            val localTableName = TableName.valueOf(tableRawName)

            // Here is where we finally iterate through the data in this partition of the
            // RDD that has been sorted and partitioned
            it.foreach {
              case (keyFamilyQualifier, cellValue: Array[Byte]) =>
                val wl = writeValueToHFile(
                  keyFamilyQualifier.rowKey,
                  keyFamilyQualifier.family,
                  keyFamilyQualifier.qualifier,
                  cellValue,
                  nowTimeStamp,
                  fs,
                  conn,
                  localTableName,
                  conf,
                  familyHFileWriteOptionsMapInternal,
                  hfileCompression,
                  writerMap,
                  stagingDir)

                rollOverRequested = rollOverRequested || wl.written > maxSize

                // This will only roll if we have at least one column family file that is
                // bigger than maxSize and we have finished a given row key
                if (rollOverRequested && Bytes
                    .compareTo(previousRow, keyFamilyQualifier.rowKey) != 0) {
                  rollWriters(fs, writerMap, regionSplitPartitioner, previousRow, compactionExclude)
                  rollOverRequested = false
                }

                previousRow = keyFamilyQualifier.rowKey
            }
            // We have finished all the data so let's close up the writers
            rollWriters(fs, writerMap, regionSplitPartitioner, previousRow, compactionExclude)
            rollOverRequested = false
          })
    } finally {
      if (null != conn) conn.close()
    }
  }
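
Example usage (not from the source): a minimal sketch of how a caller might drive bulkLoad and then complete the load. The table name "t1", column family "cf", the staging directory, and the BulkLoadHFiles completion step (available in HBase 2.2+; older releases use LoadIncrementalHFiles.doBulkLoad instead) are assumptions made for illustration.

// Usage sketch (not part of the source). Table name, column family, staging
// directory, and the BulkLoadHFiles completion step are illustrative assumptions.
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.spark.{HBaseContext, KeyFamilyQualifier}
import org.apache.hadoop.hbase.tool.BulkLoadHFiles
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext

val sc = new SparkContext("local[2]", "bulkLoadExample")
val config = HBaseConfiguration.create()
val hbaseContext = new HBaseContext(sc, config)

val tableName = TableName.valueOf("t1")      // assumed table
val cf = Bytes.toBytes("cf")                 // assumed column family
val stagingDir = "/tmp/bulkload-staging"     // assumed staging path; must not exist yet

// (rowKey, qualifier, value) triples to load
val rdd = sc.parallelize(Seq(
  (Bytes.toBytes("row1"), Bytes.toBytes("q1"), Bytes.toBytes("v1")),
  (Bytes.toBytes("row2"), Bytes.toBytes("q1"), Bytes.toBytes("v2"))))

// Step 1: write the sorted HFiles into the staging directory
hbaseContext.bulkLoad[(Array[Byte], Array[Byte], Array[Byte])](
  rdd,
  tableName,
  t => Iterator((new KeyFamilyQualifier(t._1, cf, t._2), t._3)),
  stagingDir)

// Step 2: hand the generated HFiles to the region servers (HBase 2.2+ API;
// older releases use LoadIncrementalHFiles.doBulkLoad instead)
BulkLoadHFiles.create(config).bulkLoad(tableName, new Path(stagingDir))

Because bulkLoad throws FileAlreadyExistsException when the staging path exists, the sketch assumes the staging directory is removed between runs.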