def getDeviceAndDiskInfos()

in common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala [176:266]


  /**
   * Discovers the devices and mount points of this host and binds the configured working
   * directories to them.
   *
   * The mount table is parsed from the output of `df -ah` (first column is taken as the file
   * system, last column as the mount point) and the block device names from the output of
   * `ls /sys/block/`. Every file system is attributed to the longest block device name that is
   * a prefix of its device name (e.g. `vdb1` -> `vdb`). When no block device matches, the file
   * system's own device name is used and the resulting [[DeviceInfo]] is flagged with
   * `deviceStatAvailable = false`.
   *
   * Working directories are grouped by the mount point returned by `getMountPoint`; one
   * [[DiskInfo]] is created per non-empty mount point. The usable space, thread count and
   * storage type of a [[DiskInfo]] are taken from the first working directory of its group.
   *
   * @param workingDirs one tuple per working directory: (directory, configured usable space,
   *                    thread count, storage type)
   * @param conf        configuration passed to every created [[DiskInfo]]
   * @return a pair of maps: device name -> [[DeviceInfo]] (only devices that ended up with at
   *         least one [[DiskInfo]]) and mount point -> [[DiskInfo]]
   */
  def getDeviceAndDiskInfos(
      workingDirs: Seq[(File, Long, Int, StorageInfo.Type)],
      conf: CelebornConf): (util.Map[String, DeviceInfo], util.Map[String, DiskInfo]) = {
    val deviceNameToDeviceInfo = new util.HashMap[String, DeviceInfo]()
    val mountPointToDeviceInfo = new util.HashMap[String, DeviceInfo]()

    val dfResult = runCommand("df -ah").trim
    logger.info(s"df result\n$dfResult")
    // (/dev/vdb, /mnt/disk1)
    // The first line is the df header. Blank lines (e.g. produced by "\r\n" line endings,
    // since the split is on either character) are skipped so that they do not turn into a
    // phantom ("", "") mount entry.
    val fsMounts = dfResult
      .split("[\n\r]")
      .drop(1)
      .map(_.trim)
      .filter(_.nonEmpty)
      .map { line =>
        val tokens = line.split("[ \t]+")
        (tokens.head, tokens.last)
      }

    // (vda, vdb)
    val lsBlockResult = runCommand("ls /sys/block/").trim
    logger.info(s"ls block\n$lsBlockResult")
    // Splitting an empty string yields Array(""), and "" is a prefix of every device name.
    // Without this filter an empty listing would map every file system onto one device
    // named "" and never mark its stats as unavailable.
    val blocks = lsBlockResult.split("[ \n\r\t]+").filter(_.nonEmpty)

    fsMounts.foreach { case (fileSystem, mountPoint) =>
      val deviceName = fileSystem.substring(fileSystem.lastIndexOf('/') + 1)
      // Longest block device name that is a prefix of the device name, if any.
      val matchedBlock: Option[String] = blocks
        .filter(block => deviceName.startsWith(block))
        .reduceOption((best, block) => if (block.length > best.length) block else best)

      val newDeviceInfoFunc =
        new util.function.Function[String, DeviceInfo]() {
          override def apply(name: String): DeviceInfo = {
            val created = new DeviceInfo(name)
            if (matchedBlock.isEmpty) {
              // device not found in /sys/block/
              created.deviceStatAvailable = false
            }
            created
          }
        }

      // File systems that share a block device share one DeviceInfo instance.
      val deviceInfo =
        deviceNameToDeviceInfo.computeIfAbsent(
          matchedBlock.getOrElse(deviceName),
          newDeviceInfoFunc)
      // The first file system listed for a mount point wins.
      mountPointToDeviceInfo.putIfAbsent(mountPoint, deviceInfo)
    }

    val retDeviceInfos = JavaUtils.newConcurrentHashMap[String, DeviceInfo]()
    val retDiskInfos = JavaUtils.newConcurrentHashMap[String, DiskInfo]()

    workingDirs.groupBy { case (dir, _, _, _) =>
      getMountPoint(dir.getCanonicalPath, mountPointToDeviceInfo.keySet())
    }.foreach {
      case (mountPoint, dirs) =>
        if (mountPoint.nonEmpty) {
          val deviceInfo = mountPointToDeviceInfo.get(mountPoint)
          val diskInfo = new DiskInfo(
            mountPoint,
            dirs.map(_._1).toList,
            deviceInfo,
            conf)
          // Settings of the first working directory apply to the whole mount point.
          val (_, maxUsableSpace, threadCount, storageType) = dirs.head
          diskInfo.configuredUsableSpace = maxUsableSpace
          diskInfo.threadCount = threadCount
          diskInfo.storageType = storageType
          deviceInfo.addDiskInfo(diskInfo)
          retDiskInfos.put(mountPoint, diskInfo)
        } else {
          logger.warn(
            s"Can't find mount point for ${dirs.map(_._1.getCanonicalPath).mkString(",")}")
        }
    }
    // Only devices that actually host a working directory are returned.
    deviceNameToDeviceInfo.asScala.foreach {
      case (_, deviceInfo) =>
        if (deviceInfo.diskInfos.nonEmpty) {
          retDeviceInfos.put(deviceInfo.name, deviceInfo)
        }
    }

    logger.info(s"Device initialization \n " +
      s"$retDeviceInfos \n $retDiskInfos")

    (retDeviceInfos, retDiskInfos)
  }