in common/src/main/scala/org/apache/celeborn/common/meta/DeviceInfo.scala [176:266]
def getDeviceAndDiskInfos(
workingDirs: Seq[(File, Long, Int, StorageInfo.Type)],
conf: CelebornConf): (util.Map[String, DeviceInfo], util.Map[String, DiskInfo]) = {
val deviceNameToDeviceInfo = new util.HashMap[String, DeviceInfo]()
val mountPointToDeviceInfo = new util.HashMap[String, DeviceInfo]()
val dfResult = runCommand("df -ah").trim
logger.info(s"df result\n$dfResult")
// (/dev/vdb, /mnt/disk1)
val fsMounts = dfResult
.split("[\n\r]")
.tail
.map(line => {
val tokens = line.trim.split("[ \t]+")
(tokens.head, tokens.last)
})
// (vda, vdb)
val lsBlockResult = runCommand("ls /sys/block/").trim
logger.info(s"ls block\n$lsBlockResult")
val blocks = lsBlockResult.split("[ \n\r\t]+")
fsMounts.foreach { case (fileSystem, mountPoint) =>
val deviceName = fileSystem.substring(fileSystem.lastIndexOf('/') + 1)
var index = -1
var maxLength = -1
blocks.zipWithIndex.foreach(block => {
if (deviceName.startsWith(block._1) && block._1.length > maxLength) {
index = block._2
maxLength = block._1.length
}
})
val newDeviceInfoFunc =
new util.function.Function[String, DeviceInfo]() {
override def apply(s: String): DeviceInfo = {
val deviceInfo = new DeviceInfo(s)
if (index < 0) {
// device not found in /sys/block/
deviceInfo.deviceStatAvailable = false
}
deviceInfo
}
}
val deviceInfo =
if (index >= 0) {
deviceNameToDeviceInfo.computeIfAbsent(blocks(index), newDeviceInfoFunc)
} else {
deviceNameToDeviceInfo.computeIfAbsent(deviceName, newDeviceInfoFunc)
}
mountPointToDeviceInfo.putIfAbsent(mountPoint, deviceInfo)
}
val retDeviceInfos = JavaUtils.newConcurrentHashMap[String, DeviceInfo]()
val retDiskInfos = JavaUtils.newConcurrentHashMap[String, DiskInfo]()
workingDirs.groupBy { case (dir, _, _, _) =>
getMountPoint(dir.getCanonicalPath, mountPointToDeviceInfo.keySet())
}.foreach {
case (mountPoint, dirs) =>
if (mountPoint.nonEmpty) {
val deviceInfo = mountPointToDeviceInfo.get(mountPoint)
val diskInfo = new DiskInfo(
mountPoint,
dirs.map(_._1).toList,
deviceInfo,
conf)
val (_, maxUsableSpace, threadCount, storageType) = dirs(0)
diskInfo.configuredUsableSpace = maxUsableSpace
diskInfo.threadCount = threadCount
diskInfo.storageType = storageType
deviceInfo.addDiskInfo(diskInfo)
retDiskInfos.put(mountPoint, diskInfo)
} else {
logger.warn(
s"Can't find mount point for ${dirs.map(_._1.getCanonicalPath).mkString(",")}")
}
}
deviceNameToDeviceInfo.asScala.foreach {
case (_, deviceInfo) =>
if (deviceInfo.diskInfos.nonEmpty) {
retDeviceInfos.put(deviceInfo.name, deviceInfo)
}
}
logger.info(s"Device initialization \n " +
s"$retDeviceInfos \n $retDiskInfos")
(retDeviceInfos, retDiskInfos)
}