in worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala [112:186]
/**
 * Schedules the periodic device health check on `diskChecker`.
 *
 * The task starts immediately (initial delay 0) and is re-run with a fixed delay of
 * `diskCheckInterval` milliseconds. On every pass, each observed device is handled as follows:
 *
 *  1. Non-critical error timestamps older than `notifyErrorExpireTimeout` are dropped.
 *  1. If the number of remaining non-critical errors exceeds `notifyErrorThreshold`, observers
 *     are notified with `DiskStatus.CRITICAL_ERROR` for every mount point of the device.
 *  1. Otherwise, if IO-hang checking is enabled and the device reports an IO hang, observers are
 *     notified with `DiskStatus.IO_HANG`.
 *  1. Otherwise every disk of the device is checked for high disk usage and read-write
 *     failures; a disk is reported healthy only when neither check fires and the non-critical
 *     error count is at most half of `notifyErrorThreshold`.
 *
 * A failure while checking one device is logged and does not prevent the remaining devices from
 * being checked in the same pass.
 */
override def startCheck(): Unit = {
  diskChecker.scheduleWithFixedDelay(
    new Runnable {
      override def run(): Unit = {
        logDebug("Device check start")
        try {
          observedDevices.values().asScala.foreach { device =>
            // Contain failures per device: an exception raised for one device (for example
            // `dirs.head` on a disk without dirs, or a rethrown non-filesystem cause) must not
            // skip the check of every device that follows it in this pass.
            try {
              val mountPoints = device.diskInfos.keySet.asScala.toList
              // tolerate time accuracy for better performance: `now` is sampled once per device
              // instead of once per recorded timestamp
              val now = System.currentTimeMillis()
              for (concurrentList <- device.nonCriticalErrors.values().asScala) {
                for (time <- concurrentList.asScala) {
                  if (now - time > device.notifyErrorExpireTimeout) {
                    concurrentList.remove(time)
                  }
                }
              }
              val nonCriticalErrorSum =
                device.nonCriticalErrors.values().asScala.map(_.size).sum

              if (nonCriticalErrorSum > device.notifyErrorThreshold) {
                logError(
                  s"Device ${device.deviceInfo.name} has accumulated $nonCriticalErrorSum non-critical " +
                    s"error within the past ${Utils.msDurationToString(device.notifyErrorExpireTimeout)} , its sum has " +
                    s"exceed the threshold (${device.notifyErrorThreshold}), device monitor will notify error to " +
                    s"observed device.")
                // Named differently from `mountPoints` above so the outer value is not shadowed.
                val diskMountPoints = device.diskInfos.values().asScala.map(_.mountPoint).toList
                device.notifyObserversOnError(diskMountPoints, DiskStatus.CRITICAL_ERROR)
              } else if (checkIoHang && device.ioHang()) {
                logError(
                  s"Encounter device io hang error! ${device.deviceInfo.name}, notify observers")
                device.notifyObserversOnNonCriticalError(mountPoints, DiskStatus.IO_HANG)
              } else {
                device.diskInfos.values().asScala.foreach { diskInfo =>
                  try {
                    if (checkDiskUsage && DeviceMonitor.highDiskUsage(conf, diskInfo)) {
                      logError(s"${diskInfo.mountPoint} high_disk_usage error, notify observers")
                      device.notifyObserversOnHighDiskUsage(diskInfo.mountPoint)
                    } else if (checkReadWrite &&
                      DeviceMonitor.readWriteError(conf, diskInfo.dirs.head)) {
                      logError(s"${diskInfo.mountPoint} read-write error, notify observers")
                      // Only the first dir is probed: if one dir on the device has a read-write
                      // problem, most likely all dirs on this device have the problem.
                      device.notifyObserversOnNonCriticalError(
                        List(diskInfo.mountPoint),
                        DiskStatus.READ_OR_WRITE_FAILURE)
                    } else if (nonCriticalErrorSum <= device.notifyErrorThreshold * 0.5) {
                      // Report healthy only once the error count has dropped to at most half of
                      // the threshold.
                      device.notifyObserversOnHealthy(diskInfo.mountPoint)
                    }
                  } catch {
                    case e: ExecutionException =>
                      e.getCause match {
                        case fse: FileSystemException =>
                          logError(
                            s"${diskInfo.mountPoint} critical error, notify observers",
                            fse)
                          device.notifyObserversOnError(
                            List(diskInfo.mountPoint),
                            DiskStatus.CRITICAL_ERROR)
                        case null =>
                          // Type patterns never match null; rethrow the wrapper itself instead
                          // of failing with a MatchError that hides it.
                          throw e
                        case throwable: Throwable =>
                          throw throwable
                      }
                  }
                }
              }
            } catch {
              case scala.util.control.NonFatal(t) =>
                logError(s"Device check failed for ${device.deviceInfo.name}.", t)
            }
          }
        } catch {
          // Deliberately broad: nothing may escape run(). Assuming `diskChecker` is a
          // java.util.concurrent.ScheduledExecutorService, an escaping exception would suppress
          // all subsequent executions of this task.
          case t: Throwable =>
            logError("Device check failed.", t)
        }
      }
    },
    0,
    diskCheckInterval,
    TimeUnit.MILLISECONDS)
}