void CheckWorker::loop()

in src/storage/worker/CheckWorker.cc [98:279]


// Background maintenance loop of the check worker.
//
// Runs until `stopping_` becomes true. Every iteration first waits on `cond_` for up to 100 ms and then performs,
// in this order:
//   1. reload targets that are locally OFFLINE and no longer referenced       (every iteration)
//   2. check free space / writability of every target path                     (every 3 s)
//   3. clean up expired clients                                                (every 60 s)
//   4. refresh per-target used size and per-disk derived metrics               (every update_target_size_interval)
//   5. trigger a heartbeat if needed                                           (every 1 s)
//   6. report chunk engine metrics                                             (every 1 s)
// On exit it sets `stopped_` to true and logs.
//
// @param targetPaths    Paths to check. The position `i` of a path is used as its "instance" metric tag, as the key
//                       into the local `diskUsage` map, and as the index into `components_.storageTargets.engines()`
//                       (presumably one path per disk, matching `target.diskIndex` - confirm against the caller).
// @param manufacturers  Unused by this function.
void CheckWorker::loop(const std::vector<Path> &targetPaths, const std::vector<std::string> &manufacturers) {
  (void)manufacturers;

  // 0. initialize records.
  // One `Recorders` per target path, tagged with instance=<index>. Because this is a function-local static, the
  // lambda runs only on the first call; later calls reuse the same vector regardless of their `targetPaths`.
  // NOTE(review): a later call with more paths than the first one would index `recorders[i]` out of range - confirm
  // that loop() is only ever entered once per process (or always with the same number of paths).
  static auto recorders = [&] {
    std::vector<std::unique_ptr<Recorders>> recorders;
    for (auto i = 0ul; i < targetPaths.size(); ++i) {
      monitor::TagSet tag;
      tag.addTag("instance", std::to_string(i));
      recorders.push_back(std::make_unique<Recorders>(tag));
    }
    return recorders;
  }();

  // Timestamps of the last run of each periodic task. The first three are value-initialized (presumably "time zero",
  // so their task fires on the first iteration - confirm against RelativeTime); the last two start at "now", so their
  // first run is delayed by one full interval.
  RelativeTime lastCheckDiskStatusTime{};
  RelativeTime lastCleanUpExpiredClientsTime{};
  RelativeTime lastTriggerHeartbeatTime{};
  RelativeTime lastUpdateTargetUsedSizeTime = RelativeTime::now();
  RelativeTime lastChunkEngineMetricsReportTime = RelativeTime::now();
  // Last measured usage ratio per disk index, written in step 2 and read in step 4.
  robin_hood::unordered_map<uint32_t, double> diskUsage;
  while (!stopping_) {
    // Sleep for up to 100 ms; leave the loop as soon as `stopping_` is observed by the wait predicate.
    // NOTE(review): `lock` lives until the end of the loop body, so `mutex_` stays locked while steps 1-6 run
    // (it is only released inside wait_for, assuming `cond_` behaves like std::condition_variable) - confirm that
    // this is intended and that no other holder of `mutex_` is delayed by it.
    auto lock = std::unique_lock(mutex_);
    if (cond_.wait_for(lock, 100_ms, [&] { return stopping_.load(); })) {
      break;
    }

    // 1. reload offline targets.
    // Targets marked as unrecoverably offline are skipped. For the remaining targets whose local state is OFFLINE,
    // a reload is attempted only after the old storage target object has expired (its weak pointer is expired);
    // otherwise a warning is logged and the reload is retried on a later iteration.
    {
      auto snapshot = components_.targetMap.snapshot();
      for (auto &[targetId, target] : snapshot->getTargets()) {
        if (target.unrecoverableOffline()) {
          continue;
        }
        if (target.localState == flat::LocalTargetState::OFFLINE) {
          if (target.weakStorageTarget.expired()) {
            auto result = components_.storageTargets.loadTarget(target.path);
            if (UNLIKELY(!result)) {
              XLOGF(ERR, "CheckWorker@{} reload target {} failed", fmt::ptr(this), target.path);
            } else {
              XLOGF(INFO, "CheckWorker@{} reload target {} succ", fmt::ptr(this), target.path);
              // Routing info is refreshed after every successful reload.
              components_.refreshRoutingInfo();
            }
          } else {
            XLOGF(WARNING, "CheckWorker@{} offline target {} is still being used", fmt::ptr(this), target.path);
          }
        }
      }
    }

    // 2. check disk status.
    // The thresholds are read from the config on every iteration, the check itself runs at most once per 3 s.
    auto now = RelativeTime::now();
    auto diskLowSpaceThreshold = config_.disk_low_space_threshold();
    auto diskRejectCreateChunkThreshold = config_.disk_reject_create_chunk_threshold();
    if (now - lastCheckDiskStatusTime >= 3_s) {
      lastCheckDiskStatusTime = now;
      XLOGF(DBG9, "check disk status start");
      for (auto i = 0ul; i < targetPaths.size(); ++i) {
        auto &targetPath = targetPaths[i];
        auto &recorder = *recorders[i];
        // Non-throwing overload: failures are reported through `ec`.
        boost::system::error_code ec{};
        auto spaceInfo = boost::filesystem::space(targetPath, ec);
        if (UNLIKELY(ec.failed())) {
          // Space query failed: take the targets under this path offline. `diskUsage[i]` keeps its previous value.
          XLOGF(CRITICAL, "check disk failed {}, errno: {}", targetPath, ec.message());
          components_.targetMap.offlineTargets(targetPath);
          continue;
        }

        recorder.disk_capacity.set(spaceInfo.capacity);
        recorder.disk_free.set(spaceInfo.available);
        // Used fraction of the disk; std::max(1, capacity) guards against a division by zero.
        diskUsage[i] = 1.0 - (double)spaceInfo.available / std::max(1ul, spaceInfo.capacity);

        // `recordGuard` covers the writability probe. It is only marked successful via report(true) below; on the
        // read-only path it goes out of scope without that call (presumably recorded as a failure - confirm).
        auto recordGuard = recorder.check_disk.record();
        bool writable = checkWritable(targetPath);
        if (!writable) {
          // NOTE(review): disk_readonly is set to 1 here and never reset in this function - confirm whether it
          // should be cleared once the path is writable again.
          recorder.disk_readonly.set(1);
          XLOGF(CRITICAL, "check disk failed {}, readonly", targetPath);
          components_.targetMap.offlineTargets(targetPath);
          continue;
        }
        recordGuard.report(true);

        // Propagate the space state: the engine with the same index stops/resumes allocating, and the target map
        // is told about both flags for this path.
        bool lowSpace = diskUsage[i] >= diskLowSpaceThreshold;
        bool rejectCreateChunk = diskUsage[i] >= diskRejectCreateChunkThreshold;
        components_.storageTargets.engines()[i]->set_allow_to_allocate(!rejectCreateChunk);
        components_.targetMap.updateDiskState(targetPath, lowSpace, rejectCreateChunk);
      }
      XLOGF(DBG9, "check disk status finished");
    }

    // 3. clean up expired clients.
    // Runs at most once per 60 s. A kRoutingError from getActiveClientsList() is silently ignored; any other error
    // is logged. In both error cases nothing is cleaned up in this round.
    now = RelativeTime::now();
    if (now - lastCleanUpExpiredClientsTime >= 60_s) {
      lastCleanUpExpiredClientsTime = now;
      auto result = components_.getActiveClientsList();
      if (result) {
        components_.reliableUpdate.cleanUpExpiredClients(*result);
      } else if (result.error().code() != StorageClientCode::kRoutingError) {
        XLOGF(ERR, "get active clients list error: {}", result.error());
      }
    }

    // 4. update target used size.
    // Runs once per config_.update_target_size_interval().
    now = RelativeTime::now();
    auto emergencyRecyclingRatio = config_.emergency_recycling_ratio();
    if (now - lastUpdateTargetUsedSizeTime >= config_.update_target_size_interval()) {
      lastUpdateTargetUsedSizeTime = now;
      components_.targetMap.updateTargetUsedSize();

      // Per-disk aggregates, keyed by target.diskIndex:
      //   diskUnusedSize   - sum of unusedSize() over the disk's usable targets
      //   chunkEngineCount - {targets with useChunkEngine() == true, targets with useChunkEngine() == false}
      robin_hood::unordered_map<uint32_t, uint64_t> diskUnusedSize;
      robin_hood::unordered_map<uint32_t, std::pair<uint32_t, uint32_t>> chunkEngineCount;
      auto snapshot = components_.targetMap.snapshot();
      for (auto &[targetId, target] : snapshot->getTargets()) {
        // Only targets that are not offline (recoverably or not) and have a live storage target are considered.
        if (!target.unrecoverableOffline() && target.localState != flat::LocalTargetState::OFFLINE &&
            target.storageTarget != nullptr) {
          target.storageTarget->reportUnrecycledSize();
          // Uses the usage ratio measured in step 2; operator[] inserts 0.0 for a disk index that has no
          // measurement yet, which leaves emergency recycling off unless the ratio itself is <= 0.
          target.storageTarget->setEmergencyRecycling(diskUsage[target.diskIndex] >= emergencyRecyclingRatio);
          diskUnusedSize[target.diskIndex] += target.storageTarget->unusedSize();
          if (target.storageTarget->useChunkEngine()) {
            chunkEngineCount[target.diskIndex].first++;
          } else {
            chunkEngineCount[target.diskIndex].second++;
          }
        }
      }
      // Publish the per-disk metrics. Disks without any usable target report zero counts / zero unused size because
      // operator[] value-initializes missing entries.
      for (auto i = 0ul; i < targetPaths.size(); ++i) {
        auto tag = monitor::instanceTagSet(std::to_string(i));
        auto [new_count, old_count] = chunkEngineCount[i];
        new_chunk_engine_count.set(new_count, tag);
        old_chunk_engine_count.set(old_count, tag);
        auto rawUsedSize = components_.storageTargets.engines()[i]->raw_used_size();
        auto &recorder = *recorders[i];
        // disk_available = unused size of the targets + engine reserved size + free space last recorded in step 2.
        recorder.disk_available.set(diskUnusedSize[i] + rawUsedSize.reserved_size + recorder.disk_free.value());
        recorder.position_count.set(rawUsedSize.position_count);
        recorder.position_rc.set(rawUsedSize.position_rc);
      }
    }

    // 5. trigger heartbeat if need.
    // Note: `now` is not refreshed here; this step reuses the timestamp taken at the beginning of step 4.
    if (now - lastTriggerHeartbeatTime >= 1_s) {
      lastTriggerHeartbeatTime = now;
      components_.triggerHeartbeatIfNeed();
    }

    // 6. report chunk engine metrics.
    // Once per second, the metrics returned by each engine's get_metrics() are added as samples to the recorder
    // with the same index. Counters are always sampled; latency values are sampled only when non-zero and are
    // passed on as std::chrono::microseconds.
    now = RelativeTime::now();
    if (now - lastChunkEngineMetricsReportTime >= 1_s) {
      lastChunkEngineMetricsReportTime = now;
      for (auto i = 0ul; i < targetPaths.size(); ++i) {
        auto &recorder = *recorders[i];
        auto metrics = components_.storageTargets.engines()[i]->get_metrics();
        recorder.copy_on_write_times.addSample(metrics.copy_on_write_times);
        if (metrics.copy_on_write_latency) {
          recorder.copy_on_write_latency.addSample(std::chrono::microseconds(metrics.copy_on_write_latency));
        }
        recorder.copy_on_write_read_times.addSample(metrics.copy_on_write_read_times);
        recorder.copy_on_write_read_bytes.addSample(metrics.copy_on_write_read_bytes);
        if (metrics.copy_on_write_read_latency) {
          recorder.copy_on_write_read_latency.addSample(std::chrono::microseconds(metrics.copy_on_write_read_latency));
        }
        recorder.checksum_reuse.addSample(metrics.checksum_reuse);
        recorder.checksum_combine.addSample(metrics.checksum_combine);
        recorder.checksum_recalculate.addSample(metrics.checksum_recalculate);
        recorder.safe_write_direct_append.addSample(metrics.safe_write_direct_append);
        recorder.safe_write_indirect_append.addSample(metrics.safe_write_indirect_append);
        recorder.safe_write_truncate_shorten.addSample(metrics.safe_write_truncate_shorten);
        recorder.safe_write_truncate_extend.addSample(metrics.safe_write_truncate_extend);
        recorder.safe_write_read_tail_times.addSample(metrics.safe_write_read_tail_times);
        recorder.safe_write_read_tail_bytes.addSample(metrics.safe_write_read_tail_bytes);
        recorder.allocate_times.addSample(metrics.allocate_times);
        if (metrics.allocate_latency) {
          recorder.allocate_latency.addSample(std::chrono::microseconds(metrics.allocate_latency));
        }
        recorder.pwrite_times.addSample(metrics.pwrite_times);
        if (metrics.pwrite_latency) {
          recorder.pwrite_latency.addSample(std::chrono::microseconds(metrics.pwrite_latency));
        }
      }
    }
  }
  // Signal that the loop has fully exited (set only after the last iteration finished).
  stopped_ = true;
  XLOGF(INFO, "CheckWorker@{}::loop stopped", fmt::ptr(this));
}