def bulkLoad[T]()

in loader/src/main/scala/org/apache/s2graph/loader/spark/HBaseContext.scala [605:811]


  def bulkLoad[T](rdd: RDD[T],
                  tableName: TableName,
                  startKeys: Array[Array[Byte]],
                  flatMap: (T) => Iterator[(KeyFamilyQualifier, Array[Byte])],
                  stagingDir: String,
                  familyHFileWriteOptionsMap: util.Map[Array[Byte], FamilyHFileWriteOptions] = new util.HashMap[Array[Byte], FamilyHFileWriteOptions],
                  compactionExclude: Boolean = false,
                  maxSize: Long = HConstants.DEFAULT_MAX_FILE_SIZE): Unit = {
    val defaultCompressionStr = config.get("hfile.compression",
      Compression.Algorithm.NONE.getName)
    val defaultCompression = Compression.getCompressionAlgorithmByName(defaultCompressionStr)
    val now = System.currentTimeMillis()
    val tableNameByteArray = tableName.getName

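    // Re-key the user-supplied per-family options by ByteArrayWrapper so the
    // family byte arrays are compared by content rather than by reference.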
    val familyHFileWriteOptionsMapInternal =
      new util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions]

    val entrySetIt = familyHFileWriteOptionsMap.entrySet().iterator()

    while (entrySetIt.hasNext) {
      val entry = entrySetIt.next()
      familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(entry.getKey), entry.getValue)
    }

    /*
     * This will return a new HFile writer when requested
     *
     * @param family       column family
     * @param conf         configuration to connect to HBase
     * @param favoredNodes nodes that we would like to write to
     * @param fs           FileSystem object where we will be writing the HFiles
     * @param familydir    staging directory for this column family's HFiles
     * @return             WriterLength object
     */
    def getNewWriter(family: Array[Byte], conf: Configuration,
                     favoredNodes: Array[InetSocketAddress],
                     fs: FileSystem,
                     familydir: Path): WriterLength = {
      var familyOptions = familyHFileWriteOptionsMapInternal.get(new ByteArrayWrapper(family))

      if (familyOptions == null) {
        familyOptions = new FamilyHFileWriteOptions(defaultCompression.toString,
          BloomType.NONE.toString, HConstants.DEFAULT_BLOCKSIZE, DataBlockEncoding.NONE.toString)
        familyHFileWriteOptionsMapInternal.put(new ByteArrayWrapper(family), familyOptions)
      }

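      // Write through a copy of the configuration with the block cache
      // disabled: these writers only produce HFiles and never read them back.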
      val tempConf = new Configuration(conf)
      tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f)
      val contextBuilder = new HFileContextBuilder()
        .withCompression(Algorithm.valueOf(familyOptions.compression))
        .withChecksumType(HStore.getChecksumType(conf))
        .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
        .withBlockSize(familyOptions.blockSize)
        .withDataBlockEncoding(DataBlockEncoding.valueOf(familyOptions.dataBlockEncoding))
      val hFileContext = contextBuilder.build()

      if (null == favoredNodes) {
        new WriterLength(0, new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs)
          .withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType))
          .withComparator(KeyValue.COMPARATOR).withFileContext(hFileContext).build())
      } else {
        new WriterLength(0,
          new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), new HFileSystem(fs))
            .withOutputDir(familydir).withBloomType(BloomType.valueOf(familyOptions.bloomType))
            .withComparator(KeyValue.COMPARATOR).withFileContext(hFileContext)
            .withFavoredNodes(favoredNodes).build())
      }
    }

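    //Partition by the region start keys so that each Spark partition
    //corresponds to a single HBase region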
    val regionSplitPartitioner =
      new BulkLoadPartitioner(startKeys)

    //This is where all the magic happens
    //Here we are going to do the following things
    // 1. FlatMap every row in the RDD into key, column, value tuples
    // 2. Then we are going to repartition, sort, and shuffle
    // 3. Finally we are going to write out our HFiles
    rdd.flatMap( r => flatMap(r)).
      repartitionAndSortWithinPartitions(regionSplitPartitioner).
      hbaseForeachPartition(this, (it, conn) => {

      val conf = broadcastedConf.value.value
      val fs = FileSystem.get(conf)
      val writerMap = new mutable.HashMap[ByteArrayWrapper, WriterLength]
      var previousRow: Array[Byte] = HConstants.EMPTY_BYTE_ARRAY
      var rollOverRequested = false
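      //One open writer (one in-progress HFile) per column family;
      //previousRow lets us roll writers only on row key boundaries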

      /*
       * This will roll all writers: close every open writer and clear the
       * map so that fresh HFiles are started for subsequent rows
       */
      def rollWriters(): Unit = {
        writerMap.values.foreach( wl => {
          if (wl.writer != null) {
            logDebug("Writer=" + wl.writer.getPath +
              (if (wl.written == 0) "" else ", wrote=" + wl.written))
            close(wl.writer)
          }
        })
        writerMap.clear()
        rollOverRequested = false
      }

      /*
       * This function will close a given HFile writer
       * @param w The writer to close
       */
      def close(w: StoreFile.Writer): Unit = {
        if (w != null) {
          w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
            Bytes.toBytes(System.currentTimeMillis()))
          w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
            Bytes.toBytes(regionSplitPartitioner.getPartition(previousRow)))
          w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
            Bytes.toBytes(true))
          w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
            Bytes.toBytes(compactionExclude))
          w.appendTrackedTimestampsToMetadata()
          w.close()
        }
      }

      //Here is where we finally iterate through the data in this partition of the
      //RDD that has been sorted and partitioned
      it.foreach { case (keyFamilyQualifier, cellValue: Array[Byte]) =>

        //This will get a writer for the column family
        //If there is no writer for a given column family then
        //it will get created here.
        val wl = writerMap.getOrElseUpdate(new ByteArrayWrapper(keyFamilyQualifier.family), {

          val familyDir = new Path(stagingDir, Bytes.toString(keyFamilyQualifier.family))

          fs.mkdirs(familyDir)

          val loc: HRegionLocation = {
            try {
              val locator =
                conn.getRegionLocator(TableName.valueOf(tableNameByteArray))
              locator.getRegionLocation(keyFamilyQualifier.rowKey)
            } catch {
              case e: Throwable =>
                logWarning("failed to locate region for rowkey: " +
                  Bytes.toString(keyFamilyQualifier.rowKey))
                null
            }
          }
          if (null == loc) {
            if (log.isTraceEnabled) {
              logTrace("failed to get region location, so use default writer: " +
                Bytes.toString(keyFamilyQualifier.rowKey))
            }
            getNewWriter(family = keyFamilyQualifier.family, conf = conf, favoredNodes = null,
              fs = fs, familydir = familyDir)
          } else {
            if (log.isDebugEnabled) {
              logDebug("first rowkey: [" + Bytes.toString(keyFamilyQualifier.rowKey) + "]")
            }
            val initialIsa =
              new InetSocketAddress(loc.getHostname, loc.getPort)
            if (initialIsa.isUnresolved) {
              if (log.isTraceEnabled) {
                logTrace("failed to resolve bind address: " + loc.getHostname + ":"
                  + loc.getPort + ", so use default writer")
              }
              getNewWriter(keyFamilyQualifier.family, conf, null, fs, familyDir)
            } else {
              if(log.isDebugEnabled) {
                logDebug("use favored nodes writer: " + initialIsa.getHostString)
              }
              getNewWriter(keyFamilyQualifier.family, conf,
                Array[InetSocketAddress](initialIsa), fs, familyDir)
            }
          }
        })

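        //Every cell written by this job shares the timestamp captured
        //when bulkLoad() was called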
        val keyValue = new KeyValue(keyFamilyQualifier.rowKey,
          keyFamilyQualifier.family,
          keyFamilyQualifier.qualifier,
          now, cellValue)

        wl.writer.append(keyValue)
        wl.written += keyValue.getLength

        rollOverRequested = rollOverRequested || wl.written > maxSize

        //This will only roll if we have at least one column family file that is
        //bigger than maxSize and we have finished a given row key
        if (rollOverRequested && Bytes.compareTo(previousRow, keyFamilyQualifier.rowKey) != 0) {
          rollWriters()
        }

        previousRow = keyFamilyQualifier.rowKey
      }
      //We have finished all the data so let's close up the writers
      rollWriters()
    })
  }
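
A minimal caller sketch. Everything named here except bulkLoad itself and the
KeyFamilyQualifier type is an assumption for illustration (hbaseContext, sc,
startKeys, the table name, and the staging path are hypothetical). The start
keys would typically come from RegionLocator.getStartKeys, and after this
returns, the HFiles under stagingDir are handed to
LoadIncrementalHFiles#doBulkLoad for the region servers to adopt.

  // Hypothetical usage sketch; hbaseContext, sc, and startKeys are assumed
  // to exist in the caller's scope.
  val tableName = TableName.valueOf("test_table")
  val cf = Bytes.toBytes("e")
  val rdd: RDD[(Array[Byte], Array[Byte], Array[Byte])] = sc.parallelize(Seq(
    (Bytes.toBytes("row1"), Bytes.toBytes("q1"), Bytes.toBytes("v1")),
    (Bytes.toBytes("row2"), Bytes.toBytes("q1"), Bytes.toBytes("v2"))))

  hbaseContext.bulkLoad[(Array[Byte], Array[Byte], Array[Byte])](
    rdd,
    tableName,
    startKeys,
    { case (row, qualifier, value) =>
      // One cell per input record, all in the same column family
      Iterator((new KeyFamilyQualifier(row, cf, qualifier), value))
    },
    stagingDir = "/tmp/bulkload-staging")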