def bulkLoad[T]()

in spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala [612:712]


  def bulkLoad[T](rdd: RDD[T],
                  tableName: TableName,
                  flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])],
                  stagingDir: String,
                  familyHFileWriteOptionsMap:
                  util.Map[Array[Byte], FamilyHFileWriteOptions] =
                  new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
                  compactionExclude: Boolean = false,
                  maxSize: Long = HConstants.DEFAULT_MAX_FILE_SIZE,
                  nowTimeStamp: Long = System.currentTimeMillis()):
  Unit = {
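    //The staging directory is where the HFiles are written before the bulk
    //load; it must not already exist so we never mix in stale files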
    val stagingPath = new Path(stagingDir)
    val fs = stagingPath.getFileSystem(config)
    if (fs.exists(stagingPath)) {
      throw new FileAlreadyExistsException("Path " + stagingDir + " already exists")
    }
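    //Get a cached connection so we can look up the region boundaries of the
    //target table; the region start keys drive how the RDD is partitioned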
    val conn = HBaseConnectionCache.getConnection(config)
    try {
      val regionLocator = conn.getRegionLocator(tableName)
      val startKeys = regionLocator.getStartKeys
      if (startKeys.length == 0) {
        logInfo("Table " + tableName.toString + " was not found")
      }
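      //Resolve the default HFile compression from the config so families
      //without an explicit FamilyHFileWriteOptions still get a codec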
      val defaultCompressionStr = config.get("hfile.compression",
        Compression.Algorithm.NONE.getName)
      val hfileCompression = HFileWriterImpl
        .compressionByName(defaultCompressionStr)
      val tableRawName = tableName.getName

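      //Re-key the per-family write options by ByteArrayWrapper so the byte[]
      //family names can be used safely as hash map keys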
      val familyHFileWriteOptionsMapInternal =
        new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]

      val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()

      while (entrySetIt.hasNext) {
        val entry = entrySetIt.next()
        familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
      }

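      //Partition by the region start keys so every partition lines up with a
      //single region's key range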
      val regionSplitPartitioner =
        new BulkLoadPartitioner(startKeys)

      //This is where all the magic happens
      //Here we are going to do the following things
      // 1. FlatMap every row in the RDD into key column value tuples
      // 2. Then we are going to repartition, sort, and shuffle
      // 3. Finally we are going to write out our HFiles
      rdd.flatMap( r => flatMap(r)).
        repartitionAndSortWithinPartitions(regionSplitPartitioner).
        hbaseForeachPartition(this, (it, conn) => {

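          //Per-partition state: one HFile writer per column family, plus the
          //last row key seen so writers are only rolled on row boundaries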
          val conf = broadcastedConf.value.value
          val fs = new Path(stagingDir).getFileSystem(conf)
          val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
          var previousRow:Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
          var rollOverRequested = false
          val localTableName = TableName.valueOf(tableRawName)

          //Here is where we finally iterate through the data in this partition of the
          //RDD that has been sorted and partitioned
          it.foreach{ case (keyFamilyQualifier, cellValue:Array[Byte]) =>

            val wl = writeValueToHFile(keyFamilyQualifier.rowKey,
              keyFamilyQualifier.family,
              keyFamilyQualifier.qualifier,
              cellValue,
              nowTimeStamp,
              fs,
              conn,
              localTableName,
              conf,
              familyHFileWriteOptionsMapInternal,
              hfileCompression,
              writerMap,
              stagingDir)

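            //Request a roll once any family's writer has grown past maxSize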
            rollOverRequested = rollOverRequested || wl.written > maxSize

            //This will only roll if we have at least one column family file that is
            //bigger than maxSize and we have finished a given row key
            if (rollOverRequested && Bytes.compareTo(previousRow, keyFamilyQualifier.rowKey) != 0) {
              rollWriters(fs, writerMap,
                regionSplitPartitioner,
                previousRow,
                compactionExclude)
              rollOverRequested = false
            }

            previousRow = keyFamilyQualifier.rowKey
          }
          //We have finished all the data so let's close up the writers
          rollWriters(fs, writerMap,
            regionSplitPartitioner,
            previousRow,
            compactionExclude)
          rollOverRequested = false
        })
    } finally {
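      //Connections from HBaseConnectionCache are reference counted, so
      //close() just releases this method's reference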
      if (conn != null) conn.close()
    }
  }
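
For reference, a minimal usage sketch (not part of this file). It assumes an existing Spark application; the table name "myTable", column family "cf", and staging path "/tmp/hbaseBulkLoadStaging" are placeholders, and the final hand-off uses LoadIncrementalHFiles, whose package differs across HBase versions (newer releases provide BulkLoadHFiles instead).

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.spark.{HBaseContext, KeyFamilyQualifier}
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

object BulkLoadExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("bulkLoadExample"))
    val config = HBaseConfiguration.create()
    val hbaseContext = new HBaseContext(sc, config)

    val tableName = TableName.valueOf("myTable")    //placeholder table name
    val stagingDir = "/tmp/hbaseBulkLoadStaging"    //placeholder staging path
    val family = Bytes.toBytes("cf")                //placeholder column family

    //Input rows as (rowKey, qualifier, value) triples
    val rdd = sc.parallelize(Seq(
      ("row1", "a", "foo"),
      ("row2", "b", "bar")))

    //Flat-map each record into (KeyFamilyQualifier, value) pairs; bulkLoad
    //handles the partitioning, sorting and HFile writing shown above
    hbaseContext.bulkLoad[(String, String, String)](rdd, tableName,
      t => {
        val kfq = new KeyFamilyQualifier(Bytes.toBytes(t._1), family, Bytes.toBytes(t._2))
        Seq((kfq, Bytes.toBytes(t._3))).iterator
      },
      stagingDir)

    //Hand the generated HFiles to the region servers (class and package vary
    //by HBase version; newer releases offer BulkLoadHFiles)
    val conn = ConnectionFactory.createConnection(config)
    try {
      new LoadIncrementalHFiles(config).doBulkLoad(new Path(stagingDir),
        conn.getAdmin, conn.getTable(tableName), conn.getRegionLocator(tableName))
    } finally {
      conn.close()
    }
    sc.stop()
  }
}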