private def writeValueToHFile()

in spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala [959:1049]


  /**
   * Appends a single cell to the HFile writer of its column family, creating (and caching in
   * `writerMap`) that writer the first time the family is seen.
   *
   * When a new writer has to be created, the region currently hosting `rowKey` is looked up and,
   * if its host/port resolves, passed to `getNewHFileWriter` as the only favored node. If the
   * lookup fails or the address cannot be resolved, the writer is created with no favored nodes.
   *
   * @param rowKey     row key of the cell; also used to locate the hosting region for a new writer
   * @param family     column family of the cell; selects the writer and the staging sub-directory
   * @param qualifier  column qualifier of the cell
   * @param cellValue  value of the cell
   * @param nowTimeStamp timestamp stored in the cell
   * @param fs         file system handed to `getNewHFileWriter`
   * @param conn       connection used to obtain a region locator for `tableName`
   * @param tableName  table whose regions are consulted for the favored node
   * @param conf       configuration used to resolve the staging file system and create the writer
   * @param familyHFileWriteOptionsMapInternal per-family write options handed to
   *                   `getNewHFileWriter`
   * @param hfileCompression compression handed to `getNewHFileWriter`
   * @param writerMap  cache of open writers keyed by family; updated when a writer is created
   * @param stagingDir parent directory under which one directory per family is created
   * @return the writer entry for `family`, with `written` increased by the appended cell's length
   */
  private def writeValueToHFile(
      rowKey: Array[Byte],
      family: Array[Byte],
      qualifier: Array[Byte],
      cellValue: Array[Byte],
      nowTimeStamp: Long,
      fs: FileSystem,
      conn: Connection,
      tableName: TableName,
      conf: Configuration,
      familyHFileWriteOptionsMapInternal: util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions],
      hfileCompression: Compression.Algorithm,
      writerMap: mutable.HashMap[ByteArrayWrapper, WriterLength],
      stagingDir: String): WriterLength = {
    import scala.util.control.NonFatal

    val wl = writerMap.getOrElseUpdate(
      new ByteArrayWrapper(family), {
        val familyDir = new Path(stagingDir, Bytes.toString(family))

        // Create the family's staging directory before a writer is opened inside it.
        familyDir.getFileSystem(conf).mkdirs(familyDir)

        // Best-effort region lookup: any non-fatal failure falls back to a writer without
        // favored nodes. Fatal errors (OOM, interrupts, ...) are deliberately not swallowed.
        val regionLocation: Option[HRegionLocation] =
          try {
            val locator = conn.getRegionLocator(tableName)
            // The locator is owned by the caller and must be closed once the lookup is done.
            try Option(locator.getRegionLocation(rowKey))
            finally locator.close()
          } catch {
            case NonFatal(e) =>
              logWarning(
                "there's something wrong when locating rowkey: " +
                  Bytes.toString(rowKey),
                e)
              None
          }

        // `null` means "no favored nodes" for getNewHFileWriter (the default writer).
        val favoredNodes: Array[InetSocketAddress] = regionLocation match {
          case None =>
            if (log.isTraceEnabled) {
              logTrace(
                "failed to get region location, so use default writer: " +
                  Bytes.toString(rowKey))
            }
            null
          case Some(loc) =>
            if (log.isDebugEnabled) {
              logDebug("first rowkey: [" + Bytes.toString(rowKey) + "]")
            }
            val initialIsa =
              new InetSocketAddress(loc.getHostname, loc.getPort)
            if (initialIsa.isUnresolved) {
              if (log.isTraceEnabled) {
                logTrace(
                  "failed to resolve bind address: " + loc.getHostname + ":"
                    + loc.getPort + ", so use default writer")
              }
              null
            } else {
              if (log.isDebugEnabled) {
                logDebug("use favored nodes writer: " + initialIsa.getHostString)
              }
              Array[InetSocketAddress](initialIsa)
            }
        }

        getNewHFileWriter(
          family,
          conf,
          favoredNodes,
          fs,
          familyDir,
          familyHFileWriteOptionsMapInternal,
          hfileCompression)
      })

    val keyValue = new KeyValue(rowKey, family, qualifier, nowTimeStamp, cellValue)

    wl.writer.append(keyValue)
    // Track how much has been written through this writer.
    wl.written += keyValue.getLength

    wl
  }