private def writeValueToHFile()

in spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseContext.scala [962:1049]


  /**
   * Appends a single cell to the HFile writer of its column family and
   * returns that writer's bookkeeping object.
   *
   * Writers are cached in `writerMap`, keyed by column family. The first time
   * a family is seen, its directory is created under `stagingDir` and a new
   * writer is built. The region hosting `rowKey` is looked up on a
   * best-effort basis so the writer can be created with that region server
   * as a favored node; if the lookup fails, or the server address cannot be
   * resolved, a writer without favored nodes is used instead. Only the row
   * key of the first cell written for a family takes part in this lookup.
   *
   * @param rowKey       row key of the cell
   * @param family       column family of the cell; also the writer cache key
   * @param qualifier    column qualifier of the cell
   * @param cellValue    value of the cell
   * @param nowTimeStamp timestamp stored in the written KeyValue
   * @param fs           file system handed to the new HFile writer
   * @param conn         connection used to look up the region location
   * @param tableName    table whose regions are consulted for `rowKey`
   * @param conf         configuration used to resolve the family directory's
   *                     file system and handed to the new HFile writer
   * @param familyHFileWriteOptionsMapInternal per-family write options,
   *                     forwarded to the writer factory
   * @param hfileCompression compression algorithm forwarded to the writer
   *                     factory
   * @param writerMap    cache of open writers per family; updated in place
   * @param stagingDir   parent directory of the per-family directories
   * @return the (possibly newly created) writer entry for `family`, with its
   *         `written` counter increased by the length of the appended cell
   */
  private def writeValueToHFile(rowKey: Array[Byte],
                        family: Array[Byte],
                        qualifier: Array[Byte],
                        cellValue:Array[Byte],
                        nowTimeStamp: Long,
                        fs: FileSystem,
                        conn: Connection,
                        tableName: TableName,
                        conf: Configuration,
                        familyHFileWriteOptionsMapInternal:
                        util.HashMap[ByteArrayWrapper, FamilyHFileWriteOptions],
                        hfileCompression:Compression.Algorithm,
                        writerMap:mutable.HashMap[ByteArrayWrapper, WriterLength],
                        stagingDir: String
                         ): WriterLength = {
    import scala.util.control.NonFatal

    val wl = writerMap.getOrElseUpdate(new ByteArrayWrapper(family), {
      val familyDir = new Path(stagingDir, Bytes.toString(family))

      familyDir.getFileSystem(conf).mkdirs(familyDir)

      // Best-effort region lookup. The locator is a Closeable, so it is
      // released as soon as the location has been read. Only non-fatal
      // failures are tolerated; fatal errors (OOM, interruption, ...) must
      // propagate instead of being silently turned into a fallback.
      val regionLocation: Option[HRegionLocation] =
        try {
          val locator = conn.getRegionLocator(tableName)
          try {
            Option(locator.getRegionLocation(rowKey))
          } finally {
            locator.close()
          }
        } catch {
          case NonFatal(e) =>
            logWarning("there's something wrong when locating rowkey: " +
              Bytes.toString(rowKey), e)
            None
        }

      // Address of the hosting region server, if it is known and resolvable.
      val favoredAddress: Option[InetSocketAddress] = regionLocation match {
        case None =>
          if (log.isTraceEnabled) {
            logTrace("failed to get region location, so use default writer: " +
              Bytes.toString(rowKey))
          }
          None
        case Some(loc) =>
          if (log.isDebugEnabled) {
            logDebug("first rowkey: [" + Bytes.toString(rowKey) + "]")
          }
          val initialIsa =
            new InetSocketAddress(loc.getHostname, loc.getPort)
          if (initialIsa.isUnresolved) {
            if (log.isTraceEnabled) {
              logTrace("failed to resolve bind address: " + loc.getHostname + ":"
                + loc.getPort + ", so use default writer")
            }
            None
          } else {
            if (log.isDebugEnabled) {
              logDebug("use favored nodes writer: " + initialIsa.getHostString)
            }
            Some(initialIsa)
          }
      }

      // The writer factory takes null to mean "no favored nodes".
      val favoredNodes: Array[InetSocketAddress] =
        favoredAddress.map(isa => Array[InetSocketAddress](isa)).orNull

      getNewHFileWriter(family,
        conf,
        favoredNodes,
        fs,
        familyDir,
        familyHFileWriteOptionsMapInternal,
        hfileCompression)
    })

    val keyValue = new KeyValue(rowKey,
      family,
      qualifier,
      nowTimeStamp, cellValue)

    wl.writer.append(keyValue)
    // Track the bytes written through this writer.
    wl.written += keyValue.getLength

    wl
  }