override def startCheck()

in worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/DeviceMonitor.scala [112:186]


  /**
   * Schedules the periodic device health check on `diskChecker`: it runs immediately and then
   * repeatedly with a fixed delay of `diskCheckInterval` milliseconds between runs.
   *
   * For every observed device, each round:
   *  1. drops recorded non-critical error times that are older than `notifyErrorExpireTimeout`;
   *  2. if the remaining non-critical error count exceeds `notifyErrorThreshold`, reports all of
   *     the device's mount points as `CRITICAL_ERROR`;
   *  3. otherwise, if IO-hang checking is enabled and the device reports an IO hang, reports all
   *     of its mount points as `IO_HANG` (non-critical);
   *  4. otherwise checks each disk for high disk usage, then for read/write errors (probing the
   *     disk's first dir, if any), and reports the disk healthy only when neither applies and the
   *     error count is at most half of the threshold. An `ExecutionException` whose cause is a
   *     `FileSystemException` marks that disk `CRITICAL_ERROR`.
   *
   * Failures are isolated per device, so one misbehaving device cannot stop the remaining devices
   * from being checked in the same round. The task body itself never throws, because
   * `scheduleWithFixedDelay` suppresses all subsequent executions once a run throws.
   */
  override def startCheck(): Unit = {
    diskChecker.scheduleWithFixedDelay(
      new Runnable {
        override def run(): Unit = {
          logDebug("Device check start")
          try {
            observedDevices.values().asScala.foreach { device =>
              try {
                // Mount points of every disk on this device; reused by the device-wide
                // notifications below.
                val mountPoints = device.diskInfos.keySet.asScala.toList
                // tolerate time accuracy for better performance: one timestamp per round
                val now = System.currentTimeMillis()
                for (errorTimes <- device.nonCriticalErrors.values().asScala) {
                  for (time <- errorTimes.asScala) {
                    if (now - time > device.notifyErrorExpireTimeout) {
                      errorTimes.remove(time)
                    }
                  }
                }
                val nonCriticalErrorSum =
                  device.nonCriticalErrors.values().asScala.map(_.size).sum
                if (nonCriticalErrorSum > device.notifyErrorThreshold) {
                  logError(s"Device ${device.deviceInfo.name} has accumulated $nonCriticalErrorSum non-critical " +
                    s"error within the past ${Utils.msDurationToString(device.notifyErrorExpireTimeout)} , its sum has " +
                    s"exceed the threshold (${device.notifyErrorThreshold}), device monitor will notify error to " +
                    s"observed device.")
                  device.notifyObserversOnError(mountPoints, DiskStatus.CRITICAL_ERROR)
                } else if (checkIoHang && device.ioHang()) {
                  logError(s"Encounter device io hang error! " +
                    s"${device.deviceInfo.name}, notify observers")
                  device.notifyObserversOnNonCriticalError(mountPoints, DiskStatus.IO_HANG)
                } else {
                  device.diskInfos.values().asScala.foreach { diskInfo =>
                    try {
                      if (checkDiskUsage && DeviceMonitor.highDiskUsage(conf, diskInfo)) {
                        logError(s"${diskInfo.mountPoint} high_disk_usage error, notify observers")
                        device.notifyObserversOnHighDiskUsage(diskInfo.mountPoint)
                      } else if (checkReadWrite &&
                        // A disk without dirs has nothing to probe; `head` would throw here and
                        // abort the whole round.
                        diskInfo.dirs.headOption.exists(dir =>
                          DeviceMonitor.readWriteError(conf, dir))) {
                        logError(s"${diskInfo.mountPoint} read-write error, notify observers")
                        // We think that if one dir in device has read-write problem, if possible all
                        // dirs in this device have the problem
                        device.notifyObserversOnNonCriticalError(
                          List(diskInfo.mountPoint),
                          DiskStatus.READ_OR_WRITE_FAILURE)
                      } else if (nonCriticalErrorSum <= device.notifyErrorThreshold * 0.5) {
                        device.notifyObserversOnHealthy(diskInfo.mountPoint)
                      }
                    } catch {
                      case e: ExecutionException =>
                        e.getCause match {
                          case fse: FileSystemException =>
                            logError(
                              s"${diskInfo.mountPoint} critical error, notify observers",
                              fse)
                            device.notifyObserversOnError(
                              List(diskInfo.mountPoint),
                              DiskStatus.CRITICAL_ERROR)
                          case _ =>
                            // Rethrow the original exception: it keeps the cause (if any) and
                            // does not fail with a MatchError when getCause is null.
                            throw e
                        }
                    }
                  }
                }
              } catch {
                case scala.util.control.NonFatal(t) =>
                  // Isolate the failure so the remaining devices are still checked this round.
                  logError(s"Device check failed for device ${device.deviceInfo.name}.", t)
              }
            }
          } catch {
            // Last-resort guard: a throwing run would cancel all future scheduled checks.
            case t: Throwable =>
              logError("Device check failed.", t)
          }
        }
      },
      0,
      diskCheckInterval,
      TimeUnit.MILLISECONDS)
  }